KNet for OPC (KNetOPC): KEFCore and SignalR integration
Overview
The KEFCore integration layer makes OPC-UA change notifications available as a standard EF Core data source backed by Apache Kafka™ topics. Because KEFCore keeps the DbContext continuously updated as new records arrive from the Kafka topic, it is a natural fit for push-based notification: any change tracked by EF Core's ChangeTracker can be forwarded in real time to connected browser clients using ASP.NET Core SignalR.
The result is a complete, middleware-free pipeline from the plant floor to the browser:
OPC-UA Server
└─► KNetOPC Source Connector (Kafka Connect)
└─► KEFCore transformation applied
└─► Apache Kafka™ topic
└─► KEFCore DbContext — ChangeTracker updated via EF Core StateManager
└─► SignalR Hub
└─► Browser / dashboard client
No additional message broker, no polling loop, no custom serialization layer. The same DbContext that a backend service uses for queries is the source of the SignalR push notifications.
Architecture
KEFCore change tracking
KEFCore keeps the DbContext in sync with the backing Kafka topic by routing incoming records through the EF Core StateManager, which updates entity state and raises the standard ChangeTracker events. No KEFCore-specific API is needed to subscribe to updates — ChangeTracker.StateChanged and ChangeTracker.Tracked are the correct integration points.
When a MonitoredItemNotificationEntity is added or updated in the DbContext (because the corresponding Kafka record changed), the ChangeTracker event fires and the application can forward the entity to any downstream consumer — including a SignalR hub.
Kafka Streams and RocksDB state store
KEFCore supports a set of advanced features built on Apache Kafka™ Streams, including the use of RocksDB as a persistent local state store, enabled via UsePersistentStorage = true on the context options. When active, the DbContext materializes the topic data into a RocksDB instance co-located with the application process. This has important consequences for the SignalR use case:
- Reliable initial snapshot. The full current state of the topic is available locally in RocksDB. A newly connected dashboard client can fetch a complete, consistent snapshot without replaying the entire Kafka topic or querying an external store.
- LINQ query performance. The
KEFCoreStoreLookupConventionallows per-entity query optimization flags — single-key lookup, range scan, reverse range scan, prefix scan — to be pushed directly into RocksDB's native API. Dashboards querying filtered subsets of nodes (e.g. all nodes under a given namespace prefix) benefit directly from this. Only the flags relevant to actual query patterns for that entity should be enabled. - Durability across restarts. RocksDB state survives process restarts. On restart, the application resumes from the persisted state and catches up only with records produced while it was offline, rather than re-reading the entire topic.
SignalR hub
The SignalR hub acts as the bridge between the ChangeTracker event and the connected browser clients. When a MonitoredItemNotificationEntity update is tracked, the hub broadcasts the new value to all clients subscribed to that node, or to filtered groups (e.g. by NodeId).
Hosting model
Both the KEFCore DbContext and the SignalR hub run inside the same ASP.NET Core host. The DbContext is registered as a hosted background service that consumes the Kafka topic continuously; the SignalR hub is registered as a standard hub endpoint. No inter-process communication is needed.
Integration pattern
- Register KEFCore as a hosted service in the ASP.NET Core DI container, configured against the Kafka topic receiving the KNetOPC KEFCore-transformed records. Set
UsePersistentStorage = trueif snapshot and query performance are required. - Subscribe to
ChangeTrackerevents inside the hosted service. On eachStateChangedorTrackedevent, resolveIHubContext<OpcDataHub>from DI and callClients.Group(...).SendAsync(...). - Expose the SignalR hub at a well-known endpoint. Browser clients connect via the SignalR JavaScript client and join node-specific groups.
- Expose a REST endpoint backed by the same
DbContextfor the initial page load — the client fetches the current snapshot (served from the RocksDB state store) and then switches to the SignalR push channel for subsequent updates.
Example
The following example shows the essential wiring. It assumes the KEFCore transformation is configured to write MonitoredItemNotificationEntity records to the topic opc.data.
Entity configuration
using MASES.EntityFrameworkCore.KNet.Metadata;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro;
using MASES.EntityFrameworkCore.KNet.Serialization.Avro.Storage;
using MASES.KNet.OPC.KEFCore.Model;
using Microsoft.EntityFrameworkCore;
public class OpcDbContext : KEFCoreDbContext
{
public DbSet<MonitoredItemNotificationEntity> MonitoredItems { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// Pin the topic name and declare the JSON SerDes on the entity
modelBuilder.Entity<MonitoredItemNotificationEntity>().ToKEFCoreTopic("opc.data"); // must match the transformation target topic
modelBuilder.Entity<MonitoredItemNotificationEntity>().HasKEFCoreStoreLookup(prefixScan: true, keyRange: false); // Enable prefix scan for NodeId-based filtering; stable topic name
modelBuilder.Entity<MonitoredItemNotificationEntity>()
.HasKEFCoreSerDes( // must match the transformation target Serialization format
keySerDesSelectorType: typeof(AvroKEFCoreSerDes.Key.BinaryRaw<>),
valueSerDesSelectorType: typeof(AvroKEFCoreSerDes.ValueContainer.BinaryRaw<>),
valueContainerType: typeof(AvroValueContainer<>));
base.OnModelCreating(modelBuilder);
}
}
Hub definition
using Microsoft.AspNetCore.SignalR;
public class OpcDataHub : Hub
{
// Clients call this to join a node-specific group
public async Task SubscribeToNode(string nodeId)
=> await Groups.AddToGroupAsync(Context.ConnectionId, nodeId);
public async Task UnsubscribeFromNode(string nodeId)
=> await Groups.RemoveFromGroupAsync(Context.ConnectionId, nodeId);
}
Hosted background service
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
public class OpcKafkaListenerService : BackgroundService
{
private readonly IHubContext<OpcDataHub> _hub;
private readonly IServiceScopeFactory _scopeFactory;
public OpcKafkaListenerService(
IHubContext<OpcDataHub> hub,
IServiceScopeFactory scopeFactory)
{
_hub = hub;
_scopeFactory = scopeFactory;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var db = scope.ServiceProvider.GetRequiredService<OpcDbContext>();
// KEFCore routes incoming Kafka records through the EF Core StateManager,
// which raises standard ChangeTracker events. No KEFCore-specific API needed.
db.ChangeTracker.StateChanged += async (_, args) =>
{
if (args.Entry.Entity is MonitoredItemNotificationEntity entity
&& args.NewState == EntityState.Modified)
{
await _hub.Clients
.Group(entity.NodeId)
.SendAsync("NodeUpdated", entity, stoppingToken);
}
};
db.ChangeTracker.Tracked += async (_, args) =>
{
if (args.Entry.Entity is MonitoredItemNotificationEntity entity
&& !args.FromQuery)
{
await _hub.Clients
.Group(entity.NodeId)
.SendAsync("NodeUpdated", entity, stoppingToken);
}
};
await db.Set<MonitoredItemNotificationEntity>().LoadAsync(stoppingToken);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
ASP.NET Core startup
// Initialize KEFCore once at application startup
KEFCore.CreateGlobalInstance();
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR();
builder.Services.AddHostedService<OpcKafkaListenerService>();
builder.Services.AddDbContext<OpcDbContext>(options =>
options.UseKafkaCluster("opc-cluster", cluster =>
cluster.WithBootstrapServers("localhost:9092"))
.UsePersistentStorage(true)); // enables RocksDB state store
var app = builder.Build();
app.MapHub<OpcDataHub>("/opc-hub");
// REST endpoint for initial snapshot — served directly from the RocksDB state store
app.MapGet("/api/opc/nodes", async (OpcDbContext db) =>
await db.MonitoredItems.ToListAsync());
app.Run();
Browser client (JavaScript)
const connection = new signalR.HubConnectionBuilder()
.withUrl("/opc-hub")
.withAutomaticReconnect()
.build();
connection.on("NodeUpdated", (entity) => {
console.log(`${entity.nodeId}: ${entity.value} @ ${entity.sourceTimestamp}`);
// update chart, gauge, table row, etc.
});
// Fetch current state from RocksDB before switching to push
const snapshot = await fetch("/api/opc/nodes").then(r => r.json());
renderDashboard(snapshot);
await connection.start();
await connection.invoke("SubscribeToNode", "ns=2;s=Realtimedata");
Considerations
Throughput and back-pressure. ChangeTracker events are raised on the EF Core pipeline. For high-frequency OPC-UA nodes (sub-100 ms publishing intervals), avoid doing heavy work inside the event handler directly. Instead, write updated entities into a Channel<MonitoredItemNotificationEntity> and have a dedicated consumer task forward them to the SignalR hub, so the KEFCore consumption loop is never blocked.
Group granularity. Using the NodeId as the SignalR group name works well when clients subscribe to specific nodes. For broader dashboards that display many nodes simultaneously, broadcasting to Clients.All is simpler and avoids group management overhead.
RocksDB and the initial snapshot. When UsePersistentStorage = true, the snapshot served by the REST endpoint is read directly from the local RocksDB store — no Kafka replay required. The snapshot is consistent with the current state of the DbContext and is available immediately after startup. Without RocksDB, state is held in memory only and is lost on restart.
Store lookup flags. The KEFCoreStoreLookupConvention allows per-entity query optimization flags to be declared via [KEFCoreStoreLookupAttribute] or the HasKEFCoreStoreLookup() fluent API. Enable only the flags that match the actual query patterns for that entity — for example, UseStorePrefixScan = true is appropriate for queries filtering by NodeId namespace prefix, but adds overhead if never used. See conventions and performance tips for guidance.
Scaling out. When running multiple ASP.NET Core instances, use the SignalR Redis backplane or Azure SignalR Service to fan out hub messages across nodes. For the KEFCore consumer, assign each instance to the same Kafka consumer group so that topic partitions are distributed across instances automatically, and each instance only processes — and broadcasts — the records it owns.