Skip to content
3 changes: 2 additions & 1 deletion edge-modules/metrics-collector/src/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ public static class Constants
public const string DefaultLogAnalyticsWorkspaceDomainPrefixOds = ".ods.opinsights.";
public const string DefaultLogAnalyticsWorkspaceDomainPrefixOms = ".oms.opinsights.";
public const string ProductInfo = "IoTEdgeMetricsCollectorModule";
public static readonly int MaxMessageSize = 225000;
}
}
}
73 changes: 57 additions & 16 deletions edge-modules/metrics-collector/src/IotHubUpload/IotHubUpload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,14 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
{
Preconditions.CheckNotNull(metrics, nameof(metrics));
IEnumerable<ExportMetric> outputMetrics = metrics.Select(m => new ExportMetric(m));
List<IEnumerable<ExportMetric>> splitMetrics = new List<IEnumerable<ExportMetric>> { outputMetrics };
IEnumerable<Message> messagesToSend = BatchAndBuildMessages(splitMetrics);

string outputString = JsonConvert.SerializeObject(outputMetrics);

if (Settings.Current.TransformForIoTCentral)
{
outputString = Transform(outputMetrics);
}

byte[] metricsData = Encoding.UTF8.GetBytes(outputString);
if (Settings.Current.CompressForUpload)
foreach (Message metricsMessage in messagesToSend)
{
metricsData = Compression.CompressToGzip(metricsData);
await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage);
}

Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;

await this.ModuleClientWrapper.SendMessageAsync("metricOutput", metricsMessage);

return true;
}
catch (Exception e)
Expand All @@ -59,6 +48,58 @@ public async Task<bool> PublishAsync(IEnumerable<Metric> metrics, CancellationTo
}
}

private byte[] serializeMetrics(IEnumerable<ExportMetric> outputMetrics)
{
string outputString = JsonConvert.SerializeObject(outputMetrics);

if (Settings.Current.TransformForIoTCentral)
{
outputString = Transform(outputMetrics);
}

byte[] metricsData = Encoding.UTF8.GetBytes(outputString);

if (Settings.Current.CompressForUpload)
{
metricsData = Compression.CompressToGzip(metricsData);
}

return metricsData;
}

private IEnumerable<Message> BatchAndBuildMessages(List<IEnumerable<ExportMetric>> splitMetrics)
{

List<Message> messageList = new List<Message>();
byte[] metricsData = serializeMetrics(splitMetrics.Last());
if (metricsData.Length > Constants.MaxMessageSize)
{
LoggerUtil.Writer.LogInformation($"IoT message is {metricsData.Length} bytes, splitting messages...");
List<IEnumerable<ExportMetric>> splitAgainMetrics = new List<IEnumerable<ExportMetric>>();
foreach (IEnumerable<ExportMetric> metrics in splitMetrics)
{
ExportMetric[] metricsarray = metrics.ToArray();
ExportMetric[] firstArray = metricsarray.Take(metricsarray.Length / 2).ToArray();
splitAgainMetrics.Add(firstArray);
ExportMetric[] secondArray = metricsarray.Skip(metricsarray.Length / 2).ToArray();
splitAgainMetrics.Add(secondArray);
}

return BatchAndBuildMessages(splitAgainMetrics);
}
else
{
foreach (IEnumerable<ExportMetric> metrics in splitMetrics)
{
metricsData = serializeMetrics(metrics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A list of metrics that is close to but not exceeding the size limit will always be serialized twice, including possibly transforming the data for IoT Central and compressing the data. If the list of metrics exceeds the size limit, then the entire list will be serialized, followed by half the list, etc., until all sub-lists are within the size, then each sublist is serialized again. So we're going from O(n) to O(n log n) performance. This will likely be a noticeable (and possibly unacceptable) performance dip for many users.

Message metricsMessage = new Message(metricsData);
metricsMessage.Properties[IdentifierPropertyName] = Constants.IoTUploadMessageIdentifier;
messageList.Add(metricsMessage);
}
return messageList;
}
}

private string Transform(IEnumerable<ExportMetric> metrics)
{
DateTime timeGeneratedUtc = DateTime.MaxValue;
Expand Down Expand Up @@ -267,4 +308,4 @@ public ExportMetric(Metric baseMetric)
}
}
}
}
}