KNet for OPC (KNetOPC): currently available KEFCore transformations

This page describes each KEFCore transformation available in KNetOPC, the entity model produced by each transformation.type, and the full configuration reference. For a general introduction to KEFCore transformations, the pipeline, and the common configuration properties, see Introduction to KEFCore.

Summary

| Transformation class | Serialization format | Description |
|---|---|---|
| KEFCoreTransformationJson | JSON | Converts records using the KEFCore JSON SerDes (DefaultKEFCoreSerDes). |
| KEFCoreTransformationAvro | Avro | Converts records using the KEFCore Avro SerDes (AvroKEFCoreSerDes). |
| KEFCoreTransformationProtobuf | Protobuf | Converts records using the KEFCore Protobuf SerDes (ProtobufKEFCoreSerDes). |

Available transformation.type values:

| transformation.type | Description |
|---|---|
| MonitoredItemNotificationTransformation | Maps OPC-UA MonitoredItemNotification records to the MonitoredItemNotificationEntity EF Core entity. |

KEFCoreTransformationJson

Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationJson

Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the JSON format (DefaultKEFCoreSerDes). The consuming .NET DbContext must be configured with DefaultKEFCoreSerDes on both key and value.

Configuration properties and example

This transformation has no additional configuration properties beyond the common base properties.

Example

transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationJson
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation

KEFCoreTransformationAvro

Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationAvro

Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the Avro binary format (AvroKEFCoreSerDes). The consuming .NET DbContext must be configured with AvroKEFCoreSerDes on both key and value.

Configuration properties and example

This transformation has no additional configuration properties beyond the common base properties.

Example

transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationAvro
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation

KEFCoreTransformationProtobuf

Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationProtobuf

Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the Protobuf format (ProtobufKEFCoreSerDes). The consuming .NET DbContext must be configured with ProtobufKEFCoreSerDes on both key and value.

Configuration properties and example

This transformation has no additional configuration properties beyond the common base properties.

Example

transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationProtobuf
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation

MonitoredItemNotificationTransformation

transformation.type value: MonitoredItemNotificationTransformation

Converts each OPC-UA MonitoredItemNotification record received from the KNet for OPC Source connector into an instance of the MonitoredItemNotificationEntity EF Core entity. This is the default and currently the only available transformation.type.

The MonitoredItemNotificationEntity model

MonitoredItemNotificationEntity is the EF Core entity produced by this transformation. It is the target type registered in the KEFCore DbContext and represents a single OPC-UA monitored node at a given point in time.

Primary key

| Property | .NET type | Description |
|---|---|---|
| NodeIdString | string | String representation of the OPC-UA NodeId of the monitored item (e.g. ns=2;s=Realtimedata). Used as the Kafka record key and as the EF Core primary key. |
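
To make the key format more concrete, here is a minimal sketch that splits a NodeIdString such as ns=2;s=Realtimedata into its parts. It assumes the conventional OPC-UA textual form (an optional ns=<index>; prefix followed by i=, s=, g= or b= and the identifier). NodeIdParts and NodeIdStringSketch are hypothetical names introduced only for this illustration and are not part of KNetOPC or KEFCore; other textual forms (for example an nsu= prefix) are not handled.

```csharp
#nullable enable
using System;
using System.Globalization;

// Hypothetical helper type for illustration only; not part of KNetOPC or KEFCore.
public readonly record struct NodeIdParts(ushort NamespaceIndex, char IdentifierType, string Identifier);

public static class NodeIdStringSketch
{
    // Splits "ns=<index>;<type>=<identifier>" into its parts.
    // A missing "ns=" prefix is treated as namespace index 0.
    public static bool TryParse(string text, out NodeIdParts parts)
    {
        parts = default;
        if (string.IsNullOrEmpty(text)) return false;

        ushort ns = 0;
        string rest = text;
        if (text.StartsWith("ns=", StringComparison.Ordinal))
        {
            int sep = text.IndexOf(';');
            if (sep < 0) return false;
            string index = text.Substring(3, sep - 3);
            if (!ushort.TryParse(index, NumberStyles.None, CultureInfo.InvariantCulture, out ns))
                return false;
            rest = text.Substring(sep + 1);
        }

        // Identifier types: i = numeric, s = string, g = GUID, b = opaque (byte string).
        if (rest.Length < 3 || rest[1] != '=' || "isgb".IndexOf(rest[0]) < 0) return false;

        parts = new NodeIdParts(ns, rest[0], rest.Substring(2));
        return true;
    }
}
```

Calling NodeIdStringSketch.TryParse("ns=2;s=Realtimedata", out var parts) yields namespace index 2, identifier type s and identifier Realtimedata. In an application that already references the OPC-UA .NET stack, that stack's own NodeId parsing is likely the better choice.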

Complex type: MonitoredItemNotificationBag

The MonitoredItemNotification property of the entity is of complex type MonitoredItemNotificationBag. It contains all the fields extracted from the OPC-UA MonitoredItemNotification, including metadata and the typed scalar value.

Metadata fields

| Property | .NET type | Description |
|---|---|---|
| ClientHandle | uint | The client-assigned handle of the monitored item, as defined in the OPC-UA subscription. |
| StatusCode | uint | The OPC-UA StatusCode of the data value (raw code). Use StatusCode.IsGood() / IsBad() to evaluate quality. |
| SourceTimestamp | DateTime | The timestamp assigned by the OPC-UA data source at the time the value changed. |
| ServerTimestamp | DateTime | The timestamp assigned by the OPC-UA server at the time it processed the change notification. |
| IsNull | bool | true if the OPC-UA value is null (i.e. no value was present in the notification). |
| BuiltInType | BuiltInType | The OPC-UA BuiltInType of the value (e.g. Boolean, Int32, Double, String). Use this to determine which typed value property to read. |
| ValueRank | int | The value rank of the data value (see OPC-UA specification). ValueRanks.Scalar (-1) indicates a scalar value. |

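
Because StatusCode is exposed as a raw uint, it can also be classified without the OPC-UA stack. The following is a minimal sketch, assuming the standard OPC-UA encoding in which the two most significant bits carry the severity (00 = Good, 01 = Uncertain, 10 = Bad). OpcQualitySketch and StatusCodeSketch are hypothetical names used only for this illustration.

```csharp
// Hypothetical helper types for illustration only; not part of KNetOPC or KEFCore.
public enum OpcQualitySketch { Good, Uncertain, Bad }

public static class StatusCodeSketch
{
    // The two most significant bits of an OPC-UA status code carry the severity.
    private const uint SeverityMask = 0xC0000000u;

    public static OpcQualitySketch Classify(uint statusCode)
    {
        switch (statusCode & SeverityMask)
        {
            case 0x00000000u: return OpcQualitySketch.Good;
            case 0x40000000u: return OpcQualitySketch.Uncertain;
            default: return OpcQualitySketch.Bad; // 10 = Bad; 11 is reserved and treated as Bad here
        }
    }
}
```

For example, StatusCodeSketch.Classify(entity.MonitoredItemNotification.StatusCode) could be used to skip values that are not Good before reading a typed value field.
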
Typed value fields

Only the field corresponding to the BuiltInType of the current notification is populated; all other fields are null. Array and non-scalar values are not mapped to individual typed fields.

| Property | .NET type | OPC-UA BuiltInType |
|---|---|---|
| Bool | bool? | Boolean |
| Byte | byte? | Byte |
| SByte | sbyte? | SByte |
| Short | short? | Int16 |
| UShort | ushort? | UInt16 |
| Int | int? | Int32 |
| UInt | uint? | UInt32 |
| Long | long? | Int64 |
| ULong | ulong? | UInt64 |
| Float | float? | Float |
| Double | double? | Double |
| String | string | String |
| Timestamp | DateTime | DateTime |
| Guid | Guid? | Guid |
| ByteString | byte[] | ByteString |

Note

When IsNull is true, all typed value fields are null or default. Always check IsNull before reading any typed value field.
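
The read order described above (IsNull first, then ValueRank, then BuiltInType) can be sketched as follows. To keep the example self-contained it uses stand-in types: BuiltInTypeSketch mirrors a few members of the OPC-UA BuiltInType enumeration and NotificationBagSketch mirrors a subset of the documented MonitoredItemNotificationBag properties. Both stand-ins, as well as TypedValueReader, are hypothetical and exist only for this illustration; real code would operate on the types shipped with KNetOPC.

```csharp
#nullable enable

// Stand-in for a few members of the OPC-UA BuiltInType enumeration (illustration only).
public enum BuiltInTypeSketch { Null, Boolean, Int32, Double, String }

// Stand-in mirroring a subset of the documented MonitoredItemNotificationBag properties.
public sealed class NotificationBagSketch
{
    public bool IsNull { get; init; }
    public BuiltInTypeSketch BuiltInType { get; init; }
    public int ValueRank { get; init; } = -1; // -1 corresponds to ValueRanks.Scalar
    public bool? Bool { get; init; }
    public int? Int { get; init; }
    public double? Double { get; init; }
    public string? String { get; init; }
}

public static class TypedValueReader
{
    // Returns the populated scalar value boxed as object,
    // or null when the notification carries no readable scalar.
    public static object? Read(NotificationBagSketch bag)
    {
        if (bag.IsNull) return null;          // always check IsNull first
        if (bag.ValueRank != -1) return null; // arrays are not mapped to typed fields

        switch (bag.BuiltInType)
        {
            case BuiltInTypeSketch.Boolean: return bag.Bool;
            case BuiltInTypeSketch.Int32: return bag.Int;
            case BuiltInTypeSketch.Double: return bag.Double;
            case BuiltInTypeSketch.String: return bag.String;
            default: return null;             // type not covered by this sketch
        }
    }
}
```

The same switch extends naturally to the remaining rows of the table; returning object keeps the sketch short, while a real application may prefer a typed accessor per BuiltInType.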

Consuming the entity with KEFCore

The following example shows a minimal DbContext configuration to read MonitoredItemNotificationEntity records from a Kafka topic serialized with the Avro format (that is, records produced by KEFCoreTransformationAvro).

The entity class MonitoredItemNotificationEntity is defined in an external assembly, so the model uses ToKEFCoreTopic to pin a stable topic name and HasKEFCoreSerDes to declare the Avro SerDes; both are resolved by KEFCore conventions at model build time:

using MASES.EntityFrameworkCore.KNet.Metadata;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
using MASES.KNet.OPC.KEFCore.Model;
using Microsoft.EntityFrameworkCore;

public class OpcDbContext : KEFCoreDbContext
{
    public DbSet<MonitoredItemNotificationEntity> MonitoredItems { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Pin the topic name and declare the Avro SerDes on the entity
        modelBuilder.Entity<MonitoredItemNotificationEntity>().ToKEFCoreTopic("opc.data"); // must match the transformation target topic
        modelBuilder.Entity<MonitoredItemNotificationEntity>()
                    .HasKEFCoreSerDes( // must match the transformation target serialization format
                        keySerDesSelectorType: typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>),
                        valueSerDesSelectorType: typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>),
                        valueContainerType: typeof(AvroValueContainer<>));

        base.OnModelCreating(modelBuilder);
    }
}

Before using the context, initialize the KEFCore global instance and create the context with the required connection properties:

// Initialize KEFCore once at application startup
KEFCore.CreateGlobalInstance();

using var ctx = new OpcDbContext()
{
    BootstrapServers = "localhost:9092",
    ApplicationId = "my-opc-reader",
};

ctx.Database.EnsureCreated();

Once the context is ready, standard LINQ queries can be used:

// Get the latest value of a specific node
var entity = ctx.MonitoredItems
                .FirstOrDefault(e => e.NodeIdString == "ns=2;s=Realtimedata");

if (entity != null && !entity.MonitoredItemNotification.IsNull)
{
    // This node is assumed to carry a Double; check BuiltInType when the type is not known in advance
    Console.WriteLine($"Value: {entity.MonitoredItemNotification.Double}");
    Console.WriteLine($"Quality: {entity.MonitoredItemNotification.StatusCode}");
    Console.WriteLine($"Timestamp: {entity.MonitoredItemNotification.SourceTimestamp}");
}