Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opcua.event.subscription): New Plugin to Subscribe to OPCUA Events #16300

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f60b48f
feat: input plugin to retreive opcua events from opcua server
frmoschner Dec 8, 2024
c709cef
feat: input plugin to retreive opcua events from opcua server
frmoschner Dec 8, 2024
ec3de77
feat: input plugin to retreive opcua events from opcua server
frmoschner Dec 9, 2024
5ee64d3
go mod tidy
frmoschner Dec 9, 2024
54e3454
go mod tidy
frmoschner Dec 9, 2024
d74794a
Fix formatting issues with gofmt
frmoschner Dec 9, 2024
4074db7
added example output to readme
frmoschner Dec 9, 2024
101f4a9
added metrics to readme
frmoschner Dec 9, 2024
8eae116
adjusted readme for global config options
frmoschner Dec 9, 2024
07bf99a
adjusted client
frmoschner Dec 11, 2024
d1fe9ac
Merge branch 'feature/opcua-event-plugin'
frmoschner Dec 11, 2024
48a315b
added plugin to registry
frmoschner Dec 11, 2024
690499b
Merge branch 'influxdata:master' into master
frmoschner Dec 12, 2024
72a5718
removed client_manager and used the common client
frmoschner Dec 12, 2024
04b35d9
resolved make checks
frmoschner Dec 12, 2024
001605d
linted readme
frmoschner Dec 12, 2024
076d951
adjusted line lenght in readme
frmoschner Dec 12, 2024
bd8b3f1
more linting adjustments
frmoschner Dec 12, 2024
27818d6
adjusted long line problems
frmoschner Dec 12, 2024
ca993dd
try to satisfy the linter
frmoschner Dec 12, 2024
083d454
try to satisfy the linter 2
frmoschner Dec 12, 2024
d26ed8c
try to satisfy the linter 3
frmoschner Dec 12, 2024
be6334b
try to satisfy the linter 4
frmoschner Dec 12, 2024
0ddfc67
try to satisfy the linter 5
frmoschner Dec 12, 2024
893353d
removed no trainling spaces
frmoschner Dec 12, 2024
8d944f2
ajsuted linting problems
frmoschner Dec 12, 2024
33e8055
ajsuted linting
frmoschner Dec 12, 2024
6b80b52
ajsuted linting
frmoschner Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ require (
github.com/p4lang/p4runtime v1.4.0
github.com/pborman/ansi v1.0.0
github.com/pcolladosoto/goslurm v0.1.0
github.com/pelletier/go-toml v1.9.5
github.com/peterbourgon/unixtransport v0.0.4
github.com/pion/dtls/v2 v2.2.12
github.com/prometheus-community/pro-bing v0.4.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2071,6 +2071,7 @@ github.com/pcolladosoto/goslurm v0.1.0 h1:d2KigvDfsIIeVeHHj/pTtajz2T0cHHqhGk9iJW
github.com/pcolladosoto/goslurm v0.1.0/go.mod h1:eLuBFfN/tj4O/HDMrAJXb+3s3rGhdHQVZFcOUV1Sbbo=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14 h1:XeOYlK9W1uCmhjJSsY78Mcuh7MVkNjTzmHx1yBzizSU=
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14/go.mod h1:jVblp62SafmidSkvWrXyxAme3gaTfEtWwRPGz5cpvHg=
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/all/opcua_event_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.opcua_event_subscription

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/opcua_event_subscription" // register plugin
159 changes: 159 additions & 0 deletions plugins/inputs/opcua_event_subscription/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# OPC UA Event Monitoring Telegraf Plugin

This input plugin, `opcua_event_subscription`, enables monitoring of
OPC UA events by subscribing to specific node IDs and filtering events based on
event_type and source_name. The plugin also supports secure OPC UA connections,
allowing the use of client certificates and private keys for encrypted
communication with the server.

## Features

- Connects to an OPC UA server to subscribe to a specified event_type.
- Filters events based on source_name and event_type.
- Allows configuration of specific node IDs and fields to monitor for event data.
- Supports secure OPC UA connections, including options for setting SecurityMode (None, Sign, SignAndEncrypt) and SecurityPolicy (None, Basic256Sha256).
- Allows the use of client certificates and private keys for secure communication with the OPC UA server, enabling encrypted connections.

## Requirements

- [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/)
- Go environment for building the plugin.
- An accessible OPC UA server with alarms and conditions support.
- [gopcua](https://github.com/gopcua/opcua) Go client library.

## Installation

- Place the `opcua_event_subscription` plugin in the Telegraf plugin directory.
- Ensure the `opcua_event_subscription` directory is included in your Go path.
- Build and install the plugin according to Telegraf’s external plugin documentation.

## Configuration

```toml @sample.conf
[[inputs.opcua_event_subscription]]
## OPC UA Server Endpoint
endpoint = "opc.tcp://localhost:4840"

## Polling interval
interval = "10s"

## Event Type Filter
event_type = "ns=0;i=2041"

## Node IDs to subscribe to
node_ids = ["ns=2;s=0:East/Blue"]

## Source Name Filter (optional)
source_names = ["SourceName1", "SourceName2"]

## Fields to retrieve (optional)
fields = ["Severity", "Message", "Time"]

## Security mode and policy (optional)
security_mode = "None"
security_policy = "None"

## Client certificate and key (optional)
certificate = "/path/to/cert.pem"
private_key = "/path/to/key.pem"

## Connection and Request Timeout (optional)
connection_timeout = "10s"
request_timeout = "10s"
```

## Configuration Parameters

- `endpoint` The OPC UA server’s endpoint URL.
- `interval` Polling interval for data collection, e.g., 10s.
- `node_ids` A list of OPC UA node identifiers (NodeIds) specifying the nodes to monitor for event notifications, which are associated with the defined event type.
- `event_type` Defines the type or level of events to capture from the monitored nodes.
- `source_names` Specifies OPCUA Event source_names to filter on
- `fields` Specifies the fields to capture from event notifications.
- `security_mode` Sets the OPC UA security mode (None, Sign, SignAndEncrypt).
- `security_policy` Specifies the OPC UA security policy (None, Basic256Sha256).
- `certificate` Path to the client certificate (in PEM format) if required.
- `private_key` Path to the private key (in PEM format) if required.
- `connection_timeout` Defines the maximum time the client will wait to establish a connection with the server before considering the attempt failed (default 10s).
- `request_timeout` Specifies the maximum time the client will wait for a response from the server after a request has been sent before timing out (default 10s).

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Security

If secure connections are required, set security_mode
and security_policy based on the OPC UA server’s requirements.
Provide paths to certificate and private_key in PEM format.

## How it works

Once Telegraf starts with this plugin, it establishes a connection
to the OPC UA server, subscribes to the specified event_type’s Node-ID,
and collects events that meet the defined criteria.
The `node_ids` parameter specifies the nodes to monitor for
events (monitored items).
However, the actual subscription is based on the `event_type`,
which determines the events that are capture.

## Metrics

Measurement names are based on the OPC UA fields selected in the
telegraf config.
All the fields are added to the Output `fields`.
All metrics receive the node_id & opcua_host `tags` indicating
the related NodeID and OPCUA Server where the event is coming from.

## Example Output

```bash
{
"fields": {
"EventType": "i=10751",
"Message": "The alarm severity has increased.",
"SourceName": "SouthMotor",
"Time": "2024-12-09 07:46:48.8492578 +0000 UTC"
},
"name": "opcua_event_subscription",
"tags": {
"host": "myHost",
"node_id": "ns=2;s=0:East/Blue",
"opcua_host": "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer"
},
"timestamp": 1733730411
}
```

## Troubleshooting

- Ensure this plugin directory is in Telegraf’s Go path.
- Compile and run Telegraf with this plugin enabled to verify the connection and data collection.
- Check the Telegraf logs for any configuration or connection errors, and troubleshoot accordingly.

## Development

For testing purposes,
you can test the plugin using the `opcua_event_subscription_test` file.
The tests will automatically use the
`SampleConfig` defined in the plugin and connect to a
demo OPC UA server to perform subscriptions.
To run the tests, simply execute the following command:

```batch
go test -v
```

## Limitations

- Does not allow multiple event_types within one subscription. To subscribe to multiple event_types use multiple input plugins within your telegraf.conf.
- Where-Filter is limited to the OPC-UA field source_name.
- This Plugin is only developed for event notifications. Data Change notifications are not supported.
- SamplingInterval is set to 10000.0 // 10 sec.
- QueueSize is set to 10.
- All retrieved fields are forwarded as `fields`, while the opcua_host and the node_id that triggers the event are forwarded as `tags`
19 changes: 19 additions & 0 deletions plugins/inputs/opcua_event_subscription/nodeID_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package opcua_event_subscription

import (
"fmt"
"github.com/gopcua/opcua/ua"
)

type NodeIDWrapper struct {
ID *ua.NodeID
}

func (n *NodeIDWrapper) UnmarshalText(text []byte) error {
nodeID, err := ua.ParseNodeID(string(text))
if err != nil {
return fmt.Errorf("failed to parse NodeID from text: %w", err)
}
n.ID = nodeID
return nil
}
67 changes: 67 additions & 0 deletions plugins/inputs/opcua_event_subscription/notification_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package opcua_event_subscription

import (
"fmt"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"sync"
)

type NotificationHandler struct {
Fields []string
Log telegraf.Logger
Endpoint string
ClientHandleToNodeID *sync.Map
}

func (nh *NotificationHandler) HandleNotification(notification *opcua.PublishNotificationData, acc telegraf.Accumulator) {
switch v := notification.Value.(type) {
case *ua.EventNotificationList:
nh.handleEventNotification(v, acc)
default:
nh.Log.Infof("Received unknown notification type: %T", v)
}
}

func (nh *NotificationHandler) handleEventNotification(notification *ua.EventNotificationList, acc telegraf.Accumulator) {
for _, event := range notification.Events {
fields := make(map[string]interface{})
for i, field := range event.EventFields {
fieldName := nh.Fields[i]
value := field.Value()

if fieldName == "Message" {
if localizedText, ok := value.(*ua.LocalizedText); ok {
fields["Message"] = localizedText.Text
} else {
nh.Log.Warnf("Message field is not of type *ua.LocalizedText: %T", value)
}
continue
}
var stringValue string
switch v := value.(type) {
case string:
stringValue = v
case fmt.Stringer:
stringValue = v.String()
case nil:
stringValue = "null"
default:
stringValue = fmt.Sprintf("%v", v)
}
fields[fieldName] = stringValue
}

nodeID, ok := nh.ClientHandleToNodeID.Load(event.ClientHandle)
if !ok {
nh.Log.Warnf("NodeId not found for ClientHandle: %d", event.ClientHandle)
nodeID = "unknown"
}
tags := map[string]string{
"node_id": nodeID.(string),
"opcua_host": nh.Endpoint,
}
acc.AddFields("opcua_event_subscription", fields, tags)
}
}
Loading
Loading