KNet for OPC (KNetOPC): Introduction to predicates

Overview

Predicates are filter conditions evaluated on each OPC-UA change notification record before it is published to an Apache Kafka™ topic. When a predicate returns false for a given record, that record is discarded and never written to the topic.

KNetOPC predicates extend the Apache Kafka™ Connect predicate mechanism, which is a standard feature of the Kafka Connect framework. They integrate natively with KNet Connect SDK and follow the same configuration and activation model used by any Kafka Connect predicate.

Architecture

KNetOPC predicates have a two-layer architecture:

  • Java layer: activates the predicate within the Apache Kafka™ Connect runtime and exposes the configuration to the connector properties file. Each predicate is identified by its fully-qualified Java class name (e.g. org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateGood).
  • .NET layer: contains the actual filtering logic. The Java layer delegates execution to the corresponding .NET class via KNet Connect SDK. The .NET assembly involved is MASES.KNet.OPC.TransformationsPredicates.

This design keeps the OPC-UA–specific logic in .NET while staying fully compatible with the standard Kafka Connect predicate infrastructure.

How predicates are applied

The predicate pipeline is applied by KNet before any transformation and before the record is handed to the Kafka Connect framework for publishing. The evaluation sequence is:

OPC-UA Server
    └─► KNet Connect SDK receives change notification
            └─► Predicates evaluated (record filtered if false)
                    └─► Transformations applied
                            └─► Record published to Apache Kafka™ topic

If no predicate is configured, all change notifications are published.
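
The evaluation order above can be modelled in a few lines. The following is only a minimal sketch of the described sequence, not KNetOPC code: the dictionary-based record, the `status` field, and the helper names `run_pipeline`, `is_good`, and `add_source` are all assumptions made for illustration.

```python
from typing import Callable, Iterable

Record = dict  # hypothetical stand-in for a change notification record
Predicate = Callable[[Record], bool]
Transformation = Callable[[Record], Record]


def run_pipeline(
    notifications: Iterable[Record],
    predicates: list[Predicate],
    transformations: list[Transformation],
) -> list[Record]:
    """Return the records that would reach the topic, in arrival order."""
    published = []
    for record in notifications:
        # A single False result discards the record. With no predicates
        # configured, all() over an empty list is True, so everything passes.
        if not all(predicate(record) for predicate in predicates):
            continue
        # Transformations run only on records that survived filtering.
        for transform in transformations:
            record = transform(record)
        published.append(record)
    return published


def is_good(record: Record) -> bool:
    # Hypothetical check on a made-up "status" field.
    return record["status"] == "Good"


def add_source(record: Record) -> Record:
    # Hypothetical transformation: tag the record with its origin.
    return {**record, "source": "opc-ua"}


notifications = [
    {"node": "ns=2;s=Temp", "status": "Good", "value": 21.5},
    {"node": "ns=2;s=Temp", "status": "Bad", "value": None},
]

filtered = run_pipeline(notifications, [is_good], [add_source])
unfiltered = run_pipeline(notifications, [], [add_source])
```

With `is_good` configured, only the first notification reaches the output list; with an empty predicate list, both do.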

Configuration

Predicates are configured in the connector properties file using the standard Kafka Connect predicate syntax. The predicates key lists one or more predicate aliases; each alias is then configured with its own predicates.<alias>.* properties.

Common configuration properties

All KNetOPC predicates share the following base properties, in addition to any predicate-specific ones:

Property             Type    Required  Default  Description
application.name     string  No        ""       Application name used in logging and telemetry.
application.version  string  No        ""       Application version used in logging and telemetry.

Example: single predicate

The following example configures the connector to publish only records with a Good OPC-UA status code:

predicates=OnlyGood
predicates.OnlyGood.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateGood
predicates.OnlyGood.application.name=MyApp
predicates.OnlyGood.application.version=1.0.0
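
To make the alias convention concrete, the sketch below groups the properties from this example by alias. It is a simplified illustration, not how Kafka Connect or KNetOPC parses its configuration: `group_predicates` is a hypothetical helper, and it handles only plain `key=value` lines rather than the full Java properties format.

```python
def group_predicates(properties_text: str) -> dict[str, dict[str, str]]:
    """Group 'predicates.<alias>.<key>' entries by alias."""
    props = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    # The 'predicates' key holds a comma-separated list of aliases.
    aliases = [a.strip() for a in props.get("predicates", "").split(",") if a.strip()]

    grouped = {}
    for alias in aliases:
        prefix = f"predicates.{alias}."
        # Keep only this alias's properties, with the prefix removed.
        grouped[alias] = {
            key[len(prefix):]: value
            for key, value in props.items()
            if key.startswith(prefix)
        }
    return grouped


config = """
predicates=OnlyGood
predicates.OnlyGood.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateGood
predicates.OnlyGood.application.name=MyApp
predicates.OnlyGood.application.version=1.0.0
"""

grouped = group_predicates(config)
```

Here `grouped` has a single `OnlyGood` entry whose keys are `type`, `application.name`, and `application.version`.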

Example: multiple predicates with negation

Kafka Connect lets you invert a predicate's result with the standard negate flag and attach a predicate to a specific transformation. The following example combines two predicates to discard records with a Bad status code as well as records older than 60 seconds:

predicates=NoBad,NotTooOld
predicates.NoBad.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateBad
predicates.NoBad.negate=true
predicates.NotTooOld.type=org.mases.knetopc.source.transformations.predicates.KNetOPCPredicateNotOldThan
predicates.NotTooOld.delta.time.in.seconds=60

Note

The negate property is a standard Kafka Connect feature: when set to true, the predicate result is inverted. In the example above, KNetOPCPredicateBad with negate=true passes only records that are not Bad.
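
The combined effect of the two predicates in this example can be sketched as follows. This is an illustration of the described semantics only: the `status` and `timestamp` fields and the helper functions are hypothetical stand-ins, and treating a record that is exactly 60 seconds old as still fresh is an assumption.

```python
from datetime import datetime, timedelta, timezone


def is_bad(record: dict) -> bool:
    # Hypothetical stand-in for KNetOPCPredicateBad.
    return record["status"] == "Bad"


def not_older_than(record: dict, now: datetime, seconds: int) -> bool:
    # Hypothetical stand-in for KNetOPCPredicateNotOldThan.
    # Assumption: the boundary is inclusive.
    return now - record["timestamp"] <= timedelta(seconds=seconds)


def passes(record: dict, now: datetime) -> bool:
    no_bad = not is_bad(record)  # negate=true inverts the predicate result
    not_too_old = not_older_than(record, now, 60)  # delta.time.in.seconds=60
    # A record is kept only if no predicate returns false.
    return no_bad and not_too_old


now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fresh_good = {"status": "Good", "timestamp": now - timedelta(seconds=5)}
fresh_uncertain = {"status": "Uncertain", "timestamp": now - timedelta(seconds=5)}
fresh_bad = {"status": "Bad", "timestamp": now - timedelta(seconds=5)}
stale_good = {"status": "Good", "timestamp": now - timedelta(seconds=120)}
```

In this model, `fresh_good` and `fresh_uncertain` pass, while `fresh_bad` and `stale_good` are discarded. Since OPC-UA status codes are Good, Uncertain, or Bad, negating the Bad predicate is not equivalent to using the Good predicate: under the assumptions above, an Uncertain record passes the former but not the latter.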

Available predicates

The following predicates are available out of the box. For the full configuration reference of each predicate, see the Available predicates reference page.

Class name                    Description
KNetOPCPredicateGood          Passes records whose OPC-UA StatusCode is Good.
KNetOPCPredicateBad           Passes records whose OPC-UA StatusCode is Bad.
KNetOPCPredicateBoolean       Passes records whose node value (Boolean type) matches a configured value.
KNetOPCPredicateNotOldThan    Passes records whose timestamp is not older than a configured number of seconds.
KNetOPCPredicateValueCompare  Passes records whose scalar node value satisfies a comparison condition against a configured threshold.