Skip to content

Commit fc74ea4

Browse files
authored
feat: OTEL semantic convention compliance and req/resp metrics (#32)
* feat: enhance otel implementation by setting more span info on request receipt * refactor: simplify conn_id * fix: share counters * refactor: make ctx private to prevent misuse * feat: comply with error_code requirement * feat: comply with request_id on server span * chore: readme update * fix: add the error_message prop as well * lint: clippy * nit: formatting * feat: use traceparent via otel http header extractor * feat: set otel status during response construction * refactor: DRY with macros * refactor: improve code quality and DRY * feat: appropiately associate spans with names * lint: clippy * fix: start message counters at 1 * chore: document non-compliance * fix: mock * fix: clone vs child * refactor: add handler macro, implement metrics * feat: metric for method not found * feat: record completed calls and active calls * chore: make some fns more priv
1 parent 16a42a2 commit fc74ea4

File tree

19 files changed

+1216
-547
lines changed

19 files changed

+1216
-547
lines changed

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ description = "Simple, modern, ergonomic JSON-RPC 2.0 router built with tower an
55
keywords = ["json-rpc", "jsonrpc", "json"]
66
categories = ["web-programming::http-server", "web-programming::websocket"]
77

8-
version = "0.3.4"
8+
version = "0.4.0"
99
edition = "2021"
1010
rust-version = "1.81"
1111
authors = ["init4", "James Prestwich"]
@@ -15,6 +15,7 @@ repository = "https://github.com/init4tech/ajj"
1515

1616
[dependencies]
1717
bytes = "1.9.0"
18+
opentelemetry = "0.31.0"
1819
pin-project = "1.1.8"
1920
serde = { version = "1.0.217", features = ["derive"] }
2021
serde_json = { version = "1.0.135", features = ["raw_value"] }
@@ -23,10 +24,12 @@ tokio = { version = "1.43.0", features = ["sync", "rt", "macros"] }
2324
tokio-util = { version = "0.7.13", features = ["io", "rt"] }
2425
tower = { version = "0.5.2", features = ["util"] }
2526
tracing = "0.1.41"
27+
tracing-opentelemetry = "0.32.0"
2628

2729
# axum
2830
axum = { version = "0.8.1", optional = true }
2931
mime = { version = "0.3.17", optional = true }
32+
opentelemetry-http = { version = "0.31.0", optional = true }
3033

3134
# pubsub
3235
tokio-stream = { version = "0.1.17", optional = true }
@@ -37,6 +40,7 @@ interprocess = { version = "2.2.2", features = ["async", "tokio"], optional = tr
3740
# ws
3841
tokio-tungstenite = { version = "0.26.1", features = ["rustls-tls-webpki-roots"], optional = true }
3942
futures-util = { version = "0.3.31", optional = true }
43+
metrics = "0.24.2"
4044

4145
[dev-dependencies]
4246
ajj = { path = "./", features = ["axum", "ws", "ipc"] }
@@ -51,7 +55,7 @@ eyre = "0.6.12"
5155

5256
[features]
5357
default = ["axum", "ws", "ipc"]
54-
axum = ["dep:axum", "dep:mime"]
58+
axum = ["dep:axum", "dep:mime", "dep:opentelemetry-http"]
5559
pubsub = ["dep:tokio-stream", "axum?/ws"]
5660
ipc = ["pubsub", "dep:interprocess"]
5761
ws = ["pubsub", "dep:tokio-tungstenite", "dep:futures-util"]

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,23 @@ implementations.
7171

7272
See the [crate documentation on docs.rs] for more detailed examples.
7373

74+
## Specification Complinace
75+
76+
`ajj` aims to be fully compliant with the [JSON-RPC 2.0] specification. If any
77+
issues are found, please [open an issue]!
78+
79+
`ajj` produces [`tracing`] spans and events that meet the [OpenTelemetry
80+
semantic conventions] for JSON-RPC servers with the following exceptions:
81+
82+
- The `server.address` attribute is NOT set, as the server address is not always
83+
known to the ajj system.
84+
- `rpc.message` events are included in AJJ system spans for the batch request,
85+
which technically does not comply with semantic conventions. The semantic
86+
conventions do not specify how to handle batch requests, and assume that each
87+
message corresponds to a separate request. In AJJ, batch requests are a single
88+
message, and result in a single `rpc.message` event at receipt and at
89+
response.
90+
7491
## Note on code provenance
7592

7693
Some code in this project has been reproduced or adapted from other projects.
@@ -94,3 +111,6 @@ reproduced from the following projects, and we are grateful for their work:
94111
[`interprocess::local_socket::ListenerOptions`]: https://docs.rs/interprocess/latest/interprocess/local_socket/struct.ListenerOptions.html
95112
[std::net::SocketAddr]: https://doc.rust-lang.org/std/net/enum.SocketAddr.html
96113
[alloy]: https://docs.rs/alloy/latest/alloy/
114+
[open an issue]: https://github.com/init4tech/ajj/issues/new
115+
[OpenTelemetry semantic conventions]: https://opentelemetry.io/docs/specs/semconv/rpc/json-rpc/
116+
[`tracing`]: https://docs.rs/tracing/latest/tracing/

src/axum.rs

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use crate::{
22
types::{InboundData, Response},
3-
HandlerCtx, TaskSet,
3+
HandlerCtx, TaskSet, TracingInfo,
44
};
55
use axum::{
66
extract::FromRequest,
77
http::{header, HeaderValue},
88
response::IntoResponse,
99
};
1010
use bytes::Bytes;
11-
use std::{future::Future, pin::Pin};
11+
use std::{
12+
future::Future,
13+
pin::Pin,
14+
sync::{atomic::AtomicU32, Arc},
15+
};
1216
use tokio::runtime::Handle;
17+
use tracing::{Instrument, Span};
1318

1419
/// A wrapper around an [`Router`] that implements the
1520
/// [`axum::handler::Handler`] trait. This struct is an implementation detail
@@ -21,14 +26,22 @@ use tokio::runtime::Handle;
2126
#[derive(Debug, Clone)]
2227
pub(crate) struct IntoAxum<S> {
2328
pub(crate) router: crate::Router<S>,
29+
2430
pub(crate) task_set: TaskSet,
31+
32+
/// Counter for OTEL messages received.
33+
pub(crate) rx_msg_id: Arc<AtomicU32>,
34+
/// Counter for OTEL messages sent.
35+
pub(crate) tx_msg_id: Arc<AtomicU32>,
2536
}
2637

2738
impl<S> From<crate::Router<S>> for IntoAxum<S> {
2839
fn from(router: crate::Router<S>) -> Self {
2940
Self {
3041
router,
3142
task_set: Default::default(),
43+
rx_msg_id: Arc::new(AtomicU32::new(1)),
44+
tx_msg_id: Arc::new(AtomicU32::new(1)),
3245
}
3346
}
3447
}
@@ -39,12 +52,26 @@ impl<S> IntoAxum<S> {
3952
Self {
4053
router,
4154
task_set: handle.into(),
55+
rx_msg_id: Arc::new(AtomicU32::new(1)),
56+
tx_msg_id: Arc::new(AtomicU32::new(1)),
4257
}
4358
}
59+
}
4460

45-
/// Get a new context, built from the task set.
46-
fn ctx(&self) -> HandlerCtx {
47-
self.task_set.clone().into()
61+
impl<S> IntoAxum<S>
62+
where
63+
S: Clone + Send + Sync + 'static,
64+
{
65+
fn ctx(&self, req: &axum::extract::Request) -> HandlerCtx {
66+
let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| {
67+
propagator.extract(&opentelemetry_http::HeaderExtractor(req.headers()))
68+
});
69+
70+
HandlerCtx::new(
71+
None,
72+
self.task_set.clone(),
73+
TracingInfo::new_with_context(self.router.service_name(), parent_context),
74+
)
4875
}
4976
}
5077

@@ -56,25 +83,50 @@ where
5683

5784
fn call(self, req: axum::extract::Request, state: S) -> Self::Future {
5885
Box::pin(async move {
86+
let ctx = self.ctx(&req);
87+
ctx.init_request_span(&self.router, Some(&Span::current()));
88+
5989
let Ok(bytes) = Bytes::from_request(req, &state).await else {
90+
crate::metrics::record_parse_error(self.router.service_name());
6091
return Box::<str>::from(Response::parse_error()).into_response();
6192
};
6293

63-
// If the inbound data is not currently parsable, we
64-
// send an empty one it to the router, as the router enforces
65-
// the specification.
66-
let req = InboundData::try_from(bytes).unwrap_or_default();
94+
// https://github.com/open-telemetry/semantic-conventions/blob/main/docs/rpc/rpc-spans.md#message-event
95+
let req = ctx.span().in_scope(|| {
96+
message_event!(
97+
@received,
98+
counter: &self.rx_msg_id,
99+
bytes: bytes.len(),
100+
);
101+
102+
// If the inbound data is not currently parsable, we
103+
// send an empty one it to the router, as the router enforces
104+
// the specification.
105+
InboundData::try_from(bytes).unwrap_or_default()
106+
});
67107

108+
let span = ctx.span().clone();
68109
if let Some(response) = self
69110
.router
70-
.call_batch_with_state(self.ctx(), req, state)
111+
.call_batch_with_state(ctx, req, state)
112+
.instrument(span.clone())
71113
.await
72114
{
73115
let headers = [(
74116
header::CONTENT_TYPE,
75117
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
76118
)];
119+
77120
let body = Box::<str>::from(response);
121+
122+
span.in_scope(|| {
123+
message_event!(
124+
@sent,
125+
counter: &self.tx_msg_id,
126+
bytes: body.len(),
127+
);
128+
});
129+
78130
(headers, body).into_response()
79131
} else {
80132
().into_response()

src/lib.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! })
2525
//! // Routes get a ctx, which can be used to send notifications.
2626
//! .route("notify", |ctx: HandlerCtx| async move {
27-
//! if ctx.notifications().is_none() {
27+
//! if !ctx.notifications_enabled() {
2828
//! // This error will appear in the ResponsePayload's `data` field.
2929
//! return Err("notifications are disabled");
3030
//! }
@@ -159,6 +159,8 @@ mod axum;
159159
mod error;
160160
pub use error::RegistrationError;
161161

162+
pub(crate) mod metrics;
163+
162164
mod primitives;
163165
pub use primitives::{BorrowedRpcObject, MethodId, RpcBorrow, RpcObject, RpcRecv, RpcSend};
164166

@@ -171,6 +173,7 @@ pub use pubsub::ReadJsonStream;
171173
mod routes;
172174
pub use routes::{
173175
BatchFuture, Handler, HandlerArgs, HandlerCtx, NotifyError, Params, RouteFuture, State,
176+
TracingInfo,
174177
};
175178
pub(crate) use routes::{BoxedIntoRoute, ErasedIntoRoute, Method, Route};
176179

@@ -206,7 +209,8 @@ pub(crate) mod test_utils {
206209
mod test {
207210

208211
use crate::{
209-
router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, ResponsePayload,
212+
router::RouterInner, routes::HandlerArgs, test_utils::assert_rv_eq, HandlerCtx,
213+
ResponsePayload,
210214
};
211215
use bytes::Bytes;
212216
use serde_json::value::RawValue;
@@ -231,10 +235,7 @@ mod test {
231235

232236
let res = router
233237
.call_with_state(
234-
HandlerArgs {
235-
ctx: Default::default(),
236-
req: req.try_into().unwrap(),
237-
},
238+
HandlerArgs::new(HandlerCtx::mock(), req.try_into().unwrap()),
238239
(),
239240
)
240241
.await
@@ -250,10 +251,7 @@ mod test {
250251

251252
let res2 = router
252253
.call_with_state(
253-
HandlerArgs {
254-
ctx: Default::default(),
255-
req: req2.try_into().unwrap(),
256-
},
254+
HandlerArgs::new(HandlerCtx::mock(), req2.try_into().unwrap()),
257255
(),
258256
)
259257
.await

0 commit comments

Comments
 (0)