diff --git a/go.mod b/go.mod
index fa91210beccf1..c1d29d2d59055 100644
--- a/go.mod
+++ b/go.mod
@@ -159,6 +159,7 @@ require (
github.com/p4lang/p4runtime v1.4.0
github.com/pborman/ansi v1.0.0
github.com/pcolladosoto/goslurm v0.1.0
+ github.com/pelletier/go-toml v1.9.5
github.com/peterbourgon/unixtransport v0.0.4
github.com/pion/dtls/v2 v2.2.12
github.com/prometheus-community/pro-bing v0.4.1
diff --git a/go.sum b/go.sum
index 58b901287ccef..f49ca46febd30 100644
--- a/go.sum
+++ b/go.sum
@@ -2071,6 +2071,7 @@ github.com/pcolladosoto/goslurm v0.1.0 h1:d2KigvDfsIIeVeHHj/pTtajz2T0cHHqhGk9iJW
github.com/pcolladosoto/goslurm v0.1.0/go.mod h1:eLuBFfN/tj4O/HDMrAJXb+3s3rGhdHQVZFcOUV1Sbbo=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
+github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14 h1:XeOYlK9W1uCmhjJSsY78Mcuh7MVkNjTzmHx1yBzizSU=
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14/go.mod h1:jVblp62SafmidSkvWrXyxAme3gaTfEtWwRPGz5cpvHg=
diff --git a/plugins/inputs/all/opcua_event_subscription.go b/plugins/inputs/all/opcua_event_subscription.go
new file mode 100644
index 0000000000000..2ea3e28721fe9
--- /dev/null
+++ b/plugins/inputs/all/opcua_event_subscription.go
@@ -0,0 +1,5 @@
+//go:build !custom || inputs || inputs.opcua_event_subscription
+
+package all
+
+import _ "github.com/influxdata/telegraf/plugins/inputs/opcua_event_subscription" // register plugin
diff --git a/plugins/inputs/opcua_event_subscription/README.md b/plugins/inputs/opcua_event_subscription/README.md
new file mode 100644
index 0000000000000..b3f3b22969536
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/README.md
@@ -0,0 +1,159 @@
+# OPC UA Event Monitoring Telegraf Plugin
+
+This input plugin, `opcua_event_subscription`, enables monitoring of
+OPC UA events by subscribing to specific node IDs and filtering events based on
+event_type and source_name. The plugin also supports secure OPC UA connections,
+allowing the use of client certificates and private keys for encrypted
+communication with the server.
+
+## Features
+
+- Connects to an OPC UA server to subscribe to a specified event_type.
+- Filters events based on source_name and event_type.
+- Allows configuration of specific node IDs and fields to monitor for event data.
+- Supports secure OPC UA connections, including options for setting SecurityMode (None, Sign, SignAndEncrypt) and SecurityPolicy (None, Basic256Sha256).
+- Allows the use of client certificates and private keys for secure communication with the OPC UA server, enabling encrypted connections.
+
+## Requirements
+
+- [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
+- Go environment for building the plugin.
+- An accessible OPC UA server with alarms and conditions support.
+- [gopcua](https://github.com/gopcua/opcua) Go client library.
+
+## Installation
+
+- Place the `opcua_event_subscription` plugin in the Telegraf plugin directory.
+- Ensure the `opcua_event_subscription` directory is included in your Go path.
+- Build and install the plugin according to Telegraf’s external plugin documentation.
+
+## Configuration
+
+```toml @sample.conf
+[[inputs.opcua_event_subscription]]
+ ## OPC UA Server Endpoint
+ endpoint = "opc.tcp://localhost:4840"
+
+ ## Polling interval
+ interval = "10s"
+
+ ## Event Type Filter
+ event_type = "ns=0;i=2041"
+
+ ## Node IDs to subscribe to
+ node_ids = ["ns=2;s=0:East/Blue"]
+
+ ## Source Name Filter (optional)
+ source_names = ["SourceName1", "SourceName2"]
+
+ ## Fields to retrieve (optional)
+ fields = ["Severity", "Message", "Time"]
+
+ ## Security mode and policy (optional)
+ security_mode = "None"
+ security_policy = "None"
+
+ ## Client certificate and key (optional)
+ certificate = "/path/to/cert.pem"
+ private_key = "/path/to/key.pem"
+
+ ## Connection and Request Timeout (optional)
+ connection_timeout = "10s"
+ request_timeout = "10s"
+```
+
+## Configuration Parameters
+
+- `endpoint` The OPC UA server’s endpoint URL.
+- `interval` Polling interval for data collection, e.g., 10s.
+- `node_ids` A list of OPC UA node identifiers (NodeIds) specifying the nodes to monitor for event notifications, which are associated with the defined event type.
+- `event_type` Defines the type or level of events to capture from the monitored nodes.
+- `source_names` Specifies OPCUA Event source_names to filter on
+- `fields` Specifies the fields to capture from event notifications.
+- `security_mode` Sets the OPC UA security mode (None, Sign, SignAndEncrypt).
+- `security_policy` Specifies the OPC UA security policy (None, Basic256Sha256).
+- `certificate` Path to the client certificate (in PEM format) if required.
+- `private_key` Path to the private key (in PEM format) if required.
+- `connection_timeout` Defines the maximum time the client will wait to establish a connection with the server before considering the attempt failed (default 10s).
+- `request_timeout` Specifies the maximum time the client will wait for a response from the server after a request has been sent before timing out (default 10s).
+
+## Global configuration options
+
+In addition to the plugin-specific configuration settings, plugins support
+additional global and plugin configuration settings. These settings are used to
+modify metrics, tags, and field or create aliases and configure ordering, etc.
+See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
+
+[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
+
+## Security
+
+If secure connections are required, set security_mode
+and security_policy based on the OPC UA server’s requirements.
+Provide paths to certificate and private_key in PEM format.
+
+## How it works
+
+Once Telegraf starts with this plugin, it establishes a connection
+to the OPC UA server, subscribes to the specified event_type’s Node-ID,
+and collects events that meet the defined criteria.
+The `node_ids` parameter specifies the nodes to monitor for
+events (monitored items).
+However, the actual subscription is based on the `event_type`,
+which determines the events that are capture.
+
+## Metrics
+
+Measurement names are based on the OPC UA fields selected in the
+telegraf config.
+All the fields are added to the Output `fields`.
+All metrics receive the node_id & opcua_host `tags` indicating
+the related NodeID and OPCUA Server where the event is coming from.
+
+## Example Output
+
+```bash
+{
+ "fields": {
+ "EventType": "i=10751",
+ "Message": "The alarm severity has increased.",
+ "SourceName": "SouthMotor",
+ "Time": "2024-12-09 07:46:48.8492578 +0000 UTC"
+ },
+ "name": "opcua_event_subscription",
+ "tags": {
+ "host": "myHost",
+ "node_id": "ns=2;s=0:East/Blue",
+ "opcua_host": "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer"
+ },
+ "timestamp": 1733730411
+}
+```
+
+## Troubleshooting
+
+- Ensure this plugin directory is in Telegraf’s Go path.
+- Compile and run Telegraf with this plugin enabled to verify the connection and data collection.
+- Check the Telegraf logs for any configuration or connection errors, and troubleshoot accordingly.
+
+## Development
+
+For testing purposes,
+you can test the plugin using the `opcua_event_subscription_test` file.
+The tests will automatically use the
+`SampleConfig` defined in the plugin and connect to a
+demo OPC UA server to perform subscriptions.
+To run the tests, simply execute the following command:
+
+```batch
+ go test -v
+```
+
+## Limitations
+
+- Does not allow multiple event_types within one subscription. To subscribe to multiple event_types use multiple input plugins within your telegraf.conf.
+- Where-Filter is limited to the OPC-UA field source_name.
+- This Plugin is only developed for event notifications. Data Change notifications are not supported.
+- SamplingInterval is set to 10000.0 // 10 sec.
+- QueueSize is set to 10.
+- All retrieved fields are forwarded as `fields`, while the opcua_host and the node_id that triggers the event are forwarded as `tags`
diff --git a/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go b/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go
new file mode 100644
index 0000000000000..dd362b1232f1f
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go
@@ -0,0 +1,19 @@
+package opcua_event_subscription
+
+import (
+ "fmt"
+ "github.com/gopcua/opcua/ua"
+)
+
+type NodeIDWrapper struct {
+ ID *ua.NodeID
+}
+
+func (n *NodeIDWrapper) UnmarshalText(text []byte) error {
+ nodeID, err := ua.ParseNodeID(string(text))
+ if err != nil {
+ return fmt.Errorf("failed to parse NodeID from text: %w", err)
+ }
+ n.ID = nodeID
+ return nil
+}
diff --git a/plugins/inputs/opcua_event_subscription/notification_handler.go b/plugins/inputs/opcua_event_subscription/notification_handler.go
new file mode 100644
index 0000000000000..ad55a2dbb7ced
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/notification_handler.go
@@ -0,0 +1,67 @@
+package opcua_event_subscription
+
+import (
+ "fmt"
+ "github.com/gopcua/opcua"
+ "github.com/gopcua/opcua/ua"
+ "github.com/influxdata/telegraf"
+ "sync"
+)
+
+type NotificationHandler struct {
+ Fields []string
+ Log telegraf.Logger
+ Endpoint string
+ ClientHandleToNodeID *sync.Map
+}
+
+func (nh *NotificationHandler) HandleNotification(notification *opcua.PublishNotificationData, acc telegraf.Accumulator) {
+ switch v := notification.Value.(type) {
+ case *ua.EventNotificationList:
+ nh.handleEventNotification(v, acc)
+ default:
+ nh.Log.Infof("Received unknown notification type: %T", v)
+ }
+}
+
+func (nh *NotificationHandler) handleEventNotification(notification *ua.EventNotificationList, acc telegraf.Accumulator) {
+ for _, event := range notification.Events {
+ fields := make(map[string]interface{})
+ for i, field := range event.EventFields {
+ fieldName := nh.Fields[i]
+ value := field.Value()
+
+ if fieldName == "Message" {
+ if localizedText, ok := value.(*ua.LocalizedText); ok {
+ fields["Message"] = localizedText.Text
+ } else {
+ nh.Log.Warnf("Message field is not of type *ua.LocalizedText: %T", value)
+ }
+ continue
+ }
+ var stringValue string
+ switch v := value.(type) {
+ case string:
+ stringValue = v
+ case fmt.Stringer:
+ stringValue = v.String()
+ case nil:
+ stringValue = "null"
+ default:
+ stringValue = fmt.Sprintf("%v", v)
+ }
+ fields[fieldName] = stringValue
+ }
+
+ nodeID, ok := nh.ClientHandleToNodeID.Load(event.ClientHandle)
+ if !ok {
+ nh.Log.Warnf("NodeId not found for ClientHandle: %d", event.ClientHandle)
+ nodeID = "unknown"
+ }
+ tags := map[string]string{
+ "node_id": nodeID.(string),
+ "opcua_host": nh.Endpoint,
+ }
+ acc.AddFields("opcua_event_subscription", fields, tags)
+ }
+}
diff --git a/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go b/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go
new file mode 100644
index 0000000000000..3701521491f56
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go
@@ -0,0 +1,207 @@
+package opcua_event_subscription
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/gopcua/opcua"
+ "github.com/influxdata/telegraf"
+ "github.com/influxdata/telegraf/config"
+ opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua"
+ "github.com/influxdata/telegraf/plugins/inputs"
+)
+
+type OpcuaEventSubscription struct {
+ Endpoint string `toml:"endpoint"`
+ Interval config.Duration `toml:"interval"`
+ EventType NodeIDWrapper `toml:"event_type"`
+ NodeIDs []NodeIDWrapper `toml:"node_ids"`
+ SourceNames []string `toml:"source_names"`
+ Fields []string `toml:"fields"`
+ SecurityMode string `toml:"security_mode"`
+ SecurityPolicy string `toml:"security_policy"`
+ Certificate string `toml:"certificate"`
+ PrivateKey string `toml:"private_key"`
+ ConnectionTimeout config.Duration `toml:"connection_timeout"`
+ RequestTimeout config.Duration `toml:"request_timeout"`
+
+ Client *opcuaclient.OpcUAClient
+ SubscriptionManager *SubscriptionManager
+ NotificationHandler *NotificationHandler
+ Cancel context.CancelFunc
+ Log telegraf.Logger
+ ClientHandleToNodeID sync.Map
+}
+
+func (o *OpcuaEventSubscription) SampleConfig() string {
+ return `
+ ## OPC UA Server Endpoint
+ endpoint = "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer"
+
+ ## Polling interval
+ interval = "10s"
+
+ ## Event Type Filter
+ event_type = "ns=0;i=2041"
+
+ ## Node IDs to subscribe to
+ node_ids = ["ns=2;s=0:East/Blue"]
+
+ ## Source Name Filter (optional)
+ source_names = ["SourceName1", "SourceName2"]
+
+ ## Fields to be returned
+ fields = ["Severity", "Message"]
+
+ ## Security mode and policy (optional)
+ security_mode = "None"
+ security_policy = "None"
+
+ ## Client certificate and key (optional)
+ certificate = ""
+ private_key = ""
+
+ ## Connection and Request Timeout (optional)
+ connection_timeout = "10s"
+ request_timeout = "10s"
+ `
+}
+
+func (o *OpcuaEventSubscription) Start() error {
+ o.Log.Info("******************START******************")
+
+ if o.Endpoint == "" {
+ return errors.New("missing mandatory field: endpoint")
+ }
+
+ if o.Interval <= 0 {
+ return errors.New("missing or invalid mandatory field: interval")
+ }
+
+ if len(o.NodeIDs) == 0 {
+ return errors.New("missing mandatory field: node_ids")
+ }
+
+ if o.EventType.ID == nil {
+ return errors.New("missing mandatory field: event_type")
+ }
+
+ if len(o.Fields) == 0 {
+ return errors.New("missing mandatory field: fields")
+ }
+
+ if o.ConnectionTimeout == 0 {
+ o.Log.Debug("ConnectionTimeout not set. Set to default value of 10s")
+ o.ConnectionTimeout = config.Duration(10 * time.Second) // Default to 10 seconds
+ }
+ if o.RequestTimeout == 0 {
+ o.Log.Debug("RequestTimeout not set. Set to default value of 10s")
+ o.RequestTimeout = config.Duration(10 * time.Second) // Default to 10 seconds
+ }
+
+ clientConfig := &opcuaclient.OpcUAClientConfig{
+ Endpoint: o.Endpoint,
+ SecurityPolicy: o.SecurityPolicy,
+ SecurityMode: o.SecurityMode,
+ Certificate: o.Certificate,
+ PrivateKey: o.PrivateKey,
+ ConnectTimeout: o.ConnectionTimeout,
+ RequestTimeout: o.RequestTimeout,
+ }
+
+ client, err := clientConfig.CreateClient(o.Log)
+ if err != nil {
+ return fmt.Errorf("failed to create OPC UA client: %w", err)
+ }
+ o.Client = client
+
+ err = o.Client.Connect(context.Background())
+ if err != nil {
+ return fmt.Errorf("failed to connect to OPC UA server: %w", err)
+ }
+
+ o.SubscriptionManager = &SubscriptionManager{
+ Client: o.Client.Client,
+ EventType: o.EventType,
+ NodeIDs: o.NodeIDs,
+ Fields: o.Fields,
+ SourceNames: o.SourceNames,
+ Log: o.Log,
+ Interval: time.Duration(o.Interval),
+ ClientHandleToNodeID: &o.ClientHandleToNodeID,
+ }
+
+ o.NotificationHandler = &NotificationHandler{
+ Fields: o.Fields,
+ Log: o.Log,
+ Endpoint: o.Endpoint,
+ ClientHandleToNodeID: &o.ClientHandleToNodeID,
+ }
+
+ return nil
+}
+
+func (o *OpcuaEventSubscription) Gather(acc telegraf.Accumulator) error {
+ if o.Client == nil {
+ return errors.New("client is not initialized")
+ }
+
+ if len(o.SubscriptionManager.subscriptions) == 0 {
+ ctx := context.Background()
+ notifyCh := make(chan *opcua.PublishNotificationData)
+
+ if err := o.SubscriptionManager.CreateSubscription(ctx, notifyCh); err != nil {
+ return fmt.Errorf("failed to create subscription: %w", err)
+ }
+
+ if err := o.SubscriptionManager.Subscribe(ctx); err != nil {
+ return fmt.Errorf("failed to subscribe: %w", err)
+ }
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ o.Log.Warn("Context cancelled, stopping Goroutine")
+ return
+ case notification := <-notifyCh:
+ if notification.Error != nil {
+ o.Log.Errorf("Notification error: %v", notification.Error)
+ continue
+ }
+ o.NotificationHandler.HandleNotification(notification, acc)
+ }
+ }
+ }()
+ }
+
+ return nil
+}
+
+func (o *OpcuaEventSubscription) Stop() {
+ o.Log.Info("******************STOP******************")
+ if o.Cancel != nil {
+ o.Cancel()
+ }
+ if o.Client != nil {
+ for _, sub := range o.SubscriptionManager.subscriptions {
+ err := sub.Cancel(context.Background())
+ if err != nil {
+ o.Log.Errorf("Failed to cancel subscription: %v", err)
+ }
+ }
+ err := o.Client.Disconnect(context.Background())
+ if err != nil {
+ o.Log.Errorf("Failed to disconnect client: %v", err)
+ }
+ }
+}
+
+func init() {
+ inputs.Add("opcua_event_subscription", func() telegraf.Input {
+ return &OpcuaEventSubscription{}
+ })
+}
diff --git a/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go b/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go
new file mode 100644
index 0000000000000..4db823bbc67a8
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go
@@ -0,0 +1,105 @@
+package opcua_event_subscription
+
+import (
+ "fmt"
+ "github.com/influxdata/telegraf/config"
+ "github.com/influxdata/telegraf/testutil"
+ "github.com/pelletier/go-toml"
+ "github.com/stretchr/testify/require"
+ "testing"
+ "time"
+)
+
+type TempConfig struct {
+ Endpoint string `toml:"endpoint"`
+ Interval string `toml:"interval"`
+ EventType string `toml:"event_type"`
+ NodeIDs []string `toml:"node_ids"`
+ SourceNames []string `toml:"source_names"`
+ Fields []string `toml:"fields"`
+ SecurityMode string `toml:"security_mode"`
+ SecurityPolicy string `toml:"security_policy"`
+ Certificate string `toml:"certificate"`
+ PrivateKey string `toml:"private_key"`
+}
+
+func LoadSampleConfigToPlugin() (*OpcuaEventSubscription, error) {
+ plugin := &OpcuaEventSubscription{}
+ sampleConfig := plugin.SampleConfig()
+ tempConfig := &TempConfig{}
+
+ err := toml.Unmarshal([]byte(sampleConfig), tempConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to unmarshal sample config: %w", err)
+ }
+
+ plugin.Endpoint = tempConfig.Endpoint
+ plugin.Interval = config.Duration(time.Second * 10) // Default to 10s for simplicity
+ plugin.EventType = NodeIDWrapper{}
+ if err := plugin.EventType.UnmarshalText([]byte(tempConfig.EventType)); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal EventType: %w", err)
+ }
+
+ for _, nodeIDStr := range tempConfig.NodeIDs {
+ nodeIDWrapper := NodeIDWrapper{}
+ if err := nodeIDWrapper.UnmarshalText([]byte(nodeIDStr)); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal NodeID: %w", err)
+ }
+ plugin.NodeIDs = append(plugin.NodeIDs, nodeIDWrapper)
+ }
+
+ plugin.SourceNames = tempConfig.SourceNames
+ plugin.Fields = tempConfig.Fields
+ plugin.SecurityMode = tempConfig.SecurityMode
+ plugin.SecurityPolicy = tempConfig.SecurityPolicy
+ plugin.Certificate = tempConfig.Certificate
+ plugin.PrivateKey = tempConfig.PrivateKey
+
+ return plugin, nil
+}
+
+func TestStart(t *testing.T) {
+ plugin, err := LoadSampleConfigToPlugin()
+ require.NoError(t, err)
+
+ plugin.Log = testutil.Logger{}
+
+ err = plugin.Start()
+ require.NoError(t, err)
+
+ require.NotNil(t, plugin.SubscriptionManager)
+ require.NotNil(t, plugin.NotificationHandler)
+
+ // Clean up after the test
+ plugin.Stop()
+}
+
+func TestGather(t *testing.T) {
+ plugin, err := LoadSampleConfigToPlugin()
+ require.NoError(t, err)
+ plugin.Log = testutil.Logger{}
+
+ acc := &testutil.Accumulator{}
+
+ err = plugin.Start()
+ require.NoError(t, err)
+
+ err = plugin.Gather(acc)
+ require.NoError(t, err)
+
+ plugin.Stop()
+}
+
+func TestStop(t *testing.T) {
+ plugin, err := LoadSampleConfigToPlugin()
+ require.NoError(t, err)
+ plugin.Log = testutil.Logger{}
+
+ err = plugin.Start()
+ require.NoError(t, err)
+
+ plugin.SubscriptionManager = &SubscriptionManager{}
+
+ plugin.Stop()
+ require.Nil(t, plugin.Cancel)
+}
diff --git a/plugins/inputs/opcua_event_subscription/subscription_manager.go b/plugins/inputs/opcua_event_subscription/subscription_manager.go
new file mode 100644
index 0000000000000..6a1c80c912106
--- /dev/null
+++ b/plugins/inputs/opcua_event_subscription/subscription_manager.go
@@ -0,0 +1,146 @@
+package opcua_event_subscription
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/gopcua/opcua"
+ "github.com/gopcua/opcua/id"
+ "github.com/gopcua/opcua/ua"
+ "github.com/influxdata/telegraf"
+ "sync"
+ "time"
+)
+
+type SubscriptionManager struct {
+ Client *opcua.Client
+ NodeIDs []NodeIDWrapper
+ Fields []string
+ EventType NodeIDWrapper
+ SourceNames []string
+ NotifyChannels []chan *opcua.PublishNotificationData
+ subscriptions []*opcua.Subscription
+ Log telegraf.Logger
+ Interval time.Duration
+ ClientHandleToNodeID *sync.Map
+}
+
+func (sm *SubscriptionManager) CreateSubscription(ctx context.Context, notifyCh chan *opcua.PublishNotificationData) error {
+ if len(sm.subscriptions) == 0 {
+ if ctx == nil {
+ return errors.New("context is nil")
+ }
+ if notifyCh == nil {
+ return errors.New("notification channel is nil")
+ }
+ sm.NotifyChannels = append(sm.NotifyChannels, notifyCh)
+
+ sub, err := sm.Client.Subscribe(ctx, &opcua.SubscriptionParameters{
+ Interval: sm.Interval,
+ }, notifyCh)
+ if err != nil {
+ return fmt.Errorf("failed to create subscription: %w", err)
+ }
+ sm.subscriptions = append(sm.subscriptions, sub)
+ }
+ return nil
+}
+
+func (sm *SubscriptionManager) Subscribe(ctx context.Context) error {
+ filter := ua.EventFilter{
+ SelectClauses: sm.createSelectClauses(),
+ WhereClause: sm.createWhereClauses(),
+ }
+
+ filterExtObj := ua.ExtensionObject{
+ EncodingMask: ua.ExtensionObjectBinary,
+ TypeID: &ua.ExpandedNodeID{
+ NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary),
+ },
+ Value: filter,
+ }
+
+ for i, nodeID := range sm.NodeIDs {
+ miCreateRequest := &ua.MonitoredItemCreateRequest{
+ ItemToMonitor: &ua.ReadValueID{
+ NodeID: nodeID.ID,
+ AttributeID: ua.AttributeIDEventNotifier,
+ DataEncoding: &ua.QualifiedName{},
+ },
+ MonitoringMode: ua.MonitoringModeReporting,
+ RequestedParameters: &ua.MonitoringParameters{
+ ClientHandle: uint32(i),
+ SamplingInterval: 10000.0, // 10 seconds
+ QueueSize: 10,
+ DiscardOldest: true,
+ Filter: &filterExtObj,
+ },
+ }
+ sm.ClientHandleToNodeID.Store(uint32(i), nodeID.ID.String())
+ res, err := sm.subscriptions[0].Monitor(ctx, ua.TimestampsToReturnBoth, miCreateRequest)
+ if err != nil || res.Results[0].StatusCode != ua.StatusOK {
+ return fmt.Errorf("failed to create monitored item: %w", err)
+ }
+ }
+ sm.Log.Info("Subscribed successfully")
+ return nil
+}
+
+func (sm *SubscriptionManager) createSelectClauses() []*ua.SimpleAttributeOperand {
+ selects := make([]*ua.SimpleAttributeOperand, len(sm.Fields))
+ for i, name := range sm.Fields {
+ selects[i] = &ua.SimpleAttributeOperand{
+ TypeDefinitionID: ua.NewNumericNodeID(sm.EventType.ID.Namespace(), sm.EventType.ID.IntID()),
+ BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}},
+ AttributeID: ua.AttributeIDValue,
+ }
+ }
+ return selects
+}
+
+func (sm *SubscriptionManager) createWhereClauses() *ua.ContentFilter {
+ if len(sm.SourceNames) == 0 {
+ return &ua.ContentFilter{
+ Elements: make([]*ua.ContentFilterElement, 0),
+ }
+ }
+ operands := make([]*ua.ExtensionObject, 0)
+ for _, sourceName := range sm.SourceNames {
+ literalOperand := &ua.ExtensionObject{
+ EncodingMask: 1,
+ TypeID: &ua.ExpandedNodeID{
+ NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary),
+ },
+ Value: ua.LiteralOperand{
+ Value: ua.MustVariant(sourceName),
+ },
+ }
+ operands = append(operands, literalOperand)
+ }
+
+ attributeOperand := &ua.ExtensionObject{
+ EncodingMask: ua.ExtensionObjectBinary,
+ TypeID: &ua.ExpandedNodeID{
+ NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary),
+ },
+ Value: &ua.SimpleAttributeOperand{
+ TypeDefinitionID: ua.NewNumericNodeID(sm.EventType.ID.Namespace(), sm.EventType.ID.IntID()),
+ BrowsePath: []*ua.QualifiedName{
+ {NamespaceIndex: 0, Name: "SourceName"},
+ },
+ AttributeID: ua.AttributeIDValue,
+ },
+ }
+
+ filterElement := &ua.ContentFilterElement{
+ FilterOperator: ua.FilterOperatorInList,
+ FilterOperands: append([]*ua.ExtensionObject{attributeOperand}, operands...),
+ }
+
+ wheres := &ua.ContentFilter{
+ Elements: []*ua.ContentFilterElement{filterElement},
+ }
+
+ return wheres
+}
diff --git a/plugins/parsers/xpath/testcases/openweathermap_5d.xml b/plugins/parsers/xpath/testcases/openweathermap_5d.xml
index 2b7dc83a5b86b..a3fe64be57f96 100644
--- a/plugins/parsers/xpath/testcases/openweathermap_5d.xml
+++ b/plugins/parsers/xpath/testcases/openweathermap_5d.xml
@@ -1,38 +1,38 @@
-
-
-
- London
-
- GB
- 3600
-
-
-
- 2015-06-30T00:00:00Z
-
-
-
-
-
-
-
+
+
+
+ London
+
+ GB
+ 3600
+
+
+
+ 2015-06-30T00:00:00Z
+
+
+
+
+
+
+