Skip to content

Commit 6425abd

Browse files
committed
refactor: add handler macro, implement metrics
1 parent 2aa5416 commit 6425abd

File tree

12 files changed

+594
-462
lines changed

12 files changed

+594
-462
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ interprocess = { version = "2.2.2", features = ["async", "tokio"], optional = tr
4040
# ws
4141
tokio-tungstenite = { version = "0.26.1", features = ["rustls-tls-webpki-roots"], optional = true }
4242
futures-util = { version = "0.3.31", optional = true }
43+
metrics = "0.24.2"
4344

4445
[dev-dependencies]
4546
ajj = { path = "./", features = ["axum", "ws", "ipc"] }

src/axum.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ where
8787
ctx.init_request_span(&self.router, Some(&Span::current()));
8888

8989
let Ok(bytes) = Bytes::from_request(req, &state).await else {
90+
crate::metrics::record_parse_error(self.router.service_name());
9091
return Box::<str>::from(Response::parse_error()).into_response();
9192
};
9293

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ mod axum;
159159
mod error;
160160
pub use error::RegistrationError;
161161

162+
pub(crate) mod metrics;
163+
162164
mod primitives;
163165
pub use primitives::{BorrowedRpcObject, MethodId, RpcBorrow, RpcObject, RpcRecv, RpcSend};
164166

src/macros.rs

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,179 @@ macro_rules! message_event {
7676
};
7777
}
7878

79+
/// Implement a `Handler` call, with metrics recording and response building.
80+
macro_rules! impl_handler_call {
81+
(@metrics, $success:expr, $id:expr, $service:expr, $method:expr) => {{
82+
// Record the metrics.
83+
$crate::metrics::record_execution($success, $service, $method);
84+
$crate::metrics::record_output($id.is_some(), $service, $method);
85+
}};
86+
87+
(@record_span, $span:expr, $payload:expr) => {
88+
if let Some(e) = $payload.as_error() {
89+
use tracing_opentelemetry::OpenTelemetrySpanExt;
90+
$span.record("rpc.jsonrpc.error_code", e.code);
91+
$span.record("rpc.jsonrpc.error_message", e.message.as_ref());
92+
$span.set_status(::opentelemetry::trace::Status::Error {
93+
description: e.message.clone(),
94+
});
95+
}
96+
};
97+
98+
// Hit the metrics and return the payload if any.
99+
(@finish $span:expr, $id:expr, $service:expr, $method:expr, $payload:expr) => {{
100+
impl_handler_call!(@metrics, $payload.is_success(), $id, $service, $method);
101+
impl_handler_call!(@record_span, $span, $payload);
102+
return Response::build_response($id.as_deref(), $payload);
103+
}};
104+
105+
(@unpack_params $span:expr, $id:expr, $service:expr, $method:expr, $req:expr) => {{
106+
let Ok(params) = $req.deser_params() else {
107+
impl_handler_call!(@finish $span, $id, $service, $method, &ResponsePayload::<(), ()>::invalid_params());
108+
};
109+
drop($req); // no longer needed
110+
params
111+
}};
112+
113+
(@unpack_struct_params $span:expr, $id:expr, $service:expr, $method:expr, $req:expr) => {{
114+
let Ok(params) = $req.deser_params() else {
115+
impl_handler_call!(@finish $span, $id, $service, $method, &ResponsePayload::<(), ()>::invalid_params());
116+
};
117+
drop($req); // no longer needed
118+
params
119+
}};
120+
121+
(@unpack $args:expr) => {{
122+
let id = $args.id_owned();
123+
let (ctx, req) = $args.into_parts();
124+
let inst = ctx.span().clone();
125+
let span = ctx.span().clone();
126+
let method = req.method().to_string();
127+
let service = ctx.service_name();
128+
129+
(id, ctx, inst, span, method, service, req)
130+
}};
131+
132+
// NO ARGS
133+
($args:expr, $this:ident()) => {{
134+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
135+
drop(ctx); // no longer needed
136+
drop(req); // no longer needed
137+
138+
Box::pin(
139+
async move {
140+
let payload: $crate::ResponsePayload<_, _> = $this().await.into();
141+
impl_handler_call!(@finish span, id, service, &method, &payload);
142+
}
143+
.instrument(inst),
144+
)
145+
}};
146+
147+
// CTX only
148+
($args:expr, $this:ident(ctx)) => {{
149+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
150+
drop(req); // no longer needed
151+
152+
Box::pin(
153+
async move {
154+
let payload: $crate::ResponsePayload<_, _> = $this(ctx).await.into();
155+
impl_handler_call!(@finish span, id, service, &method, &payload);
156+
}
157+
.instrument(inst),
158+
)
159+
}};
160+
161+
162+
// PARAMS only
163+
($args:expr, $this:ident(params: $params_ty:ty)) => {{
164+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
165+
drop(ctx); // no longer needed
166+
167+
Box::pin(
168+
async move {
169+
let params: $params_ty = impl_handler_call!(@unpack_params span, id, service, &method, req);
170+
let payload: $crate::ResponsePayload<_, _> = $this(params.into()).await.into();
171+
impl_handler_call!(@finish span, id, service, &method, &payload);
172+
}
173+
.instrument(inst),
174+
)
175+
}};
176+
177+
178+
// STATE only
179+
($args:expr, $this:ident($state:expr)) => {{
180+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
181+
drop(ctx); // no longer needed
182+
drop(req); // no longer needed
183+
184+
Box::pin(
185+
async move {
186+
let payload: $crate::ResponsePayload<_, _> = $this($state).await.into();
187+
impl_handler_call!(@finish span, id, service, &method, &payload);
188+
}
189+
.instrument(inst),
190+
)
191+
}};
192+
193+
194+
// CTX and PARAMS
195+
($args:expr, $this:ident(ctx, params: $params_ty:ty)) => {{
196+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
197+
198+
Box::pin(
199+
async move {
200+
let params: $params_ty = impl_handler_call!(@unpack_params span, id, service, &method, req);
201+
let payload: $crate::ResponsePayload<_, _> = $this(ctx, params.into()).await.into();
202+
impl_handler_call!(@finish span, id, service, &method, &payload);
203+
}
204+
.instrument(inst),
205+
)
206+
}};
207+
208+
// CTX and STATE
209+
($args:expr, $this:ident(ctx, $state:expr)) => {{
210+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
211+
drop(req); // no longer needed
212+
213+
Box::pin(
214+
async move {
215+
let payload: $crate::ResponsePayload<_, _> = $this(ctx, $state).await.into();
216+
impl_handler_call!(@finish span, id, service, &method, &payload);
217+
}
218+
.instrument(inst),
219+
)
220+
}};
221+
222+
// PARAMS and STATE
223+
($args:expr, $this:ident(params: $params_ty:ty, $state:expr)) => {{
224+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
225+
drop(ctx); // no longer needed
226+
227+
Box::pin(
228+
async move {
229+
let params: $params_ty = impl_handler_call!(@unpack_params span, id, service, &method, req);
230+
let payload: $crate::ResponsePayload<_, _> = $this(params.into(), $state).await.into();
231+
impl_handler_call!(@finish span, id, service, &method, &payload);
232+
}
233+
.instrument(inst),
234+
)
235+
}};
236+
237+
// CTX and PARAMS and STATE
238+
($args:expr, $this:ident(ctx, params: $params_ty:ty, $state:expr)) => {{
239+
let (id, ctx, inst, span, method, service, req) = impl_handler_call!(@unpack $args);
240+
241+
Box::pin(
242+
async move {
243+
let params: $params_ty = impl_handler_call!(@unpack_params span, id, service, &method, req);
244+
let payload: $crate::ResponsePayload<_, _> = $this(ctx, params.into(), $state).await.into();
245+
impl_handler_call!(@finish span, id, service, &method, &payload);
246+
}
247+
.instrument(inst),
248+
)
249+
}};
250+
}
251+
79252
// Some code is this file is reproduced under the terms of the MIT license. It
80253
// originates from the `axum` crate. The original source code can be found at
81254
// the following URL, and the original license is included below.

src/metrics.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use std::sync::LazyLock;
2+
3+
use metrics::Counter;
4+
5+
/// Metric name for counting router calls.
6+
pub(crate) const ROUTER_CALLS: &str = "ajj.router.calls";
7+
pub(crate) const ROUTER_CALLS_HELP: &str =
8+
"Number of calls to ajj router methods. Not all requests will result in a response.";
9+
10+
/// Metric name for counting router error execution.
11+
pub(crate) const ROUTER_ERRORS: &str = "ajj.router.errors";
12+
pub(crate) const ROUTER_ERRORS_HELP: &str =
13+
"Number of errored executions by ajj router methods. This does NOT imply a response was sent.";
14+
15+
// Metric name for counting router successful executions.
16+
pub(crate) const ROUTER_SUCCESSES: &str = "ajj.router.successes";
17+
pub(crate) const ROUTER_SUCCESSES_HELP: &str =
18+
"Number of successful executions by ajj router methods. This does NOT imply a response was sent.";
19+
20+
/// Metric name for counting router responses.
21+
pub(crate) const ROUTER_RESPONSES: &str = "ajj.router.responses";
22+
pub(crate) const ROUTER_RESPONSES_HELP: &str =
23+
"Number of responses sent by ajj router methods. Not all requests will result in a response.";
24+
25+
// Metric name for counting omitted notification responses.
26+
pub(crate) const ROUTER_NOTIFICATION_RESPONSE_OMITTED: &str =
27+
"ajj.router.notification_response_omitted";
28+
pub(crate) const ROUTER_NOTIFICATION_RESPONSE_OMITTED_HELP: &str =
29+
"Number of times ajj router methods omitted a response to a notification";
30+
31+
// Metric for counting parse errors.
32+
pub(crate) const ROUTER_PARSE_ERRORS: &str = "ajj.router.parse_errors";
33+
pub(crate) const ROUTER_PARSE_ERRORS_HELP: &str =
34+
"Number of parse errors encountered by ajj router methods. This implies no response was sent.";
35+
36+
static DESCRIBE: LazyLock<()> = LazyLock::new(|| {
37+
metrics::describe_counter!(ROUTER_CALLS, metrics::Unit::Count, ROUTER_CALLS_HELP);
38+
metrics::describe_counter!(ROUTER_ERRORS, metrics::Unit::Count, ROUTER_ERRORS_HELP);
39+
metrics::describe_counter!(
40+
ROUTER_SUCCESSES,
41+
metrics::Unit::Count,
42+
ROUTER_SUCCESSES_HELP
43+
);
44+
metrics::describe_counter!(
45+
ROUTER_RESPONSES,
46+
metrics::Unit::Count,
47+
ROUTER_RESPONSES_HELP
48+
);
49+
metrics::describe_counter!(
50+
ROUTER_NOTIFICATION_RESPONSE_OMITTED,
51+
metrics::Unit::Count,
52+
ROUTER_NOTIFICATION_RESPONSE_OMITTED_HELP
53+
);
54+
metrics::describe_counter!(
55+
ROUTER_PARSE_ERRORS,
56+
metrics::Unit::Count,
57+
ROUTER_PARSE_ERRORS_HELP
58+
);
59+
});
60+
61+
/// Get or register a counter for calls to a specific service and method.
62+
pub(crate) fn calls(service_name: &'static str, method: &str) -> Counter {
63+
let _ = &DESCRIBE;
64+
metrics::counter!(
65+
ROUTER_CALLS,
66+
"service" => service_name.to_string(),
67+
"method" => method.to_string()
68+
)
69+
}
70+
71+
/// Record a call to a specific service and method.
72+
pub(crate) fn record_call(service_name: &'static str, method: &str) {
73+
let counter = calls(service_name, method);
74+
counter.increment(1);
75+
}
76+
77+
/// Get or register a counter for errors from a specific service and method.
78+
pub(crate) fn errors(service_name: &'static str, method: &str) -> Counter {
79+
let _ = &DESCRIBE;
80+
metrics::counter!(
81+
ROUTER_ERRORS,
82+
"service" => service_name.to_string(),
83+
"method" => method.to_string()
84+
)
85+
}
86+
87+
/// Record an error from a specific service and method.
88+
pub(crate) fn record_execution_error(service_name: &'static str, method: &str) {
89+
let counter = errors(service_name, method);
90+
counter.increment(1);
91+
}
92+
93+
/// Get or register a counter for successes from a specific service and method.
94+
pub(crate) fn successes(service_name: &'static str, method: &str) -> Counter {
95+
let _ = &DESCRIBE;
96+
metrics::counter!(
97+
ROUTER_SUCCESSES,
98+
"service" => service_name.to_string(),
99+
"method" => method.to_string()
100+
)
101+
}
102+
103+
/// Record a success from a specific service and method.
104+
pub(crate) fn record_execution_success(service_name: &'static str, method: &str) {
105+
let counter = successes(service_name, method);
106+
counter.increment(1);
107+
}
108+
109+
/// Record a response from a specific service and method, incrementing either
110+
/// the success or error counter.
111+
pub(crate) fn record_execution(success: bool, service_name: &'static str, method: &str) {
112+
if success {
113+
record_execution_success(service_name, method);
114+
} else {
115+
record_execution_error(service_name, method);
116+
}
117+
}
118+
119+
/// Get or register a counter for responses from a specific service and method.
120+
pub(crate) fn responses(service_name: &'static str, method: &str) -> Counter {
121+
let _ = &DESCRIBE;
122+
metrics::counter!(
123+
ROUTER_RESPONSES,
124+
"service" => service_name.to_string(),
125+
"method" => method.to_string()
126+
)
127+
}
128+
129+
/// Record a response from a specific service and method.
130+
pub(crate) fn record_response(service_name: &'static str, method: &str) {
131+
let counter = responses(service_name, method);
132+
counter.increment(1);
133+
}
134+
135+
/// Get or register a counter for omitted notification responses from a specific service and method.
136+
pub(crate) fn response_omitted(service_name: &'static str, method: &str) -> Counter {
137+
let _ = &DESCRIBE;
138+
metrics::counter!(
139+
ROUTER_NOTIFICATION_RESPONSE_OMITTED,
140+
"service" => service_name.to_string(),
141+
"method" => method.to_string()
142+
)
143+
}
144+
145+
/// Record an omitted notification response from a specific service and method.
146+
pub(crate) fn record_response_omitted(service_name: &'static str, method: &str) {
147+
let counter = response_omitted(service_name, method);
148+
counter.increment(1);
149+
}
150+
151+
/// Record either a response sent or an omitted notification response.
152+
pub(crate) fn record_output(response_sent: bool, service_name: &'static str, method: &str) {
153+
if response_sent {
154+
record_response(service_name, method);
155+
} else {
156+
record_response_omitted(service_name, method);
157+
}
158+
}
159+
160+
// Get or register a counter for parse errors.
161+
pub(crate) fn parse_errors(service_name: &'static str) -> Counter {
162+
let _ = &DESCRIBE;
163+
metrics::counter!(ROUTER_PARSE_ERRORS, "service" => service_name.to_string())
164+
}
165+
166+
/// Record a parse error.
167+
pub(crate) fn record_parse_error(service_name: &'static str) {
168+
let counter = parse_errors(service_name);
169+
counter.increment(1);
170+
}

0 commit comments

Comments
 (0)