diff --git a/go.mod b/go.mod index fa91210beccf1..c1d29d2d59055 100644 --- a/go.mod +++ b/go.mod @@ -159,6 +159,7 @@ require ( github.com/p4lang/p4runtime v1.4.0 github.com/pborman/ansi v1.0.0 github.com/pcolladosoto/goslurm v0.1.0 + github.com/pelletier/go-toml v1.9.5 github.com/peterbourgon/unixtransport v0.0.4 github.com/pion/dtls/v2 v2.2.12 github.com/prometheus-community/pro-bing v0.4.1 diff --git a/go.sum b/go.sum index 58b901287ccef..f49ca46febd30 100644 --- a/go.sum +++ b/go.sum @@ -2071,6 +2071,7 @@ github.com/pcolladosoto/goslurm v0.1.0 h1:d2KigvDfsIIeVeHHj/pTtajz2T0cHHqhGk9iJW github.com/pcolladosoto/goslurm v0.1.0/go.mod h1:eLuBFfN/tj4O/HDMrAJXb+3s3rGhdHQVZFcOUV1Sbbo= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14 h1:XeOYlK9W1uCmhjJSsY78Mcuh7MVkNjTzmHx1yBzizSU= github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14/go.mod h1:jVblp62SafmidSkvWrXyxAme3gaTfEtWwRPGz5cpvHg= diff --git a/plugins/inputs/all/opcua_event_subscription.go b/plugins/inputs/all/opcua_event_subscription.go new file mode 100644 index 0000000000000..2ea3e28721fe9 --- /dev/null +++ b/plugins/inputs/all/opcua_event_subscription.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.opcua_event_subscription + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/opcua_event_subscription" // register plugin diff --git a/plugins/inputs/opcua_event_subscription/README.md b/plugins/inputs/opcua_event_subscription/README.md new file mode 100644 index 0000000000000..b3f3b22969536 --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/README.md @@ -0,0 +1,159 @@ +# OPC UA Event Monitoring Telegraf Plugin + +This input plugin, `opcua_event_subscription`, enables monitoring of +OPC UA events by subscribing to specific node IDs and filtering events based on +event_type and source_name. The plugin also supports secure OPC UA connections, +allowing the use of client certificates and private keys for encrypted +communication with the server. + +## Features + +- Connects to an OPC UA server to subscribe to a specified event_type. +- Filters events based on source_name and event_type. +- Allows configuration of specific node IDs and fields to monitor for event data. +- Supports secure OPC UA connections, including options for setting SecurityMode (None, Sign, SignAndEncrypt) and SecurityPolicy (None, Basic256Sha256). +- Allows the use of client certificates and private keys for secure communication with the OPC UA server, enabling encrypted connections. + +## Requirements + +- [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) +- Go environment for building the plugin. +- An accessible OPC UA server with alarms and conditions support. +- [gopcua](https://github.com/gopcua/opcua) Go client library. + +## Installation + +- Place the `opcua_event_subscription` plugin in the Telegraf plugin directory. +- Ensure the `opcua_event_subscription` directory is included in your Go path. +- Build and install the plugin according to Telegraf’s external plugin documentation. + +## Configuration + +```toml @sample.conf +[[inputs.opcua_event_subscription]] + ## OPC UA Server Endpoint + endpoint = "opc.tcp://localhost:4840" + + ## Polling interval + interval = "10s" + + ## Event Type Filter + event_type = "ns=0;i=2041" + + ## Node IDs to subscribe to + node_ids = ["ns=2;s=0:East/Blue"] + + ## Source Name Filter (optional) + source_names = ["SourceName1", "SourceName2"] + + ## Fields to retrieve (optional) + fields = ["Severity", "Message", "Time"] + + ## Security mode and policy (optional) + security_mode = "None" + security_policy = "None" + + ## Client certificate and key (optional) + certificate = "/path/to/cert.pem" + private_key = "/path/to/key.pem" + + ## Connection and Request Timeout (optional) + connection_timeout = "10s" + request_timeout = "10s" +``` + +## Configuration Parameters + +- `endpoint` The OPC UA server’s endpoint URL. +- `interval` Polling interval for data collection, e.g., 10s. +- `node_ids` A list of OPC UA node identifiers (NodeIds) specifying the nodes to monitor for event notifications, which are associated with the defined event type. +- `event_type` Defines the type or level of events to capture from the monitored nodes. +- `source_names` Specifies OPCUA Event source_names to filter on +- `fields` Specifies the fields to capture from event notifications. +- `security_mode` Sets the OPC UA security mode (None, Sign, SignAndEncrypt). +- `security_policy` Specifies the OPC UA security policy (None, Basic256Sha256). +- `certificate` Path to the client certificate (in PEM format) if required. +- `private_key` Path to the private key (in PEM format) if required. +- `connection_timeout` Defines the maximum time the client will wait to establish a connection with the server before considering the attempt failed (default 10s). +- `request_timeout` Specifies the maximum time the client will wait for a response from the server after a request has been sent before timing out (default 10s). + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Security + +If secure connections are required, set security_mode +and security_policy based on the OPC UA server’s requirements. +Provide paths to certificate and private_key in PEM format. + +## How it works + +Once Telegraf starts with this plugin, it establishes a connection +to the OPC UA server, subscribes to the specified event_type’s Node-ID, +and collects events that meet the defined criteria. +The `node_ids` parameter specifies the nodes to monitor for +events (monitored items). +However, the actual subscription is based on the `event_type`, +which determines the events that are capture. + +## Metrics + +Measurement names are based on the OPC UA fields selected in the +telegraf config. +All the fields are added to the Output `fields`. +All metrics receive the node_id & opcua_host `tags` indicating +the related NodeID and OPCUA Server where the event is coming from. + +## Example Output + +```bash +{ + "fields": { + "EventType": "i=10751", + "Message": "The alarm severity has increased.", + "SourceName": "SouthMotor", + "Time": "2024-12-09 07:46:48.8492578 +0000 UTC" + }, + "name": "opcua_event_subscription", + "tags": { + "host": "myHost", + "node_id": "ns=2;s=0:East/Blue", + "opcua_host": "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer" + }, + "timestamp": 1733730411 +} +``` + +## Troubleshooting + +- Ensure this plugin directory is in Telegraf’s Go path. +- Compile and run Telegraf with this plugin enabled to verify the connection and data collection. +- Check the Telegraf logs for any configuration or connection errors, and troubleshoot accordingly. + +## Development + +For testing purposes, +you can test the plugin using the `opcua_event_subscription_test` file. +The tests will automatically use the +`SampleConfig` defined in the plugin and connect to a +demo OPC UA server to perform subscriptions. +To run the tests, simply execute the following command: + +```batch + go test -v +``` + +## Limitations + +- Does not allow multiple event_types within one subscription. To subscribe to multiple event_types use multiple input plugins within your telegraf.conf. +- Where-Filter is limited to the OPC-UA field source_name. +- This Plugin is only developed for event notifications. Data Change notifications are not supported. +- SamplingInterval is set to 10000.0 // 10 sec. +- QueueSize is set to 10. +- All retrieved fields are forwarded as `fields`, while the opcua_host and the node_id that triggers the event are forwarded as `tags` diff --git a/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go b/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go new file mode 100644 index 0000000000000..dd362b1232f1f --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/nodeID_wrapper.go @@ -0,0 +1,19 @@ +package opcua_event_subscription + +import ( + "fmt" + "github.com/gopcua/opcua/ua" +) + +type NodeIDWrapper struct { + ID *ua.NodeID +} + +func (n *NodeIDWrapper) UnmarshalText(text []byte) error { + nodeID, err := ua.ParseNodeID(string(text)) + if err != nil { + return fmt.Errorf("failed to parse NodeID from text: %w", err) + } + n.ID = nodeID + return nil +} diff --git a/plugins/inputs/opcua_event_subscription/notification_handler.go b/plugins/inputs/opcua_event_subscription/notification_handler.go new file mode 100644 index 0000000000000..ad55a2dbb7ced --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/notification_handler.go @@ -0,0 +1,67 @@ +package opcua_event_subscription + +import ( + "fmt" + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "sync" +) + +type NotificationHandler struct { + Fields []string + Log telegraf.Logger + Endpoint string + ClientHandleToNodeID *sync.Map +} + +func (nh *NotificationHandler) HandleNotification(notification *opcua.PublishNotificationData, acc telegraf.Accumulator) { + switch v := notification.Value.(type) { + case *ua.EventNotificationList: + nh.handleEventNotification(v, acc) + default: + nh.Log.Infof("Received unknown notification type: %T", v) + } +} + +func (nh *NotificationHandler) handleEventNotification(notification *ua.EventNotificationList, acc telegraf.Accumulator) { + for _, event := range notification.Events { + fields := make(map[string]interface{}) + for i, field := range event.EventFields { + fieldName := nh.Fields[i] + value := field.Value() + + if fieldName == "Message" { + if localizedText, ok := value.(*ua.LocalizedText); ok { + fields["Message"] = localizedText.Text + } else { + nh.Log.Warnf("Message field is not of type *ua.LocalizedText: %T", value) + } + continue + } + var stringValue string + switch v := value.(type) { + case string: + stringValue = v + case fmt.Stringer: + stringValue = v.String() + case nil: + stringValue = "null" + default: + stringValue = fmt.Sprintf("%v", v) + } + fields[fieldName] = stringValue + } + + nodeID, ok := nh.ClientHandleToNodeID.Load(event.ClientHandle) + if !ok { + nh.Log.Warnf("NodeId not found for ClientHandle: %d", event.ClientHandle) + nodeID = "unknown" + } + tags := map[string]string{ + "node_id": nodeID.(string), + "opcua_host": nh.Endpoint, + } + acc.AddFields("opcua_event_subscription", fields, tags) + } +} diff --git a/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go b/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go new file mode 100644 index 0000000000000..3701521491f56 --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/opcua_event_subscription.go @@ -0,0 +1,207 @@ +package opcua_event_subscription + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/gopcua/opcua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type OpcuaEventSubscription struct { + Endpoint string `toml:"endpoint"` + Interval config.Duration `toml:"interval"` + EventType NodeIDWrapper `toml:"event_type"` + NodeIDs []NodeIDWrapper `toml:"node_ids"` + SourceNames []string `toml:"source_names"` + Fields []string `toml:"fields"` + SecurityMode string `toml:"security_mode"` + SecurityPolicy string `toml:"security_policy"` + Certificate string `toml:"certificate"` + PrivateKey string `toml:"private_key"` + ConnectionTimeout config.Duration `toml:"connection_timeout"` + RequestTimeout config.Duration `toml:"request_timeout"` + + Client *opcuaclient.OpcUAClient + SubscriptionManager *SubscriptionManager + NotificationHandler *NotificationHandler + Cancel context.CancelFunc + Log telegraf.Logger + ClientHandleToNodeID sync.Map +} + +func (o *OpcuaEventSubscription) SampleConfig() string { + return ` + ## OPC UA Server Endpoint + endpoint = "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer" + + ## Polling interval + interval = "10s" + + ## Event Type Filter + event_type = "ns=0;i=2041" + + ## Node IDs to subscribe to + node_ids = ["ns=2;s=0:East/Blue"] + + ## Source Name Filter (optional) + source_names = ["SourceName1", "SourceName2"] + + ## Fields to be returned + fields = ["Severity", "Message"] + + ## Security mode and policy (optional) + security_mode = "None" + security_policy = "None" + + ## Client certificate and key (optional) + certificate = "" + private_key = "" + + ## Connection and Request Timeout (optional) + connection_timeout = "10s" + request_timeout = "10s" + ` +} + +func (o *OpcuaEventSubscription) Start() error { + o.Log.Info("******************START******************") + + if o.Endpoint == "" { + return errors.New("missing mandatory field: endpoint") + } + + if o.Interval <= 0 { + return errors.New("missing or invalid mandatory field: interval") + } + + if len(o.NodeIDs) == 0 { + return errors.New("missing mandatory field: node_ids") + } + + if o.EventType.ID == nil { + return errors.New("missing mandatory field: event_type") + } + + if len(o.Fields) == 0 { + return errors.New("missing mandatory field: fields") + } + + if o.ConnectionTimeout == 0 { + o.Log.Debug("ConnectionTimeout not set. Set to default value of 10s") + o.ConnectionTimeout = config.Duration(10 * time.Second) // Default to 10 seconds + } + if o.RequestTimeout == 0 { + o.Log.Debug("RequestTimeout not set. Set to default value of 10s") + o.RequestTimeout = config.Duration(10 * time.Second) // Default to 10 seconds + } + + clientConfig := &opcuaclient.OpcUAClientConfig{ + Endpoint: o.Endpoint, + SecurityPolicy: o.SecurityPolicy, + SecurityMode: o.SecurityMode, + Certificate: o.Certificate, + PrivateKey: o.PrivateKey, + ConnectTimeout: o.ConnectionTimeout, + RequestTimeout: o.RequestTimeout, + } + + client, err := clientConfig.CreateClient(o.Log) + if err != nil { + return fmt.Errorf("failed to create OPC UA client: %w", err) + } + o.Client = client + + err = o.Client.Connect(context.Background()) + if err != nil { + return fmt.Errorf("failed to connect to OPC UA server: %w", err) + } + + o.SubscriptionManager = &SubscriptionManager{ + Client: o.Client.Client, + EventType: o.EventType, + NodeIDs: o.NodeIDs, + Fields: o.Fields, + SourceNames: o.SourceNames, + Log: o.Log, + Interval: time.Duration(o.Interval), + ClientHandleToNodeID: &o.ClientHandleToNodeID, + } + + o.NotificationHandler = &NotificationHandler{ + Fields: o.Fields, + Log: o.Log, + Endpoint: o.Endpoint, + ClientHandleToNodeID: &o.ClientHandleToNodeID, + } + + return nil +} + +func (o *OpcuaEventSubscription) Gather(acc telegraf.Accumulator) error { + if o.Client == nil { + return errors.New("client is not initialized") + } + + if len(o.SubscriptionManager.subscriptions) == 0 { + ctx := context.Background() + notifyCh := make(chan *opcua.PublishNotificationData) + + if err := o.SubscriptionManager.CreateSubscription(ctx, notifyCh); err != nil { + return fmt.Errorf("failed to create subscription: %w", err) + } + + if err := o.SubscriptionManager.Subscribe(ctx); err != nil { + return fmt.Errorf("failed to subscribe: %w", err) + } + + go func() { + for { + select { + case <-ctx.Done(): + o.Log.Warn("Context cancelled, stopping Goroutine") + return + case notification := <-notifyCh: + if notification.Error != nil { + o.Log.Errorf("Notification error: %v", notification.Error) + continue + } + o.NotificationHandler.HandleNotification(notification, acc) + } + } + }() + } + + return nil +} + +func (o *OpcuaEventSubscription) Stop() { + o.Log.Info("******************STOP******************") + if o.Cancel != nil { + o.Cancel() + } + if o.Client != nil { + for _, sub := range o.SubscriptionManager.subscriptions { + err := sub.Cancel(context.Background()) + if err != nil { + o.Log.Errorf("Failed to cancel subscription: %v", err) + } + } + err := o.Client.Disconnect(context.Background()) + if err != nil { + o.Log.Errorf("Failed to disconnect client: %v", err) + } + } +} + +func init() { + inputs.Add("opcua_event_subscription", func() telegraf.Input { + return &OpcuaEventSubscription{} + }) +} diff --git a/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go b/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go new file mode 100644 index 0000000000000..4db823bbc67a8 --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/opcua_event_subscription_test.go @@ -0,0 +1,105 @@ +package opcua_event_subscription + +import ( + "fmt" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/pelletier/go-toml" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type TempConfig struct { + Endpoint string `toml:"endpoint"` + Interval string `toml:"interval"` + EventType string `toml:"event_type"` + NodeIDs []string `toml:"node_ids"` + SourceNames []string `toml:"source_names"` + Fields []string `toml:"fields"` + SecurityMode string `toml:"security_mode"` + SecurityPolicy string `toml:"security_policy"` + Certificate string `toml:"certificate"` + PrivateKey string `toml:"private_key"` +} + +func LoadSampleConfigToPlugin() (*OpcuaEventSubscription, error) { + plugin := &OpcuaEventSubscription{} + sampleConfig := plugin.SampleConfig() + tempConfig := &TempConfig{} + + err := toml.Unmarshal([]byte(sampleConfig), tempConfig) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal sample config: %w", err) + } + + plugin.Endpoint = tempConfig.Endpoint + plugin.Interval = config.Duration(time.Second * 10) // Default to 10s for simplicity + plugin.EventType = NodeIDWrapper{} + if err := plugin.EventType.UnmarshalText([]byte(tempConfig.EventType)); err != nil { + return nil, fmt.Errorf("failed to unmarshal EventType: %w", err) + } + + for _, nodeIDStr := range tempConfig.NodeIDs { + nodeIDWrapper := NodeIDWrapper{} + if err := nodeIDWrapper.UnmarshalText([]byte(nodeIDStr)); err != nil { + return nil, fmt.Errorf("failed to unmarshal NodeID: %w", err) + } + plugin.NodeIDs = append(plugin.NodeIDs, nodeIDWrapper) + } + + plugin.SourceNames = tempConfig.SourceNames + plugin.Fields = tempConfig.Fields + plugin.SecurityMode = tempConfig.SecurityMode + plugin.SecurityPolicy = tempConfig.SecurityPolicy + plugin.Certificate = tempConfig.Certificate + plugin.PrivateKey = tempConfig.PrivateKey + + return plugin, nil +} + +func TestStart(t *testing.T) { + plugin, err := LoadSampleConfigToPlugin() + require.NoError(t, err) + + plugin.Log = testutil.Logger{} + + err = plugin.Start() + require.NoError(t, err) + + require.NotNil(t, plugin.SubscriptionManager) + require.NotNil(t, plugin.NotificationHandler) + + // Clean up after the test + plugin.Stop() +} + +func TestGather(t *testing.T) { + plugin, err := LoadSampleConfigToPlugin() + require.NoError(t, err) + plugin.Log = testutil.Logger{} + + acc := &testutil.Accumulator{} + + err = plugin.Start() + require.NoError(t, err) + + err = plugin.Gather(acc) + require.NoError(t, err) + + plugin.Stop() +} + +func TestStop(t *testing.T) { + plugin, err := LoadSampleConfigToPlugin() + require.NoError(t, err) + plugin.Log = testutil.Logger{} + + err = plugin.Start() + require.NoError(t, err) + + plugin.SubscriptionManager = &SubscriptionManager{} + + plugin.Stop() + require.Nil(t, plugin.Cancel) +} diff --git a/plugins/inputs/opcua_event_subscription/subscription_manager.go b/plugins/inputs/opcua_event_subscription/subscription_manager.go new file mode 100644 index 0000000000000..6a1c80c912106 --- /dev/null +++ b/plugins/inputs/opcua_event_subscription/subscription_manager.go @@ -0,0 +1,146 @@ +package opcua_event_subscription + +import ( + "context" + "errors" + "fmt" + + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/id" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "sync" + "time" +) + +type SubscriptionManager struct { + Client *opcua.Client + NodeIDs []NodeIDWrapper + Fields []string + EventType NodeIDWrapper + SourceNames []string + NotifyChannels []chan *opcua.PublishNotificationData + subscriptions []*opcua.Subscription + Log telegraf.Logger + Interval time.Duration + ClientHandleToNodeID *sync.Map +} + +func (sm *SubscriptionManager) CreateSubscription(ctx context.Context, notifyCh chan *opcua.PublishNotificationData) error { + if len(sm.subscriptions) == 0 { + if ctx == nil { + return errors.New("context is nil") + } + if notifyCh == nil { + return errors.New("notification channel is nil") + } + sm.NotifyChannels = append(sm.NotifyChannels, notifyCh) + + sub, err := sm.Client.Subscribe(ctx, &opcua.SubscriptionParameters{ + Interval: sm.Interval, + }, notifyCh) + if err != nil { + return fmt.Errorf("failed to create subscription: %w", err) + } + sm.subscriptions = append(sm.subscriptions, sub) + } + return nil +} + +func (sm *SubscriptionManager) Subscribe(ctx context.Context) error { + filter := ua.EventFilter{ + SelectClauses: sm.createSelectClauses(), + WhereClause: sm.createWhereClauses(), + } + + filterExtObj := ua.ExtensionObject{ + EncodingMask: ua.ExtensionObjectBinary, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary), + }, + Value: filter, + } + + for i, nodeID := range sm.NodeIDs { + miCreateRequest := &ua.MonitoredItemCreateRequest{ + ItemToMonitor: &ua.ReadValueID{ + NodeID: nodeID.ID, + AttributeID: ua.AttributeIDEventNotifier, + DataEncoding: &ua.QualifiedName{}, + }, + MonitoringMode: ua.MonitoringModeReporting, + RequestedParameters: &ua.MonitoringParameters{ + ClientHandle: uint32(i), + SamplingInterval: 10000.0, // 10 seconds + QueueSize: 10, + DiscardOldest: true, + Filter: &filterExtObj, + }, + } + sm.ClientHandleToNodeID.Store(uint32(i), nodeID.ID.String()) + res, err := sm.subscriptions[0].Monitor(ctx, ua.TimestampsToReturnBoth, miCreateRequest) + if err != nil || res.Results[0].StatusCode != ua.StatusOK { + return fmt.Errorf("failed to create monitored item: %w", err) + } + } + sm.Log.Info("Subscribed successfully") + return nil +} + +func (sm *SubscriptionManager) createSelectClauses() []*ua.SimpleAttributeOperand { + selects := make([]*ua.SimpleAttributeOperand, len(sm.Fields)) + for i, name := range sm.Fields { + selects[i] = &ua.SimpleAttributeOperand{ + TypeDefinitionID: ua.NewNumericNodeID(sm.EventType.ID.Namespace(), sm.EventType.ID.IntID()), + BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}}, + AttributeID: ua.AttributeIDValue, + } + } + return selects +} + +func (sm *SubscriptionManager) createWhereClauses() *ua.ContentFilter { + if len(sm.SourceNames) == 0 { + return &ua.ContentFilter{ + Elements: make([]*ua.ContentFilterElement, 0), + } + } + operands := make([]*ua.ExtensionObject, 0) + for _, sourceName := range sm.SourceNames { + literalOperand := &ua.ExtensionObject{ + EncodingMask: 1, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary), + }, + Value: ua.LiteralOperand{ + Value: ua.MustVariant(sourceName), + }, + } + operands = append(operands, literalOperand) + } + + attributeOperand := &ua.ExtensionObject{ + EncodingMask: ua.ExtensionObjectBinary, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary), + }, + Value: &ua.SimpleAttributeOperand{ + TypeDefinitionID: ua.NewNumericNodeID(sm.EventType.ID.Namespace(), sm.EventType.ID.IntID()), + BrowsePath: []*ua.QualifiedName{ + {NamespaceIndex: 0, Name: "SourceName"}, + }, + AttributeID: ua.AttributeIDValue, + }, + } + + filterElement := &ua.ContentFilterElement{ + FilterOperator: ua.FilterOperatorInList, + FilterOperands: append([]*ua.ExtensionObject{attributeOperand}, operands...), + } + + wheres := &ua.ContentFilter{ + Elements: []*ua.ContentFilterElement{filterElement}, + } + + return wheres +} diff --git a/plugins/parsers/xpath/testcases/openweathermap_5d.xml b/plugins/parsers/xpath/testcases/openweathermap_5d.xml index 2b7dc83a5b86b..a3fe64be57f96 100644 --- a/plugins/parsers/xpath/testcases/openweathermap_5d.xml +++ b/plugins/parsers/xpath/testcases/openweathermap_5d.xml @@ -1,38 +1,38 @@ - - - - London - - GB - 3600 - - - - 2015-06-30T00:00:00Z - - - - - - - + + + + London + + GB + 3600 + + + + 2015-06-30T00:00:00Z + + + + + + +