Skip to content

Commit 7ab4a69

Browse files
authored
Fix a number of OTel tracing issues (#60)
Motivation: The OTel tracing interceptors had a number of issues. The most important was that spans were ended too early in their lifecycle. Others included not always setting the span status or grpc status code attribute. Modifications: - Start and end spans manually - Replace the hooked async sequence with custom sequnces for server request messages and client response messages. This allows the atomic to be dropped in favour of a simple integer stored on the iterator. - Replace the hooked async writer with a more targeted tracing writer Result: Tracing interceptors are more reliable
1 parent 4d989fe commit 7ab4a69

File tree

7 files changed

+370
-328
lines changed

7 files changed

+370
-328
lines changed

Sources/GRPCOTelTracingInterceptors/HookedAsyncSequence.swift

Lines changed: 0 additions & 81 deletions
This file was deleted.

Sources/GRPCOTelTracingInterceptors/HookedWriter.swift

Lines changed: 0 additions & 41 deletions
This file was deleted.

Sources/GRPCOTelTracingInterceptors/Tracing/ClientOTelTracingInterceptor.swift

Lines changed: 130 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -146,117 +146,59 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
146146
) async throws -> StreamingClientResponse<Output>
147147
) async throws -> StreamingClientResponse<Output> where Input: Sendable, Output: Sendable {
148148
var request = request
149-
let serviceContext = ServiceContext.current ?? .topLevel
150-
151-
return try await tracer.withSpan(
152-
context.descriptor.fullyQualifiedMethod,
153-
context: serviceContext,
154-
ofKind: .client
155-
) { span in
156-
span.setOTelClientSpanGRPCAttributes(
157-
context: context,
158-
serverHostname: self.serverHostname,
159-
networkTransportMethod: self.networkTransportMethod
160-
)
149+
let span = tracer.startSpan(context.descriptor.fullyQualifiedMethod, ofKind: .client)
161150

162-
if self.includeRequestMetadata {
163-
span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata)
164-
}
151+
span.setOTelClientSpanGRPCAttributes(
152+
context: context,
153+
serverHostname: self.serverHostname,
154+
networkTransportMethod: self.networkTransportMethod
155+
)
165156

166-
tracer.inject(
167-
serviceContext,
168-
into: &request.metadata,
169-
using: self.injector
170-
)
157+
if self.includeRequestMetadata {
158+
span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata)
159+
}
171160

172-
if self.traceEachMessage {
173-
let wrappedProducer = request.producer
174-
request.producer = { writer in
175-
let messageSentCounter = Atomic(1)
176-
let eventEmittingWriter = HookedWriter(
177-
wrapping: writer,
178-
afterEachWrite: {
179-
var event = SpanEvent(name: "rpc.message")
180-
event.attributes[GRPCTracingKeys.rpcMessageType] = "SENT"
181-
event.attributes[GRPCTracingKeys.rpcMessageID] =
182-
messageSentCounter
183-
.wrappingAdd(1, ordering: .sequentiallyConsistent)
184-
.oldValue
185-
span.addEvent(event)
186-
}
187-
)
188-
try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter))
189-
}
161+
tracer.inject(span.context, into: &request.metadata, using: self.injector)
162+
163+
if self.traceEachMessage {
164+
let originalProducer = request.producer
165+
request.producer = { writer in
166+
let tracingWriter = TracedMessageWriter(wrapping: writer, span: span)
167+
return try await originalProducer(RPCWriter(wrapping: tracingWriter))
190168
}
169+
}
191170

192-
var response = try await next(request, context)
171+
var response: StreamingClientResponse<Output>
193172

194-
if self.includeResponseMetadata {
195-
span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata)
173+
do {
174+
response = try await ServiceContext.$current.withValue(span.context) {
175+
try await next(request, context)
196176
}
177+
} catch {
178+
span.endRPC(withError: error)
179+
throw error
180+
}
197181

198-
switch response.accepted {
199-
case .success(var success):
200-
let hookedSequence:
201-
HookedRPCAsyncSequence<
202-
RPCAsyncSequence<StreamingClientResponse<Output>.Contents.BodyPart, any Error>
203-
>
204-
if self.traceEachMessage {
205-
let messageReceivedCounter = Atomic(1)
206-
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { part in
207-
switch part {
208-
case .message:
209-
var event = SpanEvent(name: "rpc.message")
210-
event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED"
211-
event.attributes[GRPCTracingKeys.rpcMessageID] =
212-
messageReceivedCounter
213-
.wrappingAdd(1, ordering: .sequentiallyConsistent)
214-
.oldValue
215-
span.addEvent(event)
216-
217-
case .trailingMetadata(let trailingMetadata):
218-
if self.includeResponseMetadata {
219-
span.setMetadataStringAttributesAsResponseSpanAttributes(trailingMetadata)
220-
}
221-
}
222-
} onFinish: { error in
223-
if let error {
224-
if let errorCode = error.grpcErrorCode {
225-
span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue
226-
}
227-
span.setStatus(SpanStatus(code: .error))
228-
span.recordError(error)
229-
} else {
230-
span.attributes[GRPCTracingKeys.grpcStatusCode] = 0
231-
}
232-
}
233-
} else {
234-
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
235-
// Nothing to do if traceEachMessage is false
236-
} onFinish: { error in
237-
if let error {
238-
if let errorCode = error.grpcErrorCode {
239-
span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue
240-
}
241-
span.setStatus(SpanStatus(code: .error))
242-
span.recordError(error)
243-
} else {
244-
span.attributes[GRPCTracingKeys.grpcStatusCode] = 0
245-
}
246-
}
247-
}
248-
249-
success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence)
250-
response.accepted = .success(success)
182+
if self.includeResponseMetadata {
183+
span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata)
184+
}
251185

252-
case .failure(let error):
253-
span.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue
254-
span.setStatus(SpanStatus(code: .error))
255-
span.recordError(error)
256-
}
186+
switch response.accepted {
187+
case .success(var success):
188+
let tracedResponse = TracedClientResponseBodyParts(
189+
wrapping: success.bodyParts,
190+
span: span,
191+
eventPerMessage: self.traceEachMessage,
192+
includeMetadata: self.includeResponseMetadata
193+
)
194+
success.bodyParts = RPCAsyncSequence(wrapping: tracedResponse)
195+
response.accepted = .success(success)
257196

258-
return response
197+
case .failure(let error):
198+
span.endRPC(withError: error)
259199
}
200+
201+
return response
260202
}
261203
}
262204

@@ -272,14 +214,94 @@ struct ClientRequestInjector: Instrumentation.Injector {
272214
}
273215

274216
@available(gRPCSwiftExtras 2.0, *)
275-
extension Error {
276-
var grpcErrorCode: RPCError.Code? {
277-
if let rpcError = self as? RPCError {
278-
return rpcError.code
279-
} else if let rpcError = self as? any RPCErrorConvertible {
280-
return rpcError.rpcErrorCode
281-
} else {
282-
return nil
217+
internal struct TracedClientResponseBodyParts<Output>: AsyncSequence, Sendable
218+
where Output: Sendable {
219+
typealias Base = RPCAsyncSequence<StreamingClientResponse<Output>.Contents.BodyPart, any Error>
220+
typealias Element = Base.Element
221+
222+
private let base: Base
223+
private var span: any Span
224+
private var eventPerMessage: Bool
225+
private var includeMetadata: Bool
226+
227+
init(
228+
wrapping base: Base,
229+
span: any Span,
230+
eventPerMessage: Bool,
231+
includeMetadata: Bool
232+
) {
233+
self.base = base
234+
self.span = span
235+
self.eventPerMessage = eventPerMessage
236+
self.includeMetadata = includeMetadata
237+
}
238+
239+
func makeAsyncIterator() -> AsyncIterator {
240+
AsyncIterator(
241+
wrapping: self.base.makeAsyncIterator(),
242+
span: self.span,
243+
eventPerMessage: self.eventPerMessage,
244+
includeMetadata: self.includeMetadata
245+
)
246+
}
247+
248+
struct AsyncIterator: AsyncIteratorProtocol {
249+
typealias Element = Base.Element
250+
251+
private var wrapped: Base.AsyncIterator
252+
private var span: any Span
253+
private var messageID: Int
254+
private var eventPerMessage: Bool
255+
private var includeMetadata: Bool
256+
257+
init(
258+
wrapping iterator: Base.AsyncIterator,
259+
span: any Span,
260+
eventPerMessage: Bool,
261+
includeMetadata: Bool
262+
) {
263+
self.wrapped = iterator
264+
self.span = span
265+
self.eventPerMessage = eventPerMessage
266+
self.includeMetadata = includeMetadata
267+
self.messageID = 1
268+
}
269+
270+
private mutating func nextMessageID() -> Int {
271+
defer { self.messageID += 1 }
272+
return self.messageID
273+
}
274+
275+
mutating func next(
276+
isolation actor: isolated (any Actor)?
277+
) async throws(any Error) -> Element? {
278+
do {
279+
if let element = try await self.wrapped.next(isolation: actor) {
280+
if self.eventPerMessage {
281+
switch element {
282+
case .message:
283+
self.span.addEvent(.messageReceived(id: self.nextMessageID()))
284+
285+
case .trailingMetadata(let metadata):
286+
if self.includeMetadata {
287+
self.span.setMetadataStringAttributesAsResponseSpanAttributes(metadata)
288+
}
289+
}
290+
}
291+
292+
return element
293+
} else {
294+
self.span.endRPC()
295+
return nil
296+
}
297+
} catch {
298+
self.span.endRPC(withError: error)
299+
throw error
300+
}
301+
}
302+
303+
mutating func next() async throws -> Element? {
304+
try await self.next(isolation: nil)
283305
}
284306
}
285307
}

0 commit comments

Comments
 (0)