Skip to content

Commit 792a204

Browse files
[Tracer] Add Azure Service Bus batching support (#7553)
## Summary of changes Follow-up to #7413 Add support for ServiceBusMessageBatch messages for Service Bus. ## Reason for change Service Bus instrumentation completeness. ## Implementation details ## Test coverage Integration tests. ## Other details <!-- Fixes #{issue} --> <!-- ⚠️ Note: Where possible, please obtain 2 approvals prior to merging. Unless CODEOWNERS specifies otherwise, for external teams it is typically best to have one review from a team member, and one review from apm-dotnet. Trivial changes do not require 2 reviews. MergeQueue is NOT enabled in this repository. If you have write access to the repo, the PR has 1-2 approvals (see above), and all of the required checks have passed, you can use the Squash and Merge button to merge the PR. If you don't have write access, or you need help, reach out in the #apm-dotnet channel in Slack. -->
1 parent aec7e5e commit 792a204

File tree

23 files changed

+1186
-636
lines changed

23 files changed

+1186
-636
lines changed

tracer/build/supported_calltargets.g.json

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3762,6 +3762,31 @@
37623762
"IsAdoNetIntegration": false,
37633763
"InstrumentationCategory": 1
37643764
},
3765+
{
3766+
"IntegrationName": "AzureServiceBus",
3767+
"AssemblyName": "Azure.Messaging.ServiceBus",
3768+
"TargetTypeName": "Azure.Messaging.ServiceBus.ServiceBusSender",
3769+
"TargetMethodName": "SendMessagesAsync",
3770+
"TargetReturnType": "System.Threading.Tasks.Task",
3771+
"TargetParameterTypes": [
3772+
"Azure.Messaging.ServiceBus.ServiceBusMessageBatch",
3773+
"System.Threading.CancellationToken"
3774+
],
3775+
"MinimumVersion": {
3776+
"Item1": 7,
3777+
"Item2": 14,
3778+
"Item3": 0
3779+
},
3780+
"MaximumVersion": {
3781+
"Item1": 7,
3782+
"Item2": 65535,
3783+
"Item3": 65535
3784+
},
3785+
"InstrumentationTypeName": "Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus.ServiceBusSenderSendMessageBatchAsyncIntegration",
3786+
"IntegrationKind": 0,
3787+
"IsAdoNetIntegration": false,
3788+
"InstrumentationCategory": 1
3789+
},
37653790
{
37663791
"IntegrationName": "AzureServiceBus",
37673792
"AssemblyName": "Azure.Messaging.ServiceBus",

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/AzureServiceBusCommon.cs

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,59 @@ internal static long GetMessageSize<T>(T message)
6060
return size;
6161
}
6262

63-
internal static CallTargetState CreateSenderSpan<TTarget>(TTarget instance, string operationName = "azure_servicebus.send", IEnumerable? messages = null)
64-
where TTarget : IServiceBusSender, IDuckType
63+
internal static CallTargetState CreateSenderSpan(
64+
IServiceBusSender instance,
65+
string operationName,
66+
IEnumerable? messages = null,
67+
int? messageCount = null,
68+
IEnumerable<SpanLink>? spanLinks = null)
69+
{
70+
var entityPath = instance.EntityPath;
71+
var endpoint = instance.Connection?.ServiceEndpoint;
72+
var networkDestinationName = endpoint?.Host;
73+
// https://learn.microsoft.com/en-us/dotnet/api/system.uri.port#remarks
74+
var networkDestinationPort = endpoint?.Port is null or -1 or 5671 ?
75+
"5671" :
76+
endpoint.Port.ToString();
77+
78+
return CreateSenderSpanInternal(
79+
entityPath,
80+
networkDestinationName,
81+
networkDestinationPort,
82+
operationName,
83+
messages,
84+
messageCount,
85+
spanLinks);
86+
}
87+
88+
internal static CallTargetState CreateSenderSpan(
89+
IMessagingClientDiagnostics clientDiagnostics,
90+
string operationName,
91+
IEnumerable? messages = null,
92+
int? messageCount = null,
93+
IEnumerable<SpanLink>? spanLinks = null)
94+
{
95+
var entityPath = clientDiagnostics.EntityPath;
96+
var networkDestinationName = clientDiagnostics.FullyQualifiedNamespace;
97+
98+
return CreateSenderSpanInternal(
99+
entityPath,
100+
networkDestinationName,
101+
null,
102+
operationName,
103+
messages,
104+
messageCount,
105+
spanLinks);
106+
}
107+
108+
private static CallTargetState CreateSenderSpanInternal(
109+
string? entityPath,
110+
string? networkDestinationName,
111+
string? networkDestinationPort,
112+
string operationName,
113+
IEnumerable? messages,
114+
int? messageCount,
115+
IEnumerable<SpanLink>? spanLinks)
65116
{
66117
var tracer = Tracer.Instance;
67118
if (!tracer.Settings.IsIntegrationEnabled(IntegrationId.AzureServiceBus, false))
@@ -71,31 +122,31 @@ internal static CallTargetState CreateSenderSpan<TTarget>(TTarget instance, stri
71122

72123
var tags = tracer.CurrentTraceSettings.Schema.Messaging.CreateAzureServiceBusTags(SpanKinds.Producer);
73124

74-
var entityPath = instance.EntityPath ?? "unknown";
75125
tags.MessagingDestinationName = entityPath;
76-
tags.MessagingOperation = "send";
126+
tags.MessagingOperation = operationName;
77127
tags.MessagingSystem = "servicebus";
78128
tags.InstrumentationName = "AzureServiceBus";
79129

80130
string serviceName = tracer.CurrentTraceSettings.Schema.Messaging.GetServiceName("azureservicebus");
81131
var scope = tracer.StartActiveInternal(
82-
operationName,
132+
"azure_servicebus." + operationName,
83133
tags: tags,
84-
serviceName: serviceName);
134+
serviceName: serviceName,
135+
links: spanLinks);
85136
var span = scope.Span;
86137

87138
span.Type = SpanTypes.Queue;
88139
span.ResourceName = entityPath;
89140

90-
var messageCount = messages is ICollection collection ? collection.Count : 0;
141+
var actualMessageCount = messageCount ?? (messages is ICollection collection ? collection.Count : 0);
91142
string? singleMessageId = null;
92143

93-
if (messageCount > 1)
144+
if (actualMessageCount > 1)
94145
{
95-
span.SetTag(Tags.MessagingBatchMessageCount, messageCount.ToString());
146+
span.SetTag(Tags.MessagingBatchMessageCount, actualMessageCount.ToString());
96147
}
97148

98-
if (messageCount == 1 && messages != null)
149+
if (actualMessageCount == 1 && messages != null)
99150
{
100151
foreach (var message in messages)
101152
{
@@ -110,14 +161,14 @@ internal static CallTargetState CreateSenderSpan<TTarget>(TTarget instance, stri
110161
}
111162
}
112163

113-
var endpoint = instance.Connection?.ServiceEndpoint;
114-
if (endpoint != null)
164+
if (!string.IsNullOrEmpty(networkDestinationName))
165+
{
166+
tags.NetworkDestinationName = networkDestinationName;
167+
}
168+
169+
if (!string.IsNullOrEmpty(networkDestinationPort))
115170
{
116-
tags.NetworkDestinationName = endpoint.Host;
117-
// https://learn.microsoft.com/en-us/dotnet/api/system.uri.port?view=net-8.0#remarks
118-
tags.NetworkDestinationPort = endpoint.Port is -1 or 5671 ?
119-
"5671" :
120-
endpoint.Port.ToString();
171+
tags.NetworkDestinationPort = networkDestinationPort;
121172
}
122173

123174
return new CallTargetState(scope);
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// <copyright file="IMessagingClientDiagnostics.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using Datadog.Trace.DuckTyping;
9+
10+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus
11+
{
12+
/// <summary>
13+
/// Duck type interface for Azure.Core.Shared.MessagingClientDiagnostics
14+
/// </summary>
15+
internal interface IMessagingClientDiagnostics : IDuckType
16+
{
17+
[DuckField(Name = "_entityPath")]
18+
string? EntityPath { get; }
19+
20+
[DuckField(Name = "_fullyQualifiedNamespace")]
21+
string? FullyQualifiedNamespace { get; }
22+
}
23+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// <copyright file="IServiceBusMessageBatch.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using Datadog.Trace.DuckTyping;
9+
10+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus
11+
{
12+
/// <summary>
13+
/// Duck type interface for Azure.Messaging.ServiceBus.ServiceBusMessageBatch
14+
/// </summary>
15+
internal interface IServiceBusMessageBatch : IDuckType
16+
{
17+
int Count { get; }
18+
19+
[DuckField(Name = "_clientDiagnostics")]
20+
IMessagingClientDiagnostics ClientDiagnostics { get; }
21+
}
22+
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/SendServiceBusMessageBatchIntegration.cs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55

66
#nullable enable
77

8+
using System;
89
using System.Collections;
910
using System.ComponentModel;
1011
using System.Threading;
1112
using Datadog.Trace.ClrProfiler.CallTarget;
1213
using Datadog.Trace.Configuration;
1314
using Datadog.Trace.DataStreamsMonitoring;
1415
using Datadog.Trace.DuckTyping;
16+
using Datadog.Trace.Logging;
17+
using Datadog.Trace.Propagators;
1518

1619
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus
1720
{
@@ -31,6 +34,8 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus
3134
[EditorBrowsable(EditorBrowsableState.Never)]
3235
public class SendServiceBusMessageBatchIntegration
3336
{
37+
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(SendServiceBusMessageBatchIntegration));
38+
3439
/// <summary>
3540
/// OnMethodBegin callback
3641
/// </summary>
@@ -40,8 +45,11 @@ public class SendServiceBusMessageBatchIntegration
4045
/// <param name="message">The message instance</param>
4146
/// <returns>Calltarget state value</returns>
4247
internal static CallTargetState OnMethodBegin<TTarget, TMessage>(TTarget instance, TMessage message)
48+
where TTarget : IServiceBusMessageBatch, IDuckType
4349
where TMessage : IServiceBusMessage
4450
{
51+
Scope? messageScope = null;
52+
4553
if (Tracer.Instance.Settings.IsIntegrationEnabled(IntegrationId.AzureServiceBus)
4654
&& Tracer.Instance.TracerManager.DataStreamsManager.IsEnabled)
4755
{
@@ -51,7 +59,41 @@ internal static CallTargetState OnMethodBegin<TTarget, TMessage>(TTarget instanc
5159
AzureServiceBusCommon.SetMessage(message.ApplicationProperties, message.Instance);
5260
}
5361

54-
return CallTargetState.GetDefault();
62+
// Create TryAdd message spans for batch with links when enabled
63+
if (Tracer.Instance.Settings.IsIntegrationEnabled(IntegrationId.AzureServiceBus, false) &&
64+
Tracer.Instance.Settings.AzureServiceBusBatchLinksEnabled)
65+
{
66+
messageScope = CreateAddMessageSpan(instance, message);
67+
}
68+
69+
return new CallTargetState(messageScope);
70+
}
71+
72+
internal static CallTargetReturn<bool> OnMethodEnd<TTarget>(TTarget instance, bool returnValue, Exception? exception, in CallTargetState state)
73+
{
74+
if (state.Scope != null)
75+
{
76+
if (returnValue && instance != null)
77+
{
78+
ServiceBusBatchSpanContext.AddMessageSpanContext(instance, state.Scope.Span.Context);
79+
}
80+
81+
state.Scope.DisposeWithException(exception);
82+
}
83+
84+
return new CallTargetReturn<bool>(returnValue);
85+
}
86+
87+
private static Scope? CreateAddMessageSpan(IServiceBusMessageBatch batch, IServiceBusMessage message)
88+
{
89+
var messageEnumerable = new[] { message };
90+
var state = AzureServiceBusCommon.CreateSenderSpan(
91+
batch.ClientDiagnostics,
92+
operationName: "create",
93+
messages: messageEnumerable,
94+
messageCount: 1);
95+
96+
return state.Scope;
5597
}
5698
}
5799
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// <copyright file="ServiceBusBatchSpanContext.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
#nullable enable
7+
8+
using System.Collections.Concurrent;
9+
using System.Runtime.CompilerServices;
10+
11+
namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus
12+
{
13+
/// <summary>
14+
/// Helper class to manage span contexts for Service Bus message batch operations.
15+
/// Tracks individual message span contexts per batch for span linking purposes.
16+
/// </summary>
17+
internal static class ServiceBusBatchSpanContext
18+
{
19+
// Maps ServiceBusMessageBatch instances to their collection of message span contexts
20+
private static readonly ConditionalWeakTable<object, ConcurrentBag<SpanContext>> BatchToMessageSpanContexts = new();
21+
22+
public static void AddMessageSpanContext(object batch, SpanContext spanContext)
23+
{
24+
if (batch == null || spanContext == null)
25+
{
26+
return;
27+
}
28+
29+
var spanContexts = BatchToMessageSpanContexts.GetValue(batch, _ => new ConcurrentBag<SpanContext>());
30+
spanContexts.Add(spanContext);
31+
}
32+
33+
public static SpanContext[] ExtractMessageSpanContexts(object batch)
34+
{
35+
if (batch == null)
36+
{
37+
return [];
38+
}
39+
40+
if (BatchToMessageSpanContexts.TryGetValue(batch, out var spanContexts))
41+
{
42+
var contexts = spanContexts.ToArray();
43+
BatchToMessageSpanContexts.Remove(batch);
44+
45+
return contexts;
46+
}
47+
48+
return [];
49+
}
50+
}
51+
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusReceiverReceiveMessagesAsyncIntegration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ internal static CallTargetState OnMethodBegin<TTarget>(TTarget instance, int max
8787
return null;
8888
}
8989

90+
if (!tracer.Settings.AzureServiceBusBatchLinksEnabled)
91+
{
92+
return null;
93+
}
94+
9095
var extractedContexts = new HashSet<SpanContext>(new SpanContextComparer());
9196

9297
try

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ServiceBusSenderScheduleMessagesAsyncIntegration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Azure.ServiceBus;
3030
[EditorBrowsable(EditorBrowsableState.Never)]
3131
public class ServiceBusSenderScheduleMessagesAsyncIntegration
3232
{
33-
private const string OperationName = "azure_servicebus.send";
33+
private const string OperationName = "send";
3434

3535
internal static CallTargetState OnMethodBegin<TTarget>(TTarget instance, ref IEnumerable messages, ref DateTimeOffset scheduledEnqueueTime, ref CancellationToken cancellationToken)
3636
where TTarget : IServiceBusSender, IDuckType

0 commit comments

Comments
 (0)