11use crate :: {
22 pubsub:: { In , JsonSink , Listener , Out } ,
33 types:: InboundData ,
4- HandlerCtx , TaskSet ,
4+ HandlerCtx , TaskSet , TracingInfo ,
55} ;
66use core:: fmt;
77use serde_json:: value:: RawValue ;
8- use std:: sync:: { atomic:: AtomicU64 , Arc } ;
8+ use std:: sync:: {
9+ atomic:: { AtomicU32 , AtomicU64 , Ordering } ,
10+ Arc ,
11+ } ;
912use tokio:: { pin, runtime:: Handle , select, sync:: mpsc, task:: JoinHandle } ;
1013use tokio_stream:: StreamExt ;
1114use tokio_util:: sync:: WaitForCancellationFutureOwned ;
@@ -105,8 +108,7 @@ impl ConnectionManager {
105108
106109 /// Increment the connection ID counter and return an unused ID.
107110 fn next_id ( & self ) -> ConnectionId {
108- self . next_id
109- . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: Relaxed )
111+ self . next_id . fetch_add ( 1 , Ordering :: Relaxed )
110112 }
111113
112114 /// Get a clone of the router.
@@ -131,13 +133,15 @@ impl ConnectionManager {
131133 write_task : tx,
132134 requests,
133135 tasks : tasks. clone ( ) ,
136+ rx_msg_id : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
134137 } ;
135138
136139 let wt = WriteTask {
137140 tasks,
138141 conn_id,
139- json : rx,
142+ items : rx,
140143 connection,
144+ tx_msg_id : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
141145 } ;
142146
143147 ( rt, wt)
@@ -168,11 +172,14 @@ struct RouteTask<T: crate::pubsub::Listener> {
168172 /// Connection ID for the connection serviced by this task.
169173 pub ( crate ) conn_id : ConnectionId ,
170174 /// Sender to the write task.
171- pub ( crate ) write_task : mpsc:: Sender < Box < RawValue > > ,
175+ pub ( crate ) write_task : mpsc:: Sender < WriteItem > ,
172176 /// Stream of requests.
173177 pub ( crate ) requests : In < T > ,
174178 /// The task set for this connection
175179 pub ( crate ) tasks : TaskSet ,
180+
181+ /// Counter for OTEL messages received.
182+ pub ( crate ) rx_msg_id : Arc < AtomicU32 > ,
176183}
177184
178185impl < T : crate :: pubsub:: Listener > fmt:: Debug for RouteTask < T > {
@@ -199,6 +206,7 @@ where
199206 mut requests,
200207 write_task,
201208 tasks,
209+ rx_msg_id,
202210 ..
203211 } = self ;
204212
@@ -224,6 +232,8 @@ where
224232 break ;
225233 } ;
226234
235+ let item_bytes = item. len( ) ;
236+
227237 // If the inbound data is not currently parsable, we
228238 // send an empty one it to the router, as the router
229239 // enforces the specification.
@@ -234,16 +244,38 @@ where
234244 // if the client stops accepting responses, we do not keep
235245 // handling inbound requests.
236246 let Ok ( permit) = write_task. clone( ) . reserve_owned( ) . await else {
237- tracing :: error!( "write task dropped while waiting for permit" ) ;
247+ error!( "write task dropped while waiting for permit" ) ;
238248 break ;
239249 } ;
240250
251+ let tracing = TracingInfo { service: router. service_name( ) , request_span: debug_span!(
252+ parent: None ,
253+ "ajj.pubsub.RouteTask::call" ,
254+ "otel.kind" = "server" ,
255+ "rpc.system" = "jsonrpc" ,
256+ "rpc.jsonrpc.version" = "2.0" ,
257+ "rpc.service" = router. service_name( ) ,
258+ notifications_enabled = true ,
259+ params = tracing:: field:: Empty
260+ ) } ;
261+
241262 let ctx =
242263 HandlerCtx :: new(
243264 Some ( write_task. clone( ) ) ,
244265 children. clone( ) ,
266+ tracing,
245267 ) ;
246268
269+ let span = ctx. span( ) . clone( ) ;
270+ span. in_scope( || {
271+ debug!(
272+ "rpc.message.id" = rx_msg_id. fetch_add( 1 , Ordering :: Relaxed ) ,
273+ "rpc.message.type" = "received" ,
274+ "rpc.message.uncompressed_size" = item_bytes,
275+ "Received request"
276+ ) ;
277+ } ) ;
278+
247279 // Run the future in a new task.
248280 let fut = router. handle_request_batch( ctx, reqs) ;
249281
@@ -252,9 +284,9 @@ where
252284 // Send the response to the write task.
253285 // we don't care if the receiver has gone away,
254286 // as the task is done regardless.
255- if let Some ( rv ) = fut. await {
287+ if let Some ( json ) = fut. await {
256288 let _ = permit. send(
257- rv
289+ WriteItem { span , json }
258290 ) ;
259291 }
260292 }
@@ -275,6 +307,13 @@ where
275307 }
276308}
277309
310+ /// An item to be written to an outbound JSON pubsub stream.
311+ #[ derive( Debug , Clone ) ]
312+ pub ( crate ) struct WriteItem {
313+ pub ( crate ) span : tracing:: Span ,
314+ pub ( crate ) json : Box < RawValue > ,
315+ }
316+
278317/// The Write Task is responsible for writing JSON to the outbound connection.
279318struct WriteTask < T : Listener > {
280319 /// Task set
@@ -287,10 +326,13 @@ struct WriteTask<T: Listener> {
287326 ///
288327 /// Dropping this channel will cause the associated [`RouteTask`] to
289328 /// shutdown.
290- pub ( crate ) json : mpsc:: Receiver < Box < RawValue > > ,
329+ pub ( crate ) items : mpsc:: Receiver < WriteItem > ,
291330
292331 /// Outbound connections.
293332 pub ( crate ) connection : Out < T > ,
333+
334+ /// Counter for OTEL messages sent.
335+ pub ( crate ) tx_msg_id : Arc < AtomicU32 > ,
294336}
295337
296338impl < T : Listener > WriteTask < T > {
@@ -305,8 +347,9 @@ impl<T: Listener> WriteTask<T> {
305347 pub ( crate ) async fn task_future ( self ) {
306348 let WriteTask {
307349 tasks,
308- mut json ,
350+ mut items ,
309351 mut connection,
352+ tx_msg_id,
310353 ..
311354 } = self ;
312355
@@ -318,12 +361,20 @@ impl<T: Listener> WriteTask<T> {
318361 debug!( "Shutdown signal received" ) ;
319362 break ;
320363 }
321- json = json . recv( ) => {
322- let Some ( json) = json else {
364+ item = items . recv( ) => {
365+ let Some ( WriteItem { span , json } ) = item else {
323366 tracing:: error!( "Json stream has closed" ) ;
324367 break ;
325368 } ;
326- let span = debug_span!( "WriteTask" , conn_id = self . conn_id) ;
369+ span. record( "conn_id" , self . conn_id) ;
370+ span. in_scope( || {
371+ debug!(
372+ "rpc.message.id" = tx_msg_id. fetch_add( 1 , Ordering :: Relaxed ) ,
373+ "rpc.message.type" = "sent" ,
374+ "Sending response"
375+ ) ;
376+ } ) ;
377+
327378 if let Err ( err) = connection. send_json( json) . instrument( span) . await {
328379 debug!( %err, conn_id = self . conn_id, "Failed to send json" ) ;
329380 break ;
0 commit comments