KNet for OPC (KNetOPC): Introduction to connectors

Overview

KNetOPC connectors are Apache Kafka™ Connect plugins that bridge OPC-UA servers and Apache Kafka™ clusters. Two connectors are available:

  • Source Connector (org.mases.knetopc.source.KNetOPCSourceConnector): subscribes to one or more OPC-UA nodes and publishes each change notification as a record to one or more Kafka topics.
  • Sink Connector (org.mases.knetopc.sink.KNetOPCSinkConnector): receives command records from a Kafka topic, executes the corresponding OPC-UA operation (Read, Write, Call, and more) on the server, and publishes the result to a separate response topic.

Both connectors integrate natively with KNet Connect SDK and follow the standard Kafka Connect deployment and configuration model — standalone or distributed — with no changes to the Connect runtime itself.

Note

Before deploying KNetOPC connectors, review the following resources:

Architecture

KNetOPC connectors have a two-layer architecture:

  • Java layer: registers the connector within the Apache Kafka™ Connect runtime, exposes all configuration properties, and handles the Connect lifecycle (task allocation, poll cycles, offset management). Each connector is identified by its fully-qualified Java class name (org.mases.knetopc.source.KNetOPCSourceConnector and org.mases.knetopc.sink.KNetOPCSinkConnector).
  • .NET layer: contains the OPC-UA client logic — session management, node subscription, data encoding, and request/response dispatch. The Java layer delegates all OPC-UA operations to the corresponding .NET class via KNet Connect SDK. Both connectors are implemented in the same .NET assembly MASES.KNet.OPC.

This design keeps OPC-UA–specific logic in .NET while remaining fully compatible with the standard Kafka Connect infrastructure, including predicates, transformations, and the distributed REST management API.

Data flow

Source Connector

OPC-UA Server
    └─► KNet Connect SDK receives change notification
            └─► Predicates evaluated (record filtered if false)
                    └─► Transformations applied
                            └─► Record published to Apache Kafka™ topic

Sink Connector

.NET application (or KNetOPCClient)
    └─► Publishes command record to request topic (topics=)
            └─► Sink Connector receives command
                    └─► Executes OPC-UA operation (Read / Write / Call / ...)
                            └─► Publishes result to response topic (knet.opc.sink.response.topic)
                                    └─► .NET application receives response

Supported encodings (Source Connector)

The Source Connector supports three OPC-UA encoding formats. The encoding is configured in the OPC-UA configuration file (see Source connector configuration) and determines the Kafka Connect value converter that must be used:

Encoding | Value converter
JSON | org.apache.kafka.connect.storage.StringConverter
XML | org.apache.kafka.connect.storage.StringConverter
Binary | org.apache.kafka.connect.converters.ByteArrayConverter

Note

The Binary encoding requires a dedicated connector properties file because the value converter differs from the JSON/XML case. Pre-made configuration files are provided in the connector package for each encoding variant.

The Sink Connector always uses org.apache.kafka.connect.converters.ByteArrayConverter as its value converter and org.apache.kafka.connect.storage.StringConverter as its key converter.
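The encoding table above can be captured in a small lookup, which may help when connector properties are generated by a script. This is a minimal Python sketch and not part of KNetOPC: the names value_converter_for and VALUE_CONVERTER_BY_ENCODING are hypothetical, and only the converter class names come from this document.

```python
# Hypothetical helper (not part of KNetOPC): pick the Kafka Connect value
# converter that matches the encoding used by the Source Connector.
STRING_CONVERTER = "org.apache.kafka.connect.storage.StringConverter"
BYTE_ARRAY_CONVERTER = "org.apache.kafka.connect.converters.ByteArrayConverter"

VALUE_CONVERTER_BY_ENCODING = {
    "JSON": STRING_CONVERTER,
    "XML": STRING_CONVERTER,
    "Binary": BYTE_ARRAY_CONVERTER,
}


def value_converter_for(encoding: str) -> str:
    """Return the value converter class name for a Source Connector encoding."""
    try:
        return VALUE_CONVERTER_BY_ENCODING[encoding]
    except KeyError:
        # Anything outside the three documented encodings is rejected.
        raise ValueError(f"unsupported encoding: {encoding!r}") from None
```

Failing on an unknown encoding, rather than falling back to a default converter, keeps a typo from silently producing a connector whose converter does not match its OPC-UA configuration file.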

Deployment

Kafka Connect connectors can be deployed in two modes. KNetOPC supports both.

Standalone mode

In standalone mode a single worker process hosts the connector. The connector is configured through a .properties file passed to the Connect worker at startup.

The Source Connector ships with two pre-made standalone configuration files:

  • connect-knet-opc-source.properties — for JSON and XML encoding.
  • connect-knet-opc-source-binary.properties — for Binary encoding.

The Sink Connector ships with one pre-made standalone configuration file:

  • connect-knet-opc-sink.properties

Distributed mode

In distributed mode the Connect runtime exposes a REST API. Connectors are registered, started, stopped, paused, and resumed by sending JSON payloads to that endpoint.

The Source Connector ships with two pre-made distributed configuration files:

  • connect-knet-opc-source.json — for JSON and XML encoding.
  • connect-knet-opc-source-binary.json — for Binary encoding.

The Sink Connector ships with one pre-made distributed configuration file:

  • connect-knet-opc-sink.json

KNet for OPC Source connector

Connector configuration properties

Property | Required | Default / fixed value | Description
name | Yes | | The connector instance name within the Kafka Connect runtime.
connector.class | Yes | org.mases.knetopc.source.KNetOPCSourceConnector | DO NOT CHANGE.
tasks.max | No | 1 | Maximum number of tasks the Connect runtime may allocate to this connector.
key.converter | Yes | org.apache.kafka.connect.storage.StringConverter | DO NOT CHANGE.
value.converter | Yes | org.apache.kafka.connect.storage.StringConverter (JSON/XML) or org.apache.kafka.connect.converters.ByteArrayConverter (Binary) | DO NOT CHANGE. Must match the encoding configured in the OPC-UA configuration file.
knet.opc.config.filename | Yes* | | Path to the OPC-UA configuration file (KNetOPCSource.json or KNetOPCSourceBinary.json). If left as a bare filename, the connector searches in the package's etc subfolder.
knet.opc.config.content | Yes* | | The OPC-UA configuration file encoded as a Base64 string, generated with the MASES.KNet.OPC.Configuration.Converter tool.
knet.opc.source.maxdatachangeinpoll | No | 100 | Maximum number of change notifications returned in a single Connect poll cycle.
knet.opc.source.splitsubscriptionbytask | No | true | When true, subscriptions are distributed across multiple tasks (up to tasks.max).

*Exactly one of knet.opc.config.filename and knet.opc.config.content must be provided.

For the full OPC-UA configuration file reference see Source connector configuration.
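The rule that exactly one of knet.opc.config.filename and knet.opc.config.content must be provided is easy to check before a configuration reaches the Connect worker. The following Python sketch shows one way to do so; the function name check_opc_config_source is hypothetical, and the check treats an empty value as not provided, which is an assumption rather than documented connector behaviour.

```python
def check_opc_config_source(config: dict) -> str:
    """Return the property that supplies the OPC-UA configuration.

    Raises ValueError unless exactly one of the two properties is set.
    Assumption: an empty string counts as "not provided".
    """
    candidates = ("knet.opc.config.filename", "knet.opc.config.content")
    present = [key for key in candidates if config.get(key)]
    if len(present) != 1:
        raise ValueError(
            "exactly one of " + " and ".join(candidates)
            + f" must be provided, found {len(present)}"
        )
    return present[0]
```

For example, calling the function with the properties of the standalone JSON example would return "knet.opc.config.filename", while a dictionary that sets both properties, or neither, raises ValueError.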

Examples

Standalone — JSON encoding

name=knet-opc-source
connector.class=org.mases.knetopc.source.KNetOPCSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
knet.opc.config.filename=KNetOPCSource.json
knet.opc.source.maxdatachangeinpoll=100
knet.opc.source.splitsubscriptionbytask=true

KNetOPCSource.json

{
  "ApplicationName": "MyApp",
  "ApplicationVersion": "1.0.0",
  "SessionConfiguration": {
    "ConfigFilename": "KNetOPC.Config.xml",
    "EndPoint": "opc.tcp://172.31.48.1:62640/masesgroup/ServerSimulator",
    "SessionName": "MyApp"
  },
  "Subscriptions": [
    {
      "DefaultConnectParameters": {
        "Topic": "myTopic",
        "EncodingType": 2
      },
      "MonitoredItems": [
        {
          "NodeId": "ns=2;s=Realtimedata",
          "IsRoot": true
        }
      ]
    }
  ]
}

Standalone — Binary encoding

name=knet-opc-source-binary
connector.class=org.mases.knetopc.source.KNetOPCSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
knet.opc.config.filename=KNetOPCSourceBinary.json

KNetOPCSourceBinary.json has the same structure as KNetOPCSource.json, except that "EncodingType" is set to 0.
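Because the two files differ only in the encoding value, a Binary variant can be derived from an existing JSON configuration. The Python sketch below illustrates the idea under the assumption that the encoding is set only in each subscription's DefaultConnectParameters, as in the example above; the function name to_binary_config is hypothetical, and the pre-made KNetOPCSourceBinary.json in the package remains the reference.

```python
import copy

# Value used for Binary encoding in this document's KNetOPCSourceBinary.json.
BINARY_ENCODING_TYPE = 0


def to_binary_config(source_config: dict) -> dict:
    """Return a copy of the configuration with every subscription set to Binary.

    Assumption: EncodingType appears only under DefaultConnectParameters.
    """
    binary_config = copy.deepcopy(source_config)  # leave the input untouched
    for subscription in binary_config.get("Subscriptions", []):
        params = subscription.setdefault("DefaultConnectParameters", {})
        params["EncodingType"] = BINARY_ENCODING_TYPE
    return binary_config
```

The input can be loaded with json.load from KNetOPCSource.json and the result written back with json.dump. The connector properties must still switch value.converter to the ByteArrayConverter, since that setting lives outside the OPC-UA configuration file.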

Distributed mode

{
  "name": "knet-opc-source",
  "config": {
    "tasks.max": "1",
    "connector.class": "org.mases.knetopc.source.KNetOPCSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "knet.opc.config.filename": "KNetOPCSource.json"
  }
}
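The standalone and distributed examples carry the same settings in two shapes: flat key=value lines versus a JSON object whose name sits at the top level and whose remaining settings sit under config. A rough Python sketch of that conversion follows. The function name properties_to_payload is hypothetical, and the parser handles only plain key=value lines and # comments, not the full Java .properties syntax (escapes, line continuations, ":" separators).

```python
def properties_to_payload(properties_text: str) -> dict:
    """Convert simple key=value properties text into a Connect REST payload."""
    settings = {}
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    # In the distributed payload the connector name is a top-level field.
    name = settings.pop("name")
    return {"name": name, "config": settings}
```

Passing the result to json.dumps yields a document with the same layout as the distributed example above. All values stay strings, which matches how that example writes "tasks.max": "1".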

KNet for OPC Sink connector

The KNet for OPC Sink connector implements a request/response pattern over Kafka: it reads command records from a request topic (topics), executes the corresponding OPC-UA operation on the server, and publishes the result to a separate response topic (knet.opc.sink.response.topic).

This design allows any .NET application to interact with an OPC-UA server through the standard Kafka transport — or through the KNetOPCSinkClient class, which implements the OPC Foundation ISessionClient interface with Kafka as the transport layer.

Important

tasks.max and max.poll.records must both be left at 1. The Sink connector requires strict sequentiality between request and response. Do not increase these values.

Connector configuration properties

Property | Required | Default / fixed value | Description
name | Yes | | The connector instance name within the Kafka Connect runtime.
connector.class | Yes | org.mases.knetopc.sink.KNetOPCSinkConnector | DO NOT CHANGE.
tasks.max | Yes | 1 | DO NOT CHANGE. Must be 1 to preserve request/response sequentiality.
max.poll.records | Yes | 1 | DO NOT CHANGE. Must be 1 to process one command at a time.
topics | Yes | | The Kafka topic from which the connector reads command records. Must match the RequestTopic configured in KNetOPCClient.
key.converter | Yes | org.apache.kafka.connect.storage.StringConverter | DO NOT CHANGE.
value.converter | Yes | org.apache.kafka.connect.converters.ByteArrayConverter | DO NOT CHANGE.
knet.opc.config.filename | Yes* | | Path to the OPC-UA configuration file (KNetOPCSink.json).
knet.opc.config.content | Yes* | | The OPC-UA configuration file encoded as a Base64 string.
knet.opc.sink.bootstrap.servers | Yes | | Bootstrap servers address of the Kafka cluster used to publish responses (e.g. localhost:9092).
knet.opc.sink.response.topic | Yes | | The Kafka topic to which the connector writes result records. Must match the ResponseTopic configured in KNetOPCClient.
knet.opc.sink.enable.transactions | No | false | When true, the connector uses a transactional producer to publish responses.

*Exactly one of knet.opc.config.filename and knet.opc.config.content must be provided.

For the full OPC-UA configuration file reference see Sink connector configuration.
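Since several Sink Connector properties are fixed, a pre-deployment check can catch accidental edits such as a raised tasks.max. The Python sketch below is one possible check and is not shipped with KNetOPC: the names FIXED_SINK_VALUES, REQUIRED_SINK_KEYS, and sink_config_problems are hypothetical, and the expected values are copied from the property table above. It does not check name, which sits outside the config object in distributed mode, nor the OPC-UA configuration file properties.

```python
# Fixed values taken from the Sink Connector property table.
FIXED_SINK_VALUES = {
    "connector.class": "org.mases.knetopc.sink.KNetOPCSinkConnector",
    "tasks.max": "1",
    "max.poll.records": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
}
# Properties that must be present but whose value is deployment-specific.
REQUIRED_SINK_KEYS = (
    "topics",
    "knet.opc.sink.bootstrap.servers",
    "knet.opc.sink.response.topic",
)


def sink_config_problems(config: dict) -> list:
    """Return human-readable problems found in a Sink Connector configuration."""
    problems = []
    for key, expected in FIXED_SINK_VALUES.items():
        actual = str(config.get(key, "")).strip()
        if actual != expected:
            problems.append(f"{key} must be {expected!r}, found {actual!r}")
    for key in REQUIRED_SINK_KEYS:
        if not str(config.get(key, "")).strip():
            problems.append(f"{key} is required")
    return problems
```

An empty list means the checked properties look consistent with the table. Returning all problems at once, instead of raising on the first one, makes the function easier to use in a deployment script that reports every issue in a single pass.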

Standalone configuration example

name=knet-opc-sink
connector.class=org.mases.knetopc.sink.KNetOPCSinkConnector
# Leave both tasks.max and max.poll.records untouched: the connector checks these values at runtime
tasks.max=1
max.poll.records=1
# The topic where requests are written and from which the connector reads them
topics=topic-request

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

knet.opc.config.filename=KNetOPCSink.json

# The address of the broker used to write results
knet.opc.sink.bootstrap.servers=localhost:9092
# The topic where responses will be written
knet.opc.sink.response.topic=topic-response
knet.opc.sink.enable.transactions=false

Distributed configuration example

{
  "name": "knet-opc-sink",
  "config": {
    "tasks.max": "1",
    "max.poll.records": "1",
    "connector.class": "org.mases.knetopc.sink.KNetOPCSinkConnector",
    "topics": "topic-request",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "knet.opc.config.filename": "KNetOPCSink.json",
    "knet.opc.sink.bootstrap.servers": "localhost:9092",
    "knet.opc.sink.response.topic": "topic-response",
    "knet.opc.sink.enable.transactions": "false"
  }
}

REST API — distributed mode management

Once the Connect runtime is started, use the REST endpoint to manage connectors. For a full reference see Kafka Connect REST Interface.

# List registered plugins
curl http://localhost:8083/connector-plugins/

# List registered connectors
curl http://localhost:8083/connectors/

# Inspect a connector's configuration
curl http://localhost:8083/connectors/knet-opc-source

# Register and start a connector
curl -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors \
  --data "@connect-knet-opc-source.json"

# Stop a connector
curl -X PUT http://localhost:8083/connectors/knet-opc-source/stop

# Resume a connector (after an explicit stop)
curl -X PUT http://localhost:8083/connectors/knet-opc-source/resume

# Restart a connector (after a failure)
curl -X POST http://localhost:8083/connectors/knet-opc-source/restart
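
The same REST calls can be issued from a script. The following Python sketch mirrors the curl commands above using only the standard library; the function names are hypothetical, and the worker address http://localhost:8083 is taken from the examples and may differ in a real deployment.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same worker address as the curl examples


def build_register_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST /connectors request that registers and starts a connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_action_request(name: str, action: str, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a stop, resume, or restart request for a connector."""
    # HTTP methods follow the curl examples: PUT for stop/resume, POST for restart.
    methods = {"stop": "PUT", "resume": "PUT", "restart": "POST"}
    if action not in methods:
        raise ValueError(f"unsupported action: {action!r}")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/{action}",
        method=methods[action],
    )


def send(request: urllib.request.Request) -> str:
    """Send a request to the Connect worker and return the response body as text."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

Building the request separately from sending it allows the URL and method to be inspected or logged before anything reaches the worker. A call such as send(build_action_request("knet-opc-source", "restart")) corresponds to the last curl command above.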