KNet for OPC (KNetOPC): Introduction to predicates
Overview
Predicates are filter conditions evaluated on each OPC-UA change notification record before it is published to an Apache Kafka™ topic. When a predicate returns false for a given record, that record is discarded and never written to the topic.
KNetOPC predicates extend the Apache Kafka™ Connect predicate mechanism, which is a standard feature of the Kafka Connect framework. They integrate natively with KNet Connect SDK and follow the same configuration and activation model used by any Kafka Connect predicate.
Architecture
KNetOPC predicates have a two-layer architecture:
- Java layer: activates the predicate within the Apache Kafka™ Connect runtime and exposes the configuration to the connector properties file. Each predicate is identified by its fully-qualified Java class name (e.g.
org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateGood). - .NET layer: contains the actual filtering logic. The Java layer delegates execution to the corresponding .NET class via KNet Connect SDK. The .NET assembly involved is
MASES.KNet.OPC.TransformationsPredicates.
This design keeps the OPC-UA–specific logic in .NET while staying fully compatible with the standard Kafka Connect predicate infrastructure.
How predicates are applied
The predicate pipeline is applied by KNet before any transformation and before the record is handed to the Kafka Connect framework for publishing. The evaluation sequence is:
OPC-UA Server
└─► KNet Connect SDK receives change notification
└─► Predicates evaluated (record filtered if false)
└─► Transformations applied
└─► Record published to Apache Kafka™ topic
If no predicate is configured, all change notifications are published.
Configuration
Predicates are configured in the connector properties file using the standard Kafka Connect predicate syntax. The predicates key lists one or more predicate aliases; each alias is then configured with its own predicates.<alias>.* properties.
Common configuration properties
All KNetOPC predicates share the following base properties, in addition to any predicate-specific ones:
| Property | Type | Required | Default | Description |
|---|---|---|---|---|
application.name |
string | No | "" |
Application name used in logging and telemetry. |
application.version |
string | No | "" |
Application version used in logging and telemetry. |
Example: single predicate
The following example configures the connector to publish only records with a Good OPC-UA status code:
predicates=OnlyGood
predicates.OnlyGood.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateGood
predicates.OnlyGood.application.name=MyApp
predicates.OnlyGood.application.version=1.0.0
Example: multiple predicates with negation
Kafka Connect allows combining predicates with the standard negate flag and by applying a predicate to a specific transformation. The following example discards records with a Bad status code and records older than 60 seconds:
predicates=NoBad,NotTooOld
predicates.NoBad.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateBad
predicates.NoBad.negate=true
predicates.NotTooOld.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateNotOldThan
predicates.NotTooOld.delta.time.in.seconds=60
Note
The negate property is a standard Kafka Connect feature: when set to true, the predicate result is inverted. In the example above, KNetOPCPredicateBad with negate=true passes only records that are not Bad.
Available predicates
The following predicates are available out of the box. For the full configuration reference of each predicate see Available predicates.
| Class name | Description |
|---|---|
KNetOPCPredicateGood |
Passes records whose OPC-UA StatusCode is Good. |
KNetOPCPredicateBad |
Passes records whose OPC-UA StatusCode is Bad. |
KNetOPCPredicateBoolean |
Passes records whose node value (Boolean type) matches a configured value. |
KNetOPCPredicateNotOldThan |
Passes records whose timestamp is not older than a configured number of seconds. |
KNetOPCPredicateValueCompare |
Passes records whose scalar node value satisfies a comparison condition against a configured threshold. |