KNet for OPC (KNetOPC): Introduction to KEFCore

Overview

KEFCore is the Entity Framework Core provider for Apache Kafka™. It allows .NET applications to interact with data stored in Apache Kafka™ topics using the standard EF Core programming model: DbContext, LINQ queries, and strongly-typed .NET entities.

KNetOPC integrates KEFCore through a dedicated set of Kafka Connect transformations. These transformations convert OPC-UA change notification records, produced by the KNet for OPC Source connector, into a format that KEFCore can directly consume. A .NET application can then query the resulting Kafka topics as if they were a standard EF Core data source, without writing any low-level Kafka consumer code.

Role of transformations in the pipeline

Transformations in Kafka Connect are processing steps applied to each record after predicates and before the record is written to the target Kafka topic. In the KNetOPC pipeline:

OPC-UA Server
    └─► KNet Connect SDK receives change notification
            └─► Predicates evaluated (record filtered if false)
                    └─► Transformations applied
                            └─► KEFCore-compatible record written to Apache Kafka™ topic
                                    └─► .NET application queries data via KEFCore / EF Core

Without a KEFCore transformation, the OPC-UA notification is published using the raw OPC-UA encoding (Binary, Xml, or Json) configured in the connector. With a KEFCore transformation, the record is re-serialized using the KEFCore wire format, and the record key and value schemas are replaced with the ones expected by the KEFCore provider. A .NET application can then open a DbContext backed by the same topic and query the data directly.
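
The ordering shown in the pipeline above (predicate first, transformation second, write last) can be illustrated with a minimal Java sketch. It is not the KNetOPC or Kafka Connect implementation: the class PipelineSketch and its process method are hypothetical names, and a generic type stands in for the real record type.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration of the ordering only; not a KNetOPC or Kafka Connect class.
final class PipelineSketch {

    // Evaluates the predicate first; the transformation runs only on records that pass.
    // An empty result means the record was filtered and nothing is written to the topic.
    static <T> Optional<T> process(T record,
                                   Predicate<T> predicate,
                                   Function<T, T> transformation) {
        if (!predicate.test(record)) {
            return Optional.empty();
        }
        return Optional.of(transformation.apply(record));
    }
}
```

In this sketch a record rejected by the predicate never reaches the transformation, which matches the "record filtered if false" step in the diagram.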

Architecture

KEFCore transformations follow the same two-layer architecture used by predicates:

  • Java layer: activates the transformation within the Apache Kafka™ Connect runtime, exposes the configuration properties, and handles topic routing logic (prefix, explicit name, collapse to partition 0). The Java layer can apply topic routing entirely on its own when avoid.dotnet.invocation is set to true.
  • .NET layer: contains the serialization and model conversion logic. The Java layer delegates record conversion to the corresponding .NET class in the assembly MASES.KNet.OPC.KEFCore.Transformations via KNet Connect SDK.

Serialization formats

Three serialization formats are available, each corresponding to a distinct transformation class. The choice of format determines how the key and value are encoded in the Kafka record and which KEFCore SerDes must be configured in the consuming .NET application.

| Format   | Java class suffix             | KEFCore SerDes family |
|----------|-------------------------------|-----------------------|
| JSON     | KEFCoreTransformationJson     | DefaultKEFCoreSerDes  |
| Avro     | KEFCoreTransformationAvro     | AvroKEFCoreSerDes     |
| Protobuf | KEFCoreTransformationProtobuf | ProtobufKEFCoreSerDes |

The format must match the SerDes configured in the DbContext of the consuming application. Mixing formats between the producer side (transformation) and the consumer side (KEFCore DbContext) will cause deserialization errors at runtime.
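
As an illustration, the pairing in the table above could be encoded as a lookup and checked before deployment. This is only a sketch: FormatMatchSketch and isCompatible are hypothetical names, not part of KNetOPC or KEFCore, and the strings are the class suffixes and SerDes family names listed in the table.

```java
import java.util.Map;

// Hypothetical helper; the pairs are copied from the serialization formats table.
final class FormatMatchSketch {

    // Transformation class suffix -> SerDes family expected on the consumer side.
    static final Map<String, String> SERDES_BY_TRANSFORMATION = Map.of(
            "KEFCoreTransformationJson", "DefaultKEFCoreSerDes",
            "KEFCoreTransformationAvro", "AvroKEFCoreSerDes",
            "KEFCoreTransformationProtobuf", "ProtobufKEFCoreSerDes");

    // Returns true only when the consumer SerDes family matches the producer-side format.
    static boolean isCompatible(String transformationClassSuffix, String consumerSerDesFamily) {
        String expected = SERDES_BY_TRANSFORMATION.get(transformationClassSuffix);
        return expected != null && expected.equals(consumerSerDesFamily);
    }
}
```

A check of this kind only detects a mismatch in configuration names; it does not inspect the records themselves.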

The entity model

KEFCore works with strongly-typed EF Core entities. Each transformation converts an OPC-UA record into an instance of a specific .NET entity type, which becomes the value stored in the Kafka topic. The entity type is selected by the transformation.type configuration property.

The currently available transformation type is MonitoredItemNotificationTransformation, which maps OPC-UA MonitoredItemNotification records to the MonitoredItemNotificationEntity EF Core entity. This entity exposes the NodeId as the primary key and wraps all notification fields (timestamps, status code, and the typed scalar value) in a MonitoredItemNotificationBag complex type. For the full model description, see Available KEFCore transformations.

Configuration

KEFCore transformations are configured in the connector properties file using the standard Kafka Connect transformation syntax. The transforms key lists one or more transformation aliases; each alias is then configured with its own transforms.<alias>.* properties.
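
To make the transforms.<alias>.* convention concrete, the sketch below collects the properties that belong to one alias and strips the prefix, using only java.util.Properties. The class AliasConfigSketch and the method configFor are hypothetical names introduced for illustration; this is not how the Kafka Connect runtime or KNetOPC is actually implemented.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper showing how alias-scoped properties relate to the alias name.
final class AliasConfigSketch {

    // Returns every property under "transforms.<alias>." with that prefix removed.
    static Map<String, String> configFor(Properties connectorProps, String alias) {
        String prefix = "transforms." + alias + ".";
        Map<String, String> result = new TreeMap<>();
        for (String name : connectorProps.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), connectorProps.getProperty(name));
            }
        }
        return result;
    }
}
```

Applied to the example at the end of this page with the alias ToKEFCore, configFor would return the keys type, transformation.type, explicit.destination.topic.name, and collapse.to.one.partition.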

Common configuration properties

All KEFCore transformations share the following configuration properties:

| Property | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| application.name | string | No | "" | Application name used in logging and telemetry. |
| application.version | string | No | "" | Application version used in logging and telemetry. |
| transformation.type | string | No | MonitoredItemNotificationTransformation | Selects the internal conversion logic. See Available KEFCore transformations for the list of supported values. |
| avoid.dotnet.invocation | boolean | No | false | When true, the .NET layer is not invoked and only the Java-side topic routing logic is applied. Use this only when the record is already in the correct KEFCore format and only topic or partition adjustments are needed. |
| collapse.to.one.partition | boolean | No | false | When true, all records are written to partition 0, regardless of the partitioning strategy configured at the broker or connector level. |
| topic.prefix | string | No | "" | When set, the value is prepended to the current topic name using the format [prefix].[currentTopicName]. Applied before explicit.destination.topic.name. |
| replace.topic.name.with.entityname | boolean | No | false | When true, the topic name is replaced with the name of the EF Core entity type produced by the transformation (e.g. MonitoredItemNotificationEntity). Mutually exclusive with explicit.destination.topic.name. |
| explicit.destination.topic.name | string | No | "" | When set, overrides the topic name entirely with the specified value. Takes precedence over topic.prefix and replace.topic.name.with.entityname. |

Topic routing precedence

When multiple topic routing properties are set, the following precedence applies:

  1. topic.prefix is applied first: [prefix].[currentTopicName]
  2. replace.topic.name.with.entityname overrides the result of step 1 with the entity type name
  3. explicit.destination.topic.name is evaluated last and overrides the result of the previous steps entirely

Note

explicit.destination.topic.name and replace.topic.name.with.entityname should not be set simultaneously. If both are set, explicit.destination.topic.name takes precedence as it is evaluated last in the Java layer.
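
The precedence described in this section, including the behavior stated in the note, can be sketched as a small Java function. This is an assumption-based illustration, not the KNetOPC source: TopicRoutingSketch and resolveTopic are hypothetical names, and the sample prefix plant1 used in the usage comment is invented for the example.

```java
// Hypothetical sketch of the documented topic routing precedence.
final class TopicRoutingSketch {

    static String resolveTopic(String currentTopic,
                               String topicPrefix,
                               boolean replaceWithEntityName,
                               String entityName,
                               String explicitDestinationTopicName) {
        String topic = currentTopic;
        // topic.prefix is applied first: [prefix].[currentTopicName]
        if (topicPrefix != null && !topicPrefix.isEmpty()) {
            topic = topicPrefix + "." + topic;
        }
        // replace.topic.name.with.entityname replaces the prefixed name with the entity type name
        if (replaceWithEntityName) {
            topic = entityName;
        }
        // explicit.destination.topic.name is evaluated last, so it wins when set
        if (explicitDestinationTopicName != null && !explicitDestinationTopicName.isEmpty()) {
            topic = explicitDestinationTopicName;
        }
        return topic;
    }
}
```

For instance, resolveTopic("opc", "plant1", false, "MonitoredItemNotificationEntity", "") yields plant1.opc, while passing opc.data as the last argument yields opc.data regardless of the other settings.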

Example

The following example configures the JSON transformation, routes all records to a topic named opc.data, and sends them all to partition 0:

transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationJson
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation
transforms.ToKEFCore.explicit.destination.topic.name=opc.data
transforms.ToKEFCore.collapse.to.one.partition=true