KNet for OPC (KNetOPC): currently available KEFCore transformations
This page describes each KEFCore transformation available in KNetOPC, the entity model produced by each transformation.type, and the full configuration reference. For a general introduction to KEFCore transformations, the pipeline, and the common configuration properties, see Introduction to KEFCore.
Summary
| Transformation class | Serialization format | Description |
|---|---|---|
| KEFCoreTransformationJson | JSON | Converts records using the KEFCore JSON SerDes (DefaultKEFCoreSerDes). |
| KEFCoreTransformationAvro | Avro | Converts records using the KEFCore Avro SerDes (AvroKEFCoreSerDes). |
| KEFCoreTransformationProtobuf | Protobuf | Converts records using the KEFCore Protobuf SerDes (ProtobufKEFCoreSerDes). |
Available transformation.type values:
| transformation.type | Description |
|---|---|
| MonitoredItemNotificationTransformation | Maps OPC-UA MonitoredItemNotification records to the MonitoredItemNotificationEntity EF Core entity. |
KEFCoreTransformationJson
Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationJson
Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the JSON format (DefaultKEFCoreSerDes). The consuming .NET DbContext must be configured with DefaultKEFCoreSerDes on both key and value.
Configuration properties and example
This transformation has no additional configuration properties beyond the common base properties.
Example
transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationJson
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation
KEFCoreTransformationAvro
Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationAvro
Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the Avro binary format (AvroKEFCoreSerDes). The consuming .NET DbContext must be configured with AvroKEFCoreSerDes on both key and value.
Configuration properties and example
This transformation has no additional configuration properties beyond the common base properties.
Example
transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationAvro
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation
KEFCoreTransformationProtobuf
Java class: org.mases.kefcoreopc.transformations.KEFCoreTransformationProtobuf
Converts each OPC-UA change notification record to a KEFCore-compatible record serialized using the Protobuf format (ProtobufKEFCoreSerDes). The consuming .NET DbContext must be configured with ProtobufKEFCoreSerDes on both key and value.
Configuration properties and example
This transformation has no additional configuration properties beyond the common base properties.
Example
transforms=ToKEFCore
transforms.ToKEFCore.type=org.mases.kefcoreopc.transformations.KEFCoreTransformationProtobuf
transforms.ToKEFCore.transformation.type=MonitoredItemNotificationTransformation
MonitoredItemNotificationTransformation
transformation.type value: MonitoredItemNotificationTransformation
Converts each OPC-UA MonitoredItemNotification record received from the KNet for OPC Source connector into an instance of the MonitoredItemNotificationEntity EF Core entity. This is the default and currently the only available transformation.type.
The MonitoredItemNotificationEntity model
MonitoredItemNotificationEntity is the EF Core entity produced by this transformation. It is the target type registered in the KEFCore DbContext and represents a single OPC-UA monitored node at a given point in time.
Primary key
| Property | .NET type | Description |
|---|---|---|
| NodeIdString | string | String representation of the OPC-UA NodeId of the monitored item (e.g. ns=2;s=Realtimedata). Used as the Kafka record key and as the EF Core primary key. |
Complex type: MonitoredItemNotificationBag
The MonitoredItemNotification property of the entity is of complex type MonitoredItemNotificationBag. It contains all the fields extracted from the OPC-UA MonitoredItemNotification, including metadata and the typed scalar value.
Metadata fields
| Property | .NET type | Description |
|---|---|---|
| ClientHandle | uint | The client-assigned handle of the monitored item, as defined in the OPC-UA subscription. |
| StatusCode | uint | The OPC-UA StatusCode of the data value (raw code). Use StatusCode.IsGood() / IsBad() to evaluate quality. |
| SourceTimestamp | DateTime | The timestamp assigned by the OPC-UA data source at the time the value changed. |
| ServerTimestamp | DateTime | The timestamp assigned by the OPC-UA server at the time it processed the change notification. |
| IsNull | bool | true if the OPC-UA value is null (i.e. no value was present in the notification). |
| BuiltInType | BuiltInType | The OPC-UA BuiltInType of the value (e.g. Boolean, Int32, Double, String). Use this to determine which typed value property to read. |
| ValueRank | int | The value rank of the data value (see OPC-UA specification). ValueRanks.Scalar (-1) indicates a scalar value. |
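Because StatusCode is stored as a raw uint, it can help to see how quality is derived from the code itself. The sketch below classifies a code by its two most significant bits, which the OPC-UA specification reserves for severity (00 = Good, 01 = Uncertain, 10 = Bad). StatusCodeQuality is a hypothetical helper written for illustration and is not part of KNetOPC; where the OPC-UA .NET StatusCode type is available, its own methods are likely the better choice.

```csharp
// Hypothetical helper (not part of KNetOPC): classifies a raw OPC-UA StatusCode
// by its severity bits, i.e. the two most significant bits of the 32-bit code.
public static class StatusCodeQuality
{
    private const uint SeverityMask = 0xC0000000u;      // bits 30-31
    private const uint SeverityUncertain = 0x40000000u; // 01
    private const uint SeverityBad = 0x80000000u;       // 10

    // Good: both severity bits are clear.
    public static bool IsGood(uint statusCode) =>
        (statusCode & SeverityMask) == 0u;

    // Uncertain: severity bits are exactly 01.
    public static bool IsUncertain(uint statusCode) =>
        (statusCode & SeverityMask) == SeverityUncertain;

    // Bad: the most significant bit is set.
    public static bool IsBad(uint statusCode) =>
        (statusCode & SeverityBad) != 0u;
}
```

With this helper, a check such as StatusCodeQuality.IsGood(entity.MonitoredItemNotification.StatusCode) could gate the use of a value before it is displayed or stored.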
Typed value fields
Only the field corresponding to the BuiltInType of the current notification is populated; all other fields are null. Array and non-scalar values are not mapped to individual typed fields.
| Property | .NET type | OPC-UA BuiltInType |
|---|---|---|
| Bool | bool? | Boolean |
| Byte | byte? | Byte |
| SByte | sbyte? | SByte |
| Short | short? | Int16 |
| UShort | ushort? | UInt16 |
| Int | int? | Int32 |
| UInt | uint? | UInt32 |
| Long | long? | Int64 |
| ULong | ulong? | UInt64 |
| Float | float? | Float |
| Double | double? | Double |
| String | string | String |
| Timestamp | DateTime | DateTime |
| Guid | Guid? | Guid |
| ByteString | byte[] | ByteString |
Note
When IsNull is true, all typed value fields are null or default. Always check IsNull before reading any typed value field.
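The rules above (check IsNull first, then use BuiltInType to pick the populated field) can be sketched as a small reader. To keep the example self-contained, it uses stand-in types: NotificationBagSketch mirrors only a subset of the documented MonitoredItemNotificationBag properties, the BuiltInType enum is a reduced copy with the OPC-UA built-in type ids, and TypedValueReader is a hypothetical helper. None of these names are part of KNetOPC.

```csharp
#nullable enable

// Stand-in for the OPC-UA BuiltInType enum (subset; values follow the OPC-UA built-in type ids).
public enum BuiltInType { Null = 0, Boolean = 1, Int32 = 6, Double = 11, String = 12 }

// Hypothetical stand-in mirroring a subset of the documented bag properties.
public class NotificationBagSketch
{
    public bool IsNull { get; set; }
    public BuiltInType BuiltInType { get; set; }
    public int ValueRank { get; set; } = -1; // -1 = scalar
    public bool? Bool { get; set; }
    public int? Int { get; set; }
    public double? Double { get; set; }
    public string? String { get; set; }
}

public static class TypedValueReader
{
    // Returns the populated typed value boxed as object,
    // or null when there is no scalar value to read.
    public static object? Read(NotificationBagSketch bag)
    {
        // Null notifications and non-scalar values carry no typed field.
        if (bag.IsNull || bag.ValueRank != -1) return null;

        // Only the field matching BuiltInType is expected to be populated.
        return bag.BuiltInType switch
        {
            BuiltInType.Boolean => (object?)bag.Bool,
            BuiltInType.Int32 => (object?)bag.Int,
            BuiltInType.Double => (object?)bag.Double,
            BuiltInType.String => (object?)bag.String,
            _ => null // types not covered by this reduced sketch
        };
    }
}
```

A reader for the real entity would extend the switch to the remaining rows of the typed value table; the reduced set here only keeps the sketch short.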
Consuming the entity with KEFCore
The following example shows a minimal DbContext configuration to read MonitoredItemNotificationEntity records from a Kafka topic serialized with the Avro format, i.e. records produced by KEFCoreTransformationAvro.
The entity class MonitoredItemNotificationEntity is defined in an external assembly, so the context uses ToKEFCoreTopic to pin a stable topic name and HasKEFCoreSerDes to declare the Avro SerDes. Both are resolved by KEFCore conventions at model build time:
using MASES.EntityFrameworkCore.KNet.Metadata;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
using MASES.KNet.OPC.KEFCore.Model;
using Microsoft.EntityFrameworkCore;
public class OpcDbContext : KEFCoreDbContext
{
    public DbSet<MonitoredItemNotificationEntity> MonitoredItems { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Pin the topic name and declare the Avro SerDes on the entity
        modelBuilder.Entity<MonitoredItemNotificationEntity>().ToKEFCoreTopic("opc.data"); // must match the transformation target topic
        modelBuilder.Entity<MonitoredItemNotificationEntity>()
            .HasKEFCoreSerDes( // must match the transformation target serialization format
                keySerDesSelectorType: typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>),
                valueSerDesSelectorType: typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>),
                valueContainerType: typeof(AvroValueContainer<>));
        base.OnModelCreating(modelBuilder);
    }
}
Before using the context, initialize the KEFCore global instance and create the context with the required connection properties:
// Initialize KEFCore once at application startup
KEFCore.CreateGlobalInstance();
using var ctx = new OpcDbContext()
{
    BootstrapServers = "localhost:9092",
    ApplicationId = "my-opc-reader",
};
ctx.Database.EnsureCreated();
Once the context is ready, standard LINQ queries can be used:
// Get the latest value of a specific node
var entity = ctx.MonitoredItems
    .FirstOrDefault(e => e.NodeIdString == "ns=2;s=Realtimedata");
if (entity != null && !entity.MonitoredItemNotification.IsNull)
{
    // Double is populated only when BuiltInType is Double; this node is assumed to carry a Double value
    Console.WriteLine($"Value: {entity.MonitoredItemNotification.Double}");
    Console.WriteLine($"Quality: {entity.MonitoredItemNotification.StatusCode}");
    Console.WriteLine($"Timestamp: {entity.MonitoredItemNotification.SourceTimestamp}");
}