KNet for OPC (KNetOPC): Introduction to connectors
Overview
KNetOPC connectors are Apache Kafka™ Connect plugins that bridge OPC-UA servers and Apache Kafka™ clusters. Two connectors are available:
- Source Connector (org.mases.knetopc.source.KNetOPCSourceConnector): subscribes to one or more OPC-UA nodes and publishes each change notification as a record to one or more Kafka topics.
- Sink Connector (org.mases.knetopc.sink.KNetOPCSinkConnector): receives command records from a Kafka topic, executes the corresponding OPC-UA operation (Read, Write, Call, and more) on the server, and publishes the result to a separate response topic.
Both connectors integrate natively with KNet Connect SDK and follow the standard Kafka Connect deployment and configuration model — standalone or distributed — with no changes to the Connect runtime itself.
Note
Before deploying KNetOPC connectors, review the following resources:
Architecture
KNetOPC connectors have a two-layer architecture:
- Java layer: registers the connector within the Apache Kafka™ Connect runtime, exposes all configuration properties, and handles the Connect lifecycle (task allocation, poll cycles, offset management). Each connector is identified by its fully-qualified Java class name (org.mases.knetopc.source.KNetOPCSourceConnector and org.mases.knetopc.sink.KNetOPCSinkConnector).
- .NET layer: contains the OPC-UA client logic — session management, node subscription, data encoding, and request/response dispatch. The Java layer delegates all OPC-UA operations to the corresponding .NET class via KNet Connect SDK. Both connectors are implemented in the same .NET assembly, MASES.KNet.OPC.
This design keeps OPC-UA–specific logic in .NET while remaining fully compatible with the standard Kafka Connect infrastructure, including predicates, transformations, and the distributed REST management API.
Data flow
Source Connector
OPC-UA Server
└─► KNet Connect SDK receives change notification
└─► Predicates evaluated (record filtered if false)
└─► Transformations applied
└─► Record published to Apache Kafka™ topic
Sink Connector
.NET application (or KNetOPCClient)
└─► Publishes command record to request topic (topics=)
└─► Sink Connector receives command
└─► Executes OPC-UA operation (Read / Write / Call / ...)
└─► Publishes result to response topic (knet.opc.sink.response.topic)
└─► .NET application receives response
Supported encodings (Source Connector)
The Source Connector supports three OPC-UA encoding formats. The encoding is configured in the OPC-UA configuration file (see Source connector configuration) and determines the Kafka Connect value converter that must be used:
| Encoding | Value converter |
|---|---|
| JSON | org.apache.kafka.connect.storage.StringConverter |
| XML | org.apache.kafka.connect.storage.StringConverter |
| Binary | org.apache.kafka.connect.converters.ByteArrayConverter |
Note
The Binary encoding requires a dedicated connector properties file because the value converter differs from the JSON/XML case. Pre-made configuration files are provided in the connector package for each encoding variant.
The Sink Connector always uses org.apache.kafka.connect.converters.ByteArrayConverter for both key and value converters.
Deployment
Kafka Connect connectors can be deployed in two modes. KNetOPC supports both.
Standalone mode
In standalone mode a single worker process hosts the connector. The connector is configured through a .properties file passed to the Connect worker at startup.
The Source Connector ships with two pre-made standalone configuration files:
- connect-knet-opc-source.properties — for JSON and XML encoding.
- connect-knet-opc-source-binary.properties — for Binary encoding.
The Sink Connector ships with one pre-made standalone configuration file:
connect-knet-opc-sink.properties
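With a standard Apache Kafka™ distribution, a standalone worker is started by passing the worker properties file first and the connector properties file second. The paths below are illustrative and depend on where Kafka and the connector package are installed:

```sh
# Start a single Connect worker hosting the Source Connector (JSON/XML encoding)
bin/connect-standalone.sh config/connect-standalone.properties connect-knet-opc-source.properties
```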
Distributed mode
In distributed mode the Connect runtime exposes a REST API. Connectors are registered, started, stopped, paused, and resumed by sending JSON payloads to that endpoint.
The Source Connector ships with two pre-made distributed configuration files:
- connect-knet-opc-source.json — for JSON and XML encoding.
- connect-knet-opc-source-binary.json — for Binary encoding.
The Sink Connector ships with one pre-made distributed configuration file:
connect-knet-opc-sink.json
KNet for OPC Source connector
Connector configuration properties
| Property | Required | Default / fixed value | Description |
|---|---|---|---|
| name | Yes | — | The connector instance name within the Kafka Connect runtime. |
| connector.class | Yes | org.mases.knetopc.source.KNetOPCSourceConnector | DO NOT CHANGE. |
| tasks.max | No | 1 | Maximum number of tasks the Connect runtime may allocate to this connector. |
| key.converter | Yes | org.apache.kafka.connect.storage.StringConverter | DO NOT CHANGE. |
| value.converter | Yes | org.apache.kafka.connect.storage.StringConverter (JSON/XML) or org.apache.kafka.connect.converters.ByteArrayConverter (Binary) | DO NOT CHANGE. Must match the encoding configured in the OPC-UA configuration file. |
| knet.opc.config.filename | Yes* | — | Path to the OPC-UA configuration file (KNetOPCSource.json or KNetOPCSourceBinary.json). If left as a bare filename, the connector searches the package's etc subfolder. |
| knet.opc.config.content | Yes* | — | The OPC-UA configuration file encoded as a Base64 string, generated with the MASES.KNet.OPC.Configuration.Converter tool. |
| knet.opc.source.maxdatachangeinpoll | No | 100 | Maximum number of change notifications returned in a single Connect poll cycle. |
| knet.opc.source.splitsubscriptionbytask | No | true | When true, subscriptions are distributed across multiple tasks (up to tasks.max). |
*Exactly one of knet.opc.config.filename and knet.opc.config.content must be provided.
For the full OPC-UA configuration file reference see Source connector configuration.
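The knet.opc.config.content value carries the configuration file as a Base64 string. The package's own MASES.KNet.OPC.Configuration.Converter tool generates it; the sketch below is an illustrative alternative, assuming the expected value is a plain (non-URL-safe) Base64 encoding of the file's raw bytes — verify against the tool's output before relying on it:

```python
import base64
from pathlib import Path

def config_to_base64(path: str) -> str:
    """Encode the raw bytes of an OPC-UA configuration file as a Base64 string."""
    return base64.b64encode(Path(path).read_bytes()).decode("ascii")

# Round-trip check on a small JSON fragment instead of a real file
sample = b'{"ApplicationName": "MyApp"}'
encoded = base64.b64encode(sample).decode("ascii")
assert base64.b64decode(encoded) == sample
```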
Examples
Standalone — JSON encoding
name=knet-opc-source
connector.class=org.mases.knetopc.source.KNetOPCSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
knet.opc.config.filename=KNetOPCSource.json
knet.opc.source.maxdatachangeinpoll=100
knet.opc.source.splitsubscriptionbytask=true
KNetOPCSource.json
{
"ApplicationName": "MyApp",
"ApplicationVersion": "1.0.0",
"SessionConfiguration": {
"ConfigFilename": "KNetOPC.Config.xml",
"EndPoint": "opc.tcp://172.31.48.1:62640/masesgroup/ServerSimulator",
"SessionName": "MyApp"
},
"Subscriptions": [
{
"DefaultConnectParameters": {
"Topic": "myTopic",
"EncodingType": 2
},
"MonitoredItems": [
{
"NodeId": "ns=2;s=Realtimedata",
"IsRoot": true
}
]
}
]
}
Standalone — Binary encoding
name=knet-opc-source-binary
connector.class=org.mases.knetopc.source.KNetOPCSourceConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
knet.opc.config.filename=KNetOPCSourceBinary.json
KNetOPCSourceBinary.json — identical structure to KNetOPCSource.json with "EncodingType": 0.
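Following the note above, only the encoding value changes with respect to the JSON example; the relevant fragment of KNetOPCSourceBinary.json would read (topic name reused from the JSON example for illustration):

```json
"DefaultConnectParameters": {
  "Topic": "myTopic",
  "EncodingType": 0
}
```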
Distributed mode
{
"name": "knet-opc-source",
"config": {
"tasks.max": "1",
"connector.class": "org.mases.knetopc.source.KNetOPCSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"knet.opc.config.filename": "KNetOPCSource.json"
}
}
KNet for OPC Sink connector
The KNet for OPC Sink connector implements a request/response pattern over Kafka: it reads command records from a request topic (topics), executes the corresponding OPC-UA operation on the server, and publishes the result to a separate response topic (knet.opc.sink.response.topic).
This design allows any .NET application to interact with an OPC-UA server through the standard Kafka transport — or through the KNetOPCSinkClient class, which implements the OPC Foundation ISessionClient interface with Kafka as the transport layer.
Important
tasks.max and max.poll.records must both be left at 1. The Sink connector requires strict sequentiality between request and response. Do not increase these values.
Connector configuration properties
| Property | Required | Default / fixed value | Description |
|---|---|---|---|
| name | Yes | — | The connector instance name within the Kafka Connect runtime. |
| connector.class | Yes | org.mases.knetopc.sink.KNetOPCSinkConnector | DO NOT CHANGE. |
| tasks.max | Yes | 1 | DO NOT CHANGE. Must be 1 to preserve request/response sequentiality. |
| max.poll.records | Yes | 1 | DO NOT CHANGE. Must be 1 to process one command at a time. |
| topics | Yes | — | The Kafka topic from which the connector reads command records. Must match the RequestTopic configured in KNetOPCClient. |
| key.converter | Yes | org.apache.kafka.connect.storage.StringConverter | DO NOT CHANGE. |
| value.converter | Yes | org.apache.kafka.connect.converters.ByteArrayConverter | DO NOT CHANGE. |
| knet.opc.config.filename | Yes* | — | Path to the OPC-UA configuration file (KNetOPCSink.json). |
| knet.opc.config.content | Yes* | — | The OPC-UA configuration file encoded as a Base64 string. |
| knet.opc.sink.bootstrap.servers | Yes | — | Bootstrap servers address of the Kafka cluster used to publish responses (e.g. localhost:9092). |
| knet.opc.sink.response.topic | Yes | — | The Kafka topic to which the connector writes result records. Must match the ResponseTopic configured in KNetOPCClient. |
| knet.opc.sink.enable.transactions | No | false | When true, the connector uses a transactional producer to publish responses. |
*Exactly one of knet.opc.config.filename and knet.opc.config.content must be provided.
For the full OPC-UA configuration file reference see Sink connector configuration.
Standalone configuration example
name=knet-opc-sink
connector.class=org.mases.knetopc.sink.KNetOPCSinkConnector
# Leave both tasks.max and max.poll.records untouched: the connector checks these values at runtime
tasks.max=1
max.poll.records=1
# The topic where requests are written and from which the connector reads them
topics=topic-request
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
knet.opc.config.filename=KNetOPCSink.json
# The address of the broker used to write results
knet.opc.sink.bootstrap.servers=localhost:9092
# The topic where responses will be written
knet.opc.sink.response.topic=topic-response
knet.opc.sink.enable.transactions=false
Distributed configuration example
{
"name": "knet-opc-sink",
"config": {
"tasks.max": "1",
"max.poll.records": "1",
"connector.class": "org.mases.knetopc.sink.KNetOPCSinkConnector",
"topics": "topic-request",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"knet.opc.config.filename": "KNetOPCSink.json",
"knet.opc.sink.bootstrap.servers": "localhost:9092",
"knet.opc.sink.response.topic": "topic-response",
"knet.opc.sink.enable.transactions": "false"
}
}
REST API — distributed mode management
Once the Connect runtime is started, use the REST endpoint to manage connectors. For a full reference see Kafka Connect REST Interface.
# List registered plugins
curl http://localhost:8083/connector-plugins/
# List registered connectors
curl http://localhost:8083/connectors/
# Inspect a connector's configuration
curl http://localhost:8083/connectors/knet-opc-source
# Register and start a connector
curl -X POST -H "Content-Type: application/json" \
http://localhost:8083/connectors \
--data "@connect-knet-opc-source.json"
# Stop a connector
curl -X PUT http://localhost:8083/connectors/knet-opc-source/stop
# Resume a connector (after an explicit stop)
curl -X PUT http://localhost:8083/connectors/knet-opc-source/resume
# Restart a connector (after a failure)
curl -X POST http://localhost:8083/connectors/knet-opc-source/restart
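The curl calls above map directly onto plain HTTP requests, so connector registration can be scripted. A minimal sketch using only the Python standard library, reusing the same illustrative host, port, and connector name:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Connect REST endpoint (illustrative)

def registration_payload(name: str, config: dict) -> bytes:
    """Build the JSON body expected by POST /connectors."""
    return json.dumps({"name": name, "config": config}).encode("utf-8")

def register_connector(name: str, config: dict) -> dict:
    """POST the connector definition and return the parsed runtime response."""
    req = urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=registration_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Usage (requires a running distributed Connect worker):
# register_connector("knet-opc-source", {
#     "connector.class": "org.mases.knetopc.source.KNetOPCSourceConnector",
#     "tasks.max": "1",
#     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
#     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
#     "knet.opc.config.filename": "KNetOPCSource.json",
# })
```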