Skip to content
This repository has been archived by the owner on Jul 29, 2021. It is now read-only.

Commit

Permalink
Merge pull request #98 from hansgschossmann/jsonfix
Browse files Browse the repository at this point in the history
Create syntactically correct Json messages sent to IoTHub, remove credentials from logfiles
  • Loading branch information
hansgschossmann authored Nov 21, 2017
2 parents dfe4670 + 54dd4b1 commit 7cb875f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 40 deletions.
57 changes: 22 additions & 35 deletions src/IotHubMessaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public async Task<bool> InitAsync()
}
else
{
Trace($"Attempting to register ourselves with IoT Hub using owner connection string: {_iotHubOwnerConnectionString}");
Trace($"Attempting to register ourselves with IoT Hub using owner connection string.");
RegistryManager manager = RegistryManager.CreateFromConnectionString(_iotHubOwnerConnectionString);

// remove any existing device
Expand All @@ -154,13 +154,13 @@ public async Task<bool> InitAsync()
{
string hostname = _iotHubOwnerConnectionString.Substring(0, _iotHubOwnerConnectionString.IndexOf(";"));
deviceConnectionString = hostname + ";DeviceId=" + ApplicationName + ";SharedAccessKey=" + newDevice.Authentication.SymmetricKey.PrimaryKey;
Trace($"Device connection string is: {deviceConnectionString}");
Trace($"Generated device connection string.");
Trace($"Adding it to device cert store.");
await SecureIoTHubToken.WriteAsync(ApplicationName, deviceConnectionString, IotDeviceCertStoreType, IotDeviceCertStorePath);
}
else
{
Trace($"Could not register ourselves with IoT Hub using owner connection string: {_iotHubOwnerConnectionString}");
Trace($"Could not register ourselves with IoT Hub using owner connection string.");
Trace("exiting...");
return false;
}
Expand All @@ -171,7 +171,7 @@ public async Task<bool> InitAsync()
deviceConnectionString = await SecureIoTHubToken.ReadAsync(ApplicationName, IotDeviceCertStoreType, IotDeviceCertStorePath);
if (!string.IsNullOrEmpty(deviceConnectionString))
{
Trace($"Create Publisher IoTHub client with device connection string: '{deviceConnectionString}' using '{IotHubProtocol}' for communication.");
Trace($"Create Publisher IoTHub client with device connection string using '{IotHubProtocol}' for communication.");
_iotHubClient = DeviceClient.CreateFromConnectionString(deviceConnectionString, IotHubProtocol);
ExponentialBackoff exponentialRetryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(2), TimeSpan.FromMilliseconds(1024), TimeSpan.FromMilliseconds(3));
_iotHubClient.SetRetryPolicy(exponentialRetryPolicy);
Expand Down Expand Up @@ -270,16 +270,12 @@ public void Enqueue(string json)
/// </summary>
private async Task MonitoredItemsProcessor(CancellationToken ct)
{
string contentPropertyKey = "content-type";
string contentPropertyValue = "application/opcua+uajson";
string devicenamePropertyKey = "devicename";
string devicenamePropertyValue = ApplicationName;
int userPropertyLength = contentPropertyKey.Length + contentPropertyValue.Length + devicenamePropertyKey.Length + devicenamePropertyValue.Length;
uint jsonSquareBracketLength = 2;
Microsoft.Azure.Devices.Client.Message tempMsg = new Microsoft.Azure.Devices.Client.Message();
// the system properties are MessageId (max 128 byte), Sequence number (ulong), ExpiryTime (DateTime) and more. ideally we get that from the client.
int systemPropertyLength = 128 + sizeof(ulong) + tempMsg.ExpiryTimeUtc.ToString().Length;
// if batching is requested the buffer will have the requested size, otherwise we reserve the max size
uint iotHubMessageBufferSize = (_iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax) - (uint)userPropertyLength - (uint)systemPropertyLength;
uint iotHubMessageBufferSize = (_iotHubMessageSize > 0 ? _iotHubMessageSize : IotHubMessageSizeMax) - (uint)systemPropertyLength - (uint)jsonSquareBracketLength;
byte[] iotHubMessageBuffer = new byte[iotHubMessageBufferSize];
MemoryStream iotHubMessage = new MemoryStream(iotHubMessageBuffer);
DateTime nextSendTime = DateTime.UtcNow + TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
Expand All @@ -295,6 +291,7 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)

iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
iotHubMessage.Write(Encoding.UTF8.GetBytes("["), 0, 1);
while (true)
{
// sanity check the send interval, compute the timeout and get the next monitored item message
Expand Down Expand Up @@ -327,7 +324,7 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
// sanity check that the user has set a large enough IoTHub messages size
if ((_iotHubMessageSize > 0 && jsonMessageSize > _iotHubMessageSize) || (_iotHubMessageSize == 0 && jsonMessageSize > iotHubMessageBufferSize))
{
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {jsonMessageSize}), which will not fit into an IoTHub message (max size: {_iotHubMessageSize}].");
Trace(Utils.TraceMasks.Error, $"There is a telemetry message (size: {jsonMessageSize}), which will not fit into an IoTHub message (max size: {iotHubMessageBufferSize}].");
Trace(Utils.TraceMasks.Error, $"Please check your IoTHub message size settings. The telemetry message will be discarded silently. Sorry:(");
_tooLargeCount++;
continue;
Expand All @@ -338,10 +335,12 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
if (_iotHubMessageSize > 0 || (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds > 0))
{
// if there is still space to batch, do it. otherwise send the buffer and flag the message for later buffering
if (iotHubMessage.Position + jsonMessageSize <= iotHubMessage.Capacity)
if (iotHubMessage.Position + jsonMessageSize + 1 <= iotHubMessage.Capacity)
{
// add the message and a comma to the buffer
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {jsonMessageSize} to IoTHub message (size is now {iotHubMessage.Position}).");
iotHubMessage.Write(Encoding.UTF8.GetBytes(","), 0, 1);
Trace(Utils.TraceMasks.OperationDetail, $"Added new message with size {jsonMessageSize} to IoTHub message (size is now {(iotHubMessage.Position - 1)}).");
continue;
}
else
Expand All @@ -368,25 +367,28 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
Microsoft.Azure.Devices.Client.Message encodedIotHubMessage = null;

// if we reached the send interval, but have nothing to send, we continue
if (!gotItem && iotHubMessage.Position == 0)
if (!gotItem && iotHubMessage.Position == 1)
{
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
iotHubMessage.Write(Encoding.UTF8.GetBytes("["), 0, 1);
continue;
}

// if there is no batching and not interval configured, we send the JSON message we just got, otherwise we send the buffer
if (_iotHubMessageSize == 0 && _defaultSendIntervalSeconds == 0)
{
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(jsonMessage.ToString()));
// we use also an array for a single message to make backend processing more consistent
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes("[" + jsonMessage.ToString() + "]"));
}
else
{
// remove the trailing comma and add a closing square bracket
iotHubMessage.SetLength(iotHubMessage.Length - 1);
iotHubMessage.Write(Encoding.UTF8.GetBytes("]"), 0, 1);
encodedIotHubMessage = new Microsoft.Azure.Devices.Client.Message(iotHubMessage.ToArray());
}
encodedIotHubMessage.Properties.Add(contentPropertyKey, contentPropertyValue);
encodedIotHubMessage.Properties.Add(devicenamePropertyKey, devicenamePropertyValue);
if (_iotHubClient != null)
{
nextSendTime += TimeSpan.FromSeconds(_defaultSendIntervalSeconds);
Expand All @@ -406,11 +408,14 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
// reset the messaage
iotHubMessage.Position = 0;
iotHubMessage.SetLength(0);
iotHubMessage.Write(Encoding.UTF8.GetBytes("["), 0, 1);

// if we had not yet buffered the last message because there was not enough space, buffer it now
if (needToBufferMessage)
{
// add the message and a comma to the buffer
iotHubMessage.Write(Encoding.UTF8.GetBytes(jsonMessage.ToString()), 0, jsonMessageSize);
iotHubMessage.Write(Encoding.UTF8.GetBytes(","), 0, 1);
}
}
else
Expand Down Expand Up @@ -459,23 +464,5 @@ private async Task MonitoredItemsProcessor(CancellationToken ct)
private static CancellationTokenSource _tokenSource;
private static Task _monitoredItemsProcessorTask;
private static DeviceClient _iotHubClient;

/// <summary>
/// Classes for the telemetry message sent to IoTHub.
/// </summary>
private class OpcUaMessage
{
public string ApplicationUri;
public string DisplayName;
public string NodeId;
public OpcUaValue Value;
}

private class OpcUaValue
{
public string Value;
public string SourceTimestamp;
}

}
}
5 changes: 4 additions & 1 deletion src/OpcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum OpcMonitoredItemConfigurationType
public MonitoredItem OpcUaClientMonitoredItem;
public NodeId ConfigNodeId;
public ExpandedNodeId ConfigExpandedNodeId;
public ExpandedNodeId ConfigExpandedNodeIdOriginal;
public OpcMonitoredItemConfigurationType ConfigType;

/// <summary>
Expand All @@ -56,6 +57,7 @@ public OpcMonitoredItem(NodeId nodeId, Uri sessionEndpointUri, bool requestNames
{
ConfigNodeId = nodeId;
ConfigExpandedNodeId = null;
ConfigExpandedNodeIdOriginal = null;
ConfigType = OpcMonitoredItemConfigurationType.NodeId;
Init(sessionEndpointUri);
if (requestNamespaceUpdate)
Expand All @@ -71,6 +73,7 @@ public OpcMonitoredItem(ExpandedNodeId expandedNodeId, Uri sessionEndpointUri, b
{
ConfigNodeId = null;
ConfigExpandedNodeId = expandedNodeId;
ConfigExpandedNodeIdOriginal = expandedNodeId;
ConfigType = OpcMonitoredItemConfigurationType.ExpandedNodeId;
Init(sessionEndpointUri);
if (requestNamespaceUpdate)
Expand Down Expand Up @@ -166,7 +169,7 @@ public void MonitoredItem_Notification(MonitoredItem monitoredItem, MonitoredIte
encoder.WriteString("DisplayName", monitoredItem.DisplayName);

// use the node Id as configured, to also have the namespace URI in case of a ExpandedNodeId.
encoder.WriteString("NodeId", ConfigType == OpcMonitoredItemConfigurationType.NodeId ? ConfigNodeId.ToString() : ConfigExpandedNodeId.ToString());
encoder.WriteString("NodeId", ConfigType == OpcMonitoredItemConfigurationType.NodeId ? ConfigNodeId.ToString() : ConfigExpandedNodeIdOriginal.ToString());

// suppress output of server timestamp in json by setting it to minvalue
value.ServerTimestamp = DateTime.MinValue;
Expand Down
8 changes: 6 additions & 2 deletions src/OpcStackConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ public static int OpcKeepAliveIntervalInSec
set => _opcKeepAliveIntervalInSec = value;
}

public const int OpcSamplingIntervalDefault = 1000;

public static int OpcSamplingInterval
{
get => _opcSamplingInterval;
set => _opcSamplingInterval = value;
}

public const int OpcPublishingIntervalDefault = 0;

public static int OpcPublishingInterval
{
get => _opcPublishingInterval;
Expand Down Expand Up @@ -408,8 +412,8 @@ public async Task ConfigureAsync()
private static uint _opcSessionCreationBackoffMax = 5;
private static uint _opcKeepAliveDisconnectThreshold = 5;
private static int _opcKeepAliveIntervalInSec = 2;
private static int _opcSamplingInterval = 1000;
private static int _opcPublishingInterval = 0;
private static int _opcSamplingInterval = OpcSamplingIntervalDefault;
private static int _opcPublishingInterval = OpcPublishingIntervalDefault;
private static string _publisherServerSecurityPolicy = SecurityPolicies.Basic128Rsa15;
private static string _opcOwnCertStoreType = X509Store;
private static string _opcOwnCertStorePath = OpcOwnCertX509StorePathDefault;
Expand Down
8 changes: 6 additions & 2 deletions src/PublisherNodeConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace OpcPublisher
{
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -384,9 +385,12 @@ public static async Task UpdateNodeConfigurationFile()
public class OpcNodeOnEndpointUrl
{
public string ExpandedNodeId;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[DefaultValue(OpcSamplingIntervalDefault)]
[JsonProperty(DefaultValueHandling = DefaultValueHandling.IgnoreAndPopulate, NullValueHandling = NullValueHandling.Ignore)]
public int? OpcSamplingInterval;
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]

[DefaultValue(OpcPublishingIntervalDefault)]
[JsonProperty(DefaultValueHandling = DefaultValueHandling.IgnoreAndPopulate, NullValueHandling = NullValueHandling.Ignore)]
public int? OpcPublishingInterval;
}

Expand Down

0 comments on commit 7cb875f

Please sign in to comment.