Table of Contents

KNet for OPC (KNetOPC): Client library

Overview

The NuGet package MASES.KNet.OPC.Client provides two complementary .NET client classes that together cover the full OPC-UA interaction model over Apache Kafka™:

Class Role Complements
KNetOPCSinkClient Sends OPC-UA request/response operations (Read, Write, Call, Browse, …) through the Sink connector. Implements ISessionClient. Source connector
KNetOPCSourceClient Subscribes to OPC-UA change notifications published by the Source connector and delivers MonitoredItemNotification records to the application. Sink connector

The two clients are designed to be used together: the Source connector + KNetOPCSourceClient handle the continuous data flow from OPC-UA nodes to the application, while the Sink connector + KNetOPCSinkClient handle on-demand interactions with the OPC-UA server. An application that needs both read-only monitoring and active control can instantiate both clients against the same Kafka cluster.


Installation

dotnet add package MASES.KNet.OPC.Client

JVM initialization — KNetOPCClientEngine

KNetOPCClientEngine is the class responsible for initializing the JVM environment used by both KNetOPCSinkClient and KNetOPCSourceClient. It extends KNetCore<KNetOPCClientEngine> and is started automatically the first time a client is instantiated.

public class KNetOPCClientEngine : KNetCore<KNetOPCClientEngine>
{
}

In most cases no explicit interaction is required. However, because KNetCore<T> exposes static properties for fine-grained JVM configuration, KNetOPCClientEngine can be configured before the first client is created to control JVM startup options — for example heap size, additional JVM arguments, or the JVM library path:

// Optional: configure JVM before the first client is instantiated
KNetOPCClientEngine.ApplicationHeapSize = "512M";
KNetOPCClientEngine.ApplicationInitialHeapSize = "256M";
KNetOPCClientEngine.ApplicationEnableDebug = true;

// Now instantiate the client — KNetOPCClientEngine starts automatically
var sinkClient = new KNetOPCSinkClient("localhost:9092", "topic-request", "topic-response");
sinkClient.ConnectAndStart();

If you need to start the engine explicitly before the first client (e.g. to verify JVM availability at startup), call:

KNetOPCClientEngine.CreateGlobalInstance();

CreateGlobalInstance() is idempotent — subsequent calls are no-ops. Both KNetOPCSinkClient and KNetOPCSourceClient call it internally if no global instance is present, so explicit invocation is optional.

Note

All JVM configuration properties (ApplicationHeapSize, ApplicationInitialHeapSize, custom JVM arguments, etc.) must be set before CreateGlobalInstance() is called, either explicitly or implicitly through the first client instantiation. Configuration changes after JVM startup are ignored.


KNetOPCSinkClient

KNetOPCSinkClient implements the OPC Foundation ISessionClient interface with Apache Kafka™ as the transport layer. It routes OPC-UA operations through the Sink connector request/response topics — the Kafka transport is entirely transparent to the caller.

How it works

.NET application
    └─► Calls ISessionClient method (Read, Write, Call, Browse, ...)
            └─► KNetOPCSinkClient serializes the request
                    └─► Publishes record to request topic (topics=)
                            └─► Sink connector executes operation on OPC-UA server
                                    └─► Publishes result to response topic
                                            └─► KNetOPCSinkClient returns result to caller

Supported operations

All operations in the table below are implemented as full request/response over Kafka. Begin*/End* variants and subscription-related operations are not implemented — subscriptions are handled by the Source connector and KNetOPCSourceClient.

OPC-UA operation Status
Read
HistoryRead
Write
HistoryUpdate
Call
Browse
BrowseNext
TranslateBrowsePathsToNodeIds
RegisterNodes
UnregisterNodes
AddNodes
AddReferences
DeleteNodes
DeleteReferences
QueryFirst
QueryNext
Cancel
Begin*/End* variants ➖ Not implemented
Subscribe / MonitoredItems ➖ Use Source connector + KNetOPCSourceClient

Constructor

public KNetOPCSinkClient(
    string bootstrapServers,
    string requestTopic,
    string responseTopic,
    ITelemetryContext telemetryContext = null)
Parameter Description
bootstrapServers Kafka bootstrap servers, e.g. MYCLUSTER:9092.
requestTopic Topic where command records are published. Must match topics in the Sink connector properties.
responseTopic Topic where result records are received. Must match knet.opc.sink.response.topic in the Sink connector properties.
telemetryContext Optional telemetry context for logging and diagnostics.

Properties

Property Type Default Description
BootstrapServers string Bootstrap servers passed to the constructor.
ClusterId string Kafka cluster ID. Available after Connect().
RequestTopic string Request topic passed to the constructor.
ResponseTopic string Response topic passed to the constructor.
EncodingType EncodingType Json Encoding used for message serialization. Can be changed at runtime.
GroupId string auto GUID Kafka consumer group ID for the internal response consumer.
AutoCreateTopic bool false When true, ConnectAndStart() creates RequestTopic and ResponseTopic if absent. Partition count is forced to 1.
ReplicationFactor short 1 Replication factor used when AutoCreateTopic is true.
AdvancedOptions AdvancedOptionsType Producer, consumer, and topic configuration. See KNetOPCSinkClient advanced options.
AdvancedSecurityOptions AdvancedSecurityOptionsType SSL/TLS and SASL configuration. See Security options.

Lifecycle methods

Method Description
Connect(string bootstrapControllers = null) Connects to the Kafka cluster. bootstrapControllers supersedes BootstrapServers for the initial admin connection if provided.
CreateTopics() Explicitly creates RequestTopic and ResponseTopic. Partition count forced to 1.
DeleteTopics() Deletes RequestTopic and ResponseTopic. Pending unprocessed requests are discarded.
ConnectAndStart() Main startup method: connects, optionally creates topics (AutoCreateTopic), creates internal producer and consumer, and records the current head offset of ResponseTopic to skip stale responses from previous sessions. Call once before issuing any operations.

Usage example

using MASES.KNet.OPC.Client;

var sinkClient = new KNetOPCSinkClient(
    bootstrapServers: "localhost:9092",
    requestTopic:    "topic-request",   // must match topics= in Sink connector
    responseTopic:   "topic-response"); // must match knet.opc.sink.response.topic

sinkClient.ConnectAndStart();

// Use as ISessionClient — compatible with any OPC-UA extension that accepts ISessionClient
ISessionClient session = sinkClient;

// Read
var readResponse = await session.ReadAsync(new ReadRequest
{
    NodesToRead = [ new ReadValueId { NodeId = NodeId.Parse("ns=2;s=Realtimedata"), AttributeId = Attributes.Value } ]
});

// Write
await session.WriteAsync(new WriteRequest
{
    NodesToWrite = [ new WriteValue { NodeId = NodeId.Parse("ns=2;s=SetPoint"), AttributeId = Attributes.Value, Value = new DataValue(new Variant(42.0)) } ]
});

// Call
var callResponse = await session.CallAsync(new CallRequest
{
    MethodsToCall = [ new CallMethodRequest { ObjectId = NodeId.Parse("ns=2;s=MyObject"), MethodId = NodeId.Parse("ns=2;s=MyMethod") } ]
});

// Browse
var browseResponse = await session.BrowseAsync(new BrowseRequest
{
    NodesToBrowse = [ new BrowseDescription { NodeId = ObjectIds.ObjectsFolder, BrowseDirection = BrowseDirection.Forward, ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences, IncludeSubtypes = true, ResultMask = (uint)BrowseResultMask.All } ]
});

OPC Foundation SDK extension methods

Because KNetOPCSinkClient implements ISessionClient, any OPC Foundation SDK extension method or helper that operates on ISessionClient (or on the broader ISession) works transparently against the Kafka transport. The following examples show representative SDK helpers from the Opc.Ua.Client namespace:

using Opc.Ua;
using Opc.Ua.Client;

ISessionClient session = sinkClient;

// Read a single node value using the SDK helper (builds the ReadRequest internally)
DataValue value = await session.ReadValueAsync(
    NodeId.Parse("ns=2;s=Realtimedata"),
    cancellationToken);

// Write a single value using the SDK helper
await session.WriteValueAsync(
    NodeId.Parse("ns=2;s=SetPoint"),
    new DataValue(new Variant(42.0)),
    cancellationToken);

// Translate a browse path to a NodeId
BrowsePathResult[] results = await session.TranslateBrowsePathAsync(
    ObjectIds.ObjectsFolder,
    new RelativePath("MyObject/MyVariable"),
    cancellationToken);

// Recursive browse using the SDK utility
ReferenceDescriptionCollection references = await session.FetchReferencesAsync(
    ObjectIds.ObjectsFolder,
    cancellationToken);

Any third-party OPC-UA companion specification library or aggregation layer that accepts ISessionClient can be pointed at a KNetOPCSinkClient instance and will use Kafka as its transport without any code change:

// Example: third-party companion spec client that accepts ISessionClient
ISessionClient session = sinkClient;
var companionClient = new MyCompanionSpecClient(session);
await companionClient.ReadDataAsync(cancellationToken);
Note

SDK extension methods that internally call Begin*/End* variants or subscription-related APIs (CreateMonitoredItems, CreateSubscription, etc.) are not supported. For subscription-based data access, use the Source connector and KNetOPCSourceClient.

KNetOPCSinkClient advanced options

AdvancedOptions (type AdvancedOptionsType) provides fine-grained control over the internal Kafka producer, consumer, and topic configuration. Access it before calling ConnectAndStart().

Property Type Default Description
RequestTopicConfig TopicConfigBuilder null Extended topic configuration for RequestTopic. When null, cluster defaults apply.
ResponseTopicConfig TopicConfigBuilder null Extended topic configuration for ResponseTopic. When null, cluster defaults apply.
ProducerConfig ProducerConfigBuilder Extended producer configuration. KeySerializerClass and ValueSerializerClass are set internally — do not override.
ConsumerConfig ConsumerConfigBuilder Extended consumer configuration. KeyDeserializerClass, ValueDeserializerClass, and AllowAutoCreateTopics are set internally — do not override.

KNetOPCSourceClient

KNetOPCSourceClient subscribes to a Kafka topic published by the Source connector and delivers MonitoredItemNotification records to the application as they arrive. It implements IDisposable.

How it works

OPC-UA Server
    └─► Source connector publishes change notifications to Kafka topic
            └─► KNetOPCSourceClient consumes records
                    └─► Delivers MonitoredItemNotification to application callback

Constructor

public KNetOPCSourceClient(
    string bootstrapServers,
    string topic,
    ITelemetryContext telemetryContext = null)
Parameter Description
bootstrapServers Kafka bootstrap servers, e.g. MYCLUSTER:9092.
topic The Kafka topic published by the Source connector. Must match the topic configured in the Source connector's OPC-UA configuration file.
telemetryContext Optional telemetry context for logging and diagnostics.

Properties

Property Type Default Description
BootstrapServers string Bootstrap servers passed to the constructor.
ClusterId string Kafka cluster ID. Available after connection.
Topic string The topic passed to the constructor.
GroupId string auto GUID Kafka consumer group ID. Override before starting to enable multiple consumers in the same group.
AdvancedOptions AdvancedOptionsType Consumer configuration. See KNetOPCSourceClient advanced options.
AdvancedSecurityOptions AdvancedSecurityOptionsType SSL/TLS and SASL configuration. See Security options.

Usage example

using MASES.KNet.OPC.Client;

using var sourceClient = new KNetOPCSourceClient(
    bootstrapServers: "localhost:9092",
    topic:           "myTopic"); // must match the topic in the Source connector OPC-UA config

sourceClient.OnNotification += (nodeId, notification) =>
{
    Console.WriteLine($"{nodeId}: {notification.Value} @ {notification.SourceTimestamp}");
};

await sourceClient.ConnectAndStartAsync(cancellationToken);

KNetOPCSourceClient advanced options

AdvancedOptions (type AdvancedOptionsType) provides control over the internal Kafka consumer. Access it before starting the client.

Property Type Default Description
ConsumerConfig ConsumerConfigBuilder Extended consumer configuration. KeyDeserializerClass, ValueDeserializerClass, and AllowAutoCreateTopics are set internally — do not override.

Security options

Both KNetOPCSinkClient and KNetOPCSourceClient expose AdvancedSecurityOptions (type AdvancedSecurityOptionsType) for SSL/TLS and SASL configuration. When set, the security settings are applied uniformly to the admin client, producer (Sink only), and consumer.

Property Type Default Description
SecurityProtocol string null Security protocol for the Kafka connection (e.g. SSL, SASL_SSL, SASL_PLAINTEXT). When null, plaintext is used.
SslConfigs SslConfigsBuilder null SSL/TLS configuration. Applied when SecurityProtocol is set.
SaslConfigs SaslConfigsBuilder null SASL configuration. Applied when SecurityProtocol is set.

Example — TLS connection

sinkClient.AdvancedSecurityOptions.SecurityProtocol = "SSL";
sinkClient.AdvancedSecurityOptions.SslConfigs = SslConfigsBuilder.Create()
    .WithSslCaLocation("/path/to/ca.crt")
    .WithSslCertificateLocation("/path/to/client.crt")
    .WithSslKeyLocation("/path/to/client.key");

sinkClient.ConnectAndStart();

The same pattern applies to KNetOPCSourceClient.AdvancedSecurityOptions.


Note

KNetOPCSinkClient.RequestTopic and KNetOPCSinkClient.ResponseTopic must match exactly the values configured in the Sink connector (topics and knet.opc.sink.response.topic). A mismatch results in requests being silently undelivered or responses being missed.

Note

KNetOPCSourceClient.Topic must match the topic configured in the Source connector's OPC-UA configuration file (Subscriptions[].DefaultConnectParameters.Topic). A mismatch results in no notifications being received.