KNet for OPC (KNetOPC): Client library
Overview
The NuGet package MASES.KNet.OPC.Client provides two complementary .NET client classes that together cover the full OPC-UA interaction model over Apache Kafka™:
| Class | Role | Complements |
|---|---|---|
KNetOPCSinkClient |
Sends OPC-UA request/response operations (Read, Write, Call, Browse, …) through the Sink connector. Implements ISessionClient. |
Source connector |
KNetOPCSourceClient |
Subscribes to OPC-UA change notifications published by the Source connector and delivers MonitoredItemNotification records to the application. |
Sink connector |
The two clients are designed to be used together: the Source connector + KNetOPCSourceClient handle the continuous data flow from OPC-UA nodes to the application, while the Sink connector + KNetOPCSinkClient handle on-demand interactions with the OPC-UA server. An application that needs both read-only monitoring and active control can instantiate both clients against the same Kafka cluster.
Installation
dotnet add package MASES.KNet.OPC.Client
JVM initialization — KNetOPCClientEngine
KNetOPCClientEngine is the class responsible for initializing the JVM environment used by both KNetOPCSinkClient and KNetOPCSourceClient. It extends KNetCore<KNetOPCClientEngine> and is started automatically the first time a client is instantiated.
public class KNetOPCClientEngine : KNetCore<KNetOPCClientEngine>
{
}
In most cases no explicit interaction is required. However, because KNetCore<T> exposes static properties for fine-grained JVM configuration, KNetOPCClientEngine can be configured before the first client is created to control JVM startup options — for example heap size, additional JVM arguments, or the JVM library path:
// Optional: configure JVM before the first client is instantiated
KNetOPCClientEngine.ApplicationHeapSize = "512M";
KNetOPCClientEngine.ApplicationInitialHeapSize = "256M";
KNetOPCClientEngine.ApplicationEnableDebug = true;
// Now instantiate the client — KNetOPCClientEngine starts automatically
var sinkClient = new KNetOPCSinkClient("localhost:9092", "topic-request", "topic-response");
sinkClient.ConnectAndStart();
If you need to start the engine explicitly before the first client (e.g. to verify JVM availability at startup), call:
KNetOPCClientEngine.CreateGlobalInstance();
CreateGlobalInstance() is idempotent — subsequent calls are no-ops. Both KNetOPCSinkClient and KNetOPCSourceClient call it internally if no global instance is present, so explicit invocation is optional.
Note
All JVM configuration properties (ApplicationHeapSize, ApplicationInitialHeapSize, custom JVM arguments, etc.) must be set before CreateGlobalInstance() is called, either explicitly or implicitly through the first client instantiation. Configuration changes after JVM startup are ignored.
KNetOPCSinkClient
KNetOPCSinkClient implements the OPC Foundation ISessionClient interface with Apache Kafka™ as the transport layer. It routes OPC-UA operations through the Sink connector request/response topics — the Kafka transport is entirely transparent to the caller.
How it works
.NET application
└─► Calls ISessionClient method (Read, Write, Call, Browse, ...)
└─► KNetOPCSinkClient serializes the request
└─► Publishes record to request topic (topics=)
└─► Sink connector executes operation on OPC-UA server
└─► Publishes result to response topic
└─► KNetOPCSinkClient returns result to caller
Supported operations
All operations in the table below are implemented as full request/response over Kafka. Begin*/End* variants and subscription-related operations are not implemented — subscriptions are handled by the Source connector and KNetOPCSourceClient.
| OPC-UA operation | Status |
|---|---|
Read |
✅ |
HistoryRead |
✅ |
Write |
✅ |
HistoryUpdate |
✅ |
Call |
✅ |
Browse |
✅ |
BrowseNext |
✅ |
TranslateBrowsePathsToNodeIds |
✅ |
RegisterNodes |
✅ |
UnregisterNodes |
✅ |
AddNodes |
✅ |
AddReferences |
✅ |
DeleteNodes |
✅ |
DeleteReferences |
✅ |
QueryFirst |
✅ |
QueryNext |
✅ |
Cancel |
✅ |
Begin*/End* variants |
➖ Not implemented |
| Subscribe / MonitoredItems | ➖ Use Source connector + KNetOPCSourceClient |
Constructor
public KNetOPCSinkClient(
string bootstrapServers,
string requestTopic,
string responseTopic,
ITelemetryContext telemetryContext = null)
| Parameter | Description |
|---|---|
bootstrapServers |
Kafka bootstrap servers, e.g. MYCLUSTER:9092. |
requestTopic |
Topic where command records are published. Must match topics in the Sink connector properties. |
responseTopic |
Topic where result records are received. Must match knet.opc.sink.response.topic in the Sink connector properties. |
telemetryContext |
Optional telemetry context for logging and diagnostics. |
Properties
| Property | Type | Default | Description |
|---|---|---|---|
BootstrapServers |
string | — | Bootstrap servers passed to the constructor. |
ClusterId |
string | — | Kafka cluster ID. Available after Connect(). |
RequestTopic |
string | — | Request topic passed to the constructor. |
ResponseTopic |
string | — | Response topic passed to the constructor. |
EncodingType |
EncodingType |
Json |
Encoding used for message serialization. Can be changed at runtime. |
GroupId |
string | auto GUID | Kafka consumer group ID for the internal response consumer. |
AutoCreateTopic |
bool | false |
When true, ConnectAndStart() creates RequestTopic and ResponseTopic if absent. Partition count is forced to 1. |
ReplicationFactor |
short | 1 |
Replication factor used when AutoCreateTopic is true. |
AdvancedOptions |
AdvancedOptionsType |
— | Producer, consumer, and topic configuration. See KNetOPCSinkClient advanced options. |
AdvancedSecurityOptions |
AdvancedSecurityOptionsType |
— | SSL/TLS and SASL configuration. See Security options. |
Lifecycle methods
| Method | Description |
|---|---|
Connect(string bootstrapControllers = null) |
Connects to the Kafka cluster. bootstrapControllers supersedes BootstrapServers for the initial admin connection if provided. |
CreateTopics() |
Explicitly creates RequestTopic and ResponseTopic. Partition count forced to 1. |
DeleteTopics() |
Deletes RequestTopic and ResponseTopic. Pending unprocessed requests are discarded. |
ConnectAndStart() |
Main startup method: connects, optionally creates topics (AutoCreateTopic), creates internal producer and consumer, and records the current head offset of ResponseTopic to skip stale responses from previous sessions. Call once before issuing any operations. |
Usage example
using MASES.KNet.OPC.Client;
var sinkClient = new KNetOPCSinkClient(
bootstrapServers: "localhost:9092",
requestTopic: "topic-request", // must match topics= in Sink connector
responseTopic: "topic-response"); // must match knet.opc.sink.response.topic
sinkClient.ConnectAndStart();
// Use as ISessionClient — compatible with any OPC-UA extension that accepts ISessionClient
ISessionClient session = sinkClient;
// Read
var readResponse = await session.ReadAsync(new ReadRequest
{
NodesToRead = [ new ReadValueId { NodeId = NodeId.Parse("ns=2;s=Realtimedata"), AttributeId = Attributes.Value } ]
});
// Write
await session.WriteAsync(new WriteRequest
{
NodesToWrite = [ new WriteValue { NodeId = NodeId.Parse("ns=2;s=SetPoint"), AttributeId = Attributes.Value, Value = new DataValue(new Variant(42.0)) } ]
});
// Call
var callResponse = await session.CallAsync(new CallRequest
{
MethodsToCall = [ new CallMethodRequest { ObjectId = NodeId.Parse("ns=2;s=MyObject"), MethodId = NodeId.Parse("ns=2;s=MyMethod") } ]
});
// Browse
var browseResponse = await session.BrowseAsync(new BrowseRequest
{
NodesToBrowse = [ new BrowseDescription { NodeId = ObjectIds.ObjectsFolder, BrowseDirection = BrowseDirection.Forward, ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences, IncludeSubtypes = true, ResultMask = (uint)BrowseResultMask.All } ]
});
OPC Foundation SDK extension methods
Because KNetOPCSinkClient implements ISessionClient, any OPC Foundation SDK extension method or helper that operates on ISessionClient (or on the broader ISession) works transparently against the Kafka transport. The following examples show representative SDK helpers from the Opc.Ua.Client namespace:
using Opc.Ua;
using Opc.Ua.Client;
ISessionClient session = sinkClient;
// Read a single node value using the SDK helper (builds the ReadRequest internally)
DataValue value = await session.ReadValueAsync(
NodeId.Parse("ns=2;s=Realtimedata"),
cancellationToken);
// Write a single value using the SDK helper
await session.WriteValueAsync(
NodeId.Parse("ns=2;s=SetPoint"),
new DataValue(new Variant(42.0)),
cancellationToken);
// Translate a browse path to a NodeId
BrowsePathResult[] results = await session.TranslateBrowsePathAsync(
ObjectIds.ObjectsFolder,
new RelativePath("MyObject/MyVariable"),
cancellationToken);
// Recursive browse using the SDK utility
ReferenceDescriptionCollection references = await session.FetchReferencesAsync(
ObjectIds.ObjectsFolder,
cancellationToken);
Any third-party OPC-UA companion specification library or aggregation layer that accepts ISessionClient can be pointed at a KNetOPCSinkClient instance and will use Kafka as its transport without any code change:
// Example: third-party companion spec client that accepts ISessionClient
ISessionClient session = sinkClient;
var companionClient = new MyCompanionSpecClient(session);
await companionClient.ReadDataAsync(cancellationToken);
Note
SDK extension methods that internally call Begin*/End* variants or subscription-related APIs (CreateMonitoredItems, CreateSubscription, etc.) are not supported. For subscription-based data access, use the Source connector and KNetOPCSourceClient.
KNetOPCSinkClient advanced options
AdvancedOptions (type AdvancedOptionsType) provides fine-grained control over the internal Kafka producer, consumer, and topic configuration. Access it before calling ConnectAndStart().
| Property | Type | Default | Description |
|---|---|---|---|
RequestTopicConfig |
TopicConfigBuilder |
null |
Extended topic configuration for RequestTopic. When null, cluster defaults apply. |
ResponseTopicConfig |
TopicConfigBuilder |
null |
Extended topic configuration for ResponseTopic. When null, cluster defaults apply. |
ProducerConfig |
ProducerConfigBuilder |
— | Extended producer configuration. KeySerializerClass and ValueSerializerClass are set internally — do not override. |
ConsumerConfig |
ConsumerConfigBuilder |
— | Extended consumer configuration. KeyDeserializerClass, ValueDeserializerClass, and AllowAutoCreateTopics are set internally — do not override. |
KNetOPCSourceClient
KNetOPCSourceClient subscribes to a Kafka topic published by the Source connector and delivers MonitoredItemNotification records to the application as they arrive. It implements IDisposable.
How it works
OPC-UA Server
└─► Source connector publishes change notifications to Kafka topic
└─► KNetOPCSourceClient consumes records
└─► Delivers MonitoredItemNotification to application callback
Constructor
public KNetOPCSourceClient(
string bootstrapServers,
string topic,
ITelemetryContext telemetryContext = null)
| Parameter | Description |
|---|---|
bootstrapServers |
Kafka bootstrap servers, e.g. MYCLUSTER:9092. |
topic |
The Kafka topic published by the Source connector. Must match the topic configured in the Source connector's OPC-UA configuration file. |
telemetryContext |
Optional telemetry context for logging and diagnostics. |
Properties
| Property | Type | Default | Description |
|---|---|---|---|
BootstrapServers |
string | — | Bootstrap servers passed to the constructor. |
ClusterId |
string | — | Kafka cluster ID. Available after connection. |
Topic |
string | — | The topic passed to the constructor. |
GroupId |
string | auto GUID | Kafka consumer group ID. Override before starting to enable multiple consumers in the same group. |
AdvancedOptions |
AdvancedOptionsType |
— | Consumer configuration. See KNetOPCSourceClient advanced options. |
AdvancedSecurityOptions |
AdvancedSecurityOptionsType |
— | SSL/TLS and SASL configuration. See Security options. |
Usage example
using MASES.KNet.OPC.Client;
using var sourceClient = new KNetOPCSourceClient(
bootstrapServers: "localhost:9092",
topic: "myTopic"); // must match the topic in the Source connector OPC-UA config
sourceClient.OnNotification += (nodeId, notification) =>
{
Console.WriteLine($"{nodeId}: {notification.Value} @ {notification.SourceTimestamp}");
};
await sourceClient.ConnectAndStartAsync(cancellationToken);
KNetOPCSourceClient advanced options
AdvancedOptions (type AdvancedOptionsType) provides control over the internal Kafka consumer. Access it before starting the client.
| Property | Type | Default | Description |
|---|---|---|---|
ConsumerConfig |
ConsumerConfigBuilder |
— | Extended consumer configuration. KeyDeserializerClass, ValueDeserializerClass, and AllowAutoCreateTopics are set internally — do not override. |
Security options
Both KNetOPCSinkClient and KNetOPCSourceClient expose AdvancedSecurityOptions (type AdvancedSecurityOptionsType) for SSL/TLS and SASL configuration. When set, the security settings are applied uniformly to the admin client, producer (Sink only), and consumer.
| Property | Type | Default | Description |
|---|---|---|---|
SecurityProtocol |
string | null |
Security protocol for the Kafka connection (e.g. SSL, SASL_SSL, SASL_PLAINTEXT). When null, plaintext is used. |
SslConfigs |
SslConfigsBuilder |
null |
SSL/TLS configuration. Applied when SecurityProtocol is set. |
SaslConfigs |
SaslConfigsBuilder |
null |
SASL configuration. Applied when SecurityProtocol is set. |
Example — TLS connection
sinkClient.AdvancedSecurityOptions.SecurityProtocol = "SSL";
sinkClient.AdvancedSecurityOptions.SslConfigs = SslConfigsBuilder.Create()
.WithSslCaLocation("/path/to/ca.crt")
.WithSslCertificateLocation("/path/to/client.crt")
.WithSslKeyLocation("/path/to/client.key");
sinkClient.ConnectAndStart();
The same pattern applies to KNetOPCSourceClient.AdvancedSecurityOptions.
Note
KNetOPCSinkClient.RequestTopic and KNetOPCSinkClient.ResponseTopic must match exactly the values configured in the Sink connector (topics and knet.opc.sink.response.topic). A mismatch results in requests being silently undelivered or responses being missed.
Note
KNetOPCSourceClient.Topic must match the topic configured in the Source connector's OPC-UA configuration file (Subscriptions[].DefaultConnectParameters.Topic). A mismatch results in no notifications being received.