Table of Contents

KNet for OPC (KNetOPC): usage

Here a step by step guide to use the current available packages.

Before start any operation take a look at the following resources:

The following chapters describe configuration and usage.

KNet for OPC Source connector

The KNet for OPC Source connector is an OPC-UA connector for Apache Kafka™ Connect which connects to any OPC-UA Server and transfers data to one or multiple topics of an Apache Kafka™ Cluster. The connector shall be installed following the standard instruction available on Apache Kafka™ Connect and Confluent - How to Use Kafka Connect When the connector is installed it is possible to configure it. To do that there are some files to be changed based on the target environment and Apache Kafka™ Connect behavior.

Standalone configuration files

The following is a description of the file used to start the connector in standalone mode:

  • connect-knet-opc-source.properties: it is the file which contains some info used from Apache Kafka™ Connect to start the connector; the most important properties are:
    • name (mandatory): the name of the connector within Apache Kafka™ Connect
    • connector.class=org.mases.knetopc.KNetOPCSourceConnector (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect to load the connector
    • tasks.max (optional): set to the maximum number of tasks which Apache Kafka™ Connect can allocate
    • key.converter=org.apache.kafka.connect.storage.StringConverter (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect for data conversion and this value is expected from KNet for OPC Source connector
    • value.converter=org.apache.kafka.connect.storage.StringConverter (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect for data conversion and this value is expected from KNet for OPC Source connector
    • knet.dotnet.classname=MASES.KNet.OPC.Source.KNetOPCSource, MASES.KNet.OPC.Source (mandatory): DO NOT CHANGE since it is the class used from KNet Connect SDK to load .NET side assemblies
    • knet.opc.configfilename=KNetOPC.json (mandatory): the KNetOPC.json file defines how the KNet for OPC Source connector works; leave the connector to search for it or set the path to the absolute location of KNetOPC.json, pointing to the etc subfolder of the KNet for OPC Source connector package
    • knet.opc.source.maxdatachangeinpoll=100 (optional): defines the maximum data changes shall be returned back on each Apache Kafka™ Connect request cycle
    • knet.opc.source.splitsubscriptionbytask=true (optional): if true the subscriptions defined in KNetOPC.json are managed from multiple tasks (see tasks.max)

Since KNet for OPC Source connector supports many encodings (Binary, Xml, Json) the previous configuration is defined for Json and Xml encoding. A Binary encoding needs some changes in the configuration and it is available a pre-made file:

  • connect-knet-opc-source-binary.properties: it is the file which contains some info used from Apache Kafka™ Connect to start the connector in Binary encoding mode; the most important properties are:
    • name=knet-opc-source (mandatory): the name of the connector within Apache Kafka™ Connect
    • connector.class=org.mases.knetopc.KNetOPCSourceConnector (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect to load the connector
    • tasks.max=1 (optional): set to the maximum number of tasks which Apache Kafka™ Connect can allocate
    • key.converter=org.apache.kafka.connect.storage.StringConverter (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect for data conversion and this value is expected from KNet for OPC Source connector
    • value.converter=org.apache.kafka.connect.converters.ByteArrayConverter (mandatory): DO NOT CHANGE since it is the class used from Apache Kafka™ Connect for data conversion and this value is expected from KNet for OPC Source connector
    • knet.dotnet.classname=MASES.KNet.OPC.Source.KNetOPCSource, MASES.KNet.OPC.Source (mandatory): DO NOT CHANGE since it is the class used from KNet Connect SDK to load .NET side assemblies
    • knet.opc.configfilename=KNetOPCBinary.json (mandatory): the KNetOPCBinary.json file defines how the KNet for OPC Source connector works; leave the connector to search for it or set the path to the absolute location of KNetOPCBinary.json, pointing to the etc subfolder of the KNet for OPC Source connector package
    • knet.opc.source.maxdatachangeinpoll=100 (optional): defines the maximum data changes shall be returned back on each Apache Kafka™ Connect request cycle
    • knet.opc.source.splitsubscriptionbytask=true (optional): if true the subscriptions defined in KNetOPCBinary.json are managed from multiple tasks (see tasks.max)

KNetOPC.json, or KNetOPCBinary.json, are described in a specific chapter.

Distrubuted configuration

The following is a description of the file used to start the connector in distrubuted mode. Once the Apache Kafka™ Connect runtime was started, it exposes a REST endpoint can be used to instruct the runtime to load, start, stop, pause, resume connectors. The behavior is managed using json files, or strings, sent to the REST endpoint. The configuration used from KNet for OPC Source connector is available in connect-knet-opc-source.json and it is like the following:

{
  "name": "knet-opc-source",
  "config": {
    "tasks.max": "1",
    "connector.class": "org.mases.knetopc.KNetOPCSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "knet.dotnet.classname": "MASES.KNet.OPC.Source.KNetOPCSource, MASES.KNet.OPC.Source",
    "knet.opc.configfilename": "KNetOPC.json"
  }
}

the properties follows the same rules described in the Standalone configuration files, just change the location and they are under the config node of the Json.

As described in Standalone configuration files, the Binary encoding needs some changes and they are available in a premade specific file connect-knet-opc-source-binary.json and it is like the following:

{
  "name": "knet-opc-source-binary",
  "config": {
    "tasks.max": "1",
    "connector.class": "org.mases.knetopc.KNetOPCSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "knet.dotnet.classname": "MASES.KNet.OPC.Source.KNetOPCSource, MASES.KNet.OPC.Source",
    "knet.opc.configfilename": "KNetOPCBinary.json"
  }
}

KNetOPC.json, or KNetOPCBinary.json, are described in a specific chapter.

Connector start/stop/resume and so on

When the configuration files are finalized to start the connector in distributed mode the REST endpoint shall be used. Here below a list of useful commands, for further information see [Kafka Connect REST Interface for Confluent Platform](https://docs.confluent.io/platform/current/connect/references/restapi.html.

  • List the registered plugins:
curl http://localhost:8083/connector-plugins/
  • List the registered connectors:
curl http://localhost:8083/connectors/
  • Reports the configuration information for the specific connector:
curl http://localhost:8083/connectors/knet-opc-source
  • Register a configuration and start the connector:
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@connect-knet-opc-source.json"
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@connect-knet-opc-source-binary.json"
  • Using the values of name from the previous examples:

    • Stop: stops gracefully the exection of the specific connector instance
    curl -X PUT http://localhost:8083/connectors/knet-opc-source/stop
    
    curl -X PUT http://localhost:8083/connectors/knet-opc-source-binary/stop
    
    • Resume: resume the execution of the specific connector instance;

      • if the connector was explicitly stopped, with this command the connector restarts
      curl -X PUT http://localhost:8083/connectors/knet-opc-source/resume
      
      curl -X PUT http://localhost:8083/connectors/knet-opc-source-binary/resume
      
      • if the connector failed, with this command the connector restarts
      curl -X POST http://localhost:8083/connectors/knet-opc-source/restart
      
      curl -X POST http://localhost:8083/connectors/knet-opc-source-binary/restart
      

KNetOPC-KNetOPCBinary file

KNetOPC.json, or KNetOPCBinary.json, contains OPC-UA specific configuration information used from the KNet for OPC Source connector. The reason of an external file is related to the limitations of the properties can be used directly from the Apache Kafka™ Connect runtime configuration files, while KNetOPC.json, or KNetOPCBinary.json, uses nested Json structures.

Examples of the files are:

  • KNetOPC.json: in this case the encoding used is Json
{
  "ApplicationName": "TestConsole",
  "ApplicationVersion": "1.0.0",
  "SessionConfiguration": {
    "ConfigFilename": "KNetOPC.Config.xml",
    "EndPoint": "opc.tcp://172.31.48.1:62640/masesgroup/ServerSimulator",
    "SessionName": "TestConsole"
  },
  "Subscriptions": [
    {
      "DefaultConnectParameters": {
        "Topic": "myTopic",
        "EncodingType": "Json"
      },
      "MonitoredItems": [
        {
          "NodeId": "ns=2;s=Realtimedata",
          "IsRoot": true
        }
      ]
    }
  ]
}
  • KNetOPCBinary.json: in this case the encoding used is Binary and shall be used only when the connector starts with connect-knet-opc-source-binary.properties (standalone) or connect-knet-opc-source-binary.json (distributed)
{
  "ApplicationName": "TestConsole",
  "ApplicationVersion": "1.0.0",
  "SessionConfiguration": {
    "ConfigFilename": "KNetOPC.Config.xml",
    "EndPoint": "opc.tcp://172.31.48.1:62640/masesgroup/ServerSimulator",
    "SessionName": "TestConsole"
  },
  "Subscriptions": [
    {
      "DefaultConnectParameters": {
        "Topic": "myTopic",
        "EncodingType": "Binary"
      },
      "MonitoredItems": [
        {
          "NodeId": "ns=2;s=Realtimedata",
          "IsRoot": true
        }
      ]
    }
  ]
}

Here a basic description of the most notable properties in the configuration:

  • ApplicationName: the user defined name associated to the application where KNet for OPC Source connector is in use; can be used to describe the published data and can be added to the record key
  • ApplicationVersion: the user defined version associated to the application where KNet for OPC Source connector is in use; can be used to describe the published data and can be added to the record key
  • SessionConfiguration: it is a mandatory section which describes how to connect to the OPC-UA Server
    • ConfigFilename: it is a mandatory configuration file loaded from OPC-UA Stack, for further information take a look at the resources of OPC Foundation
    • EndPoint: it is the URL, exposed from OPC-UA Server, where KNet for OPC Source connector will connect
    • SessionName: it is the name of the session, when requested from KNet for OPC Source connector; it is optional and fallbacks to an internal identifier if not set
  • Subscriptions: an array of elements describing how and what subscribes to receive change notifications
    • DefaultConnectParameters: it contains the default values used for all MonitoredItems when data are pushed from Apache Kafka™ Connect
      • Topic: the topic where the data will be sent
      • EncodingType: the type of encoding to be used, if not set defaults to Json
    • MonitoredItems: an array of elements describing the items to be subscribed
      • NodeId: represents the node, expressed in the OPC-UA format, to be subscribed
      • IsRoot: inform the KNet for OPC Source connector that NodeId represents the root of a tree to be traversed and the connector subscribes to all childs of NodeId; if not available, or set to false, only the specific node will be subscribed

The configuration contains many other optional properties which overrides default values. For a complete description see KNet for OPC Configuration.