Skip to content

Commit 96dea1a

Browse files
committed
Add a journal tracing all actor messages
Signed-off-by: Didier Wenzek <[email protected]>
1 parent d4e820a commit 96dea1a

File tree

6 files changed

+57
-35
lines changed

6 files changed

+57
-35
lines changed

Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/tedge_config/src/system_toml/log_level.rs

+17
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,25 @@ pub fn log_init(
6868
.with_filter(filter_fn(|metadata| metadata.target() == "Audit"))
6969
});
7070

71+
// Actor traces
72+
let trace_appender = RollingFileAppender::builder()
73+
.rotation(Rotation::DAILY)
74+
.filename_prefix("tedge.actors.log")
75+
.max_log_files(2);
76+
let trace_layer = trace_appender
77+
.build("/var/log/tedge")
78+
.ok()
79+
.map(|trace_appender| {
80+
tracing_subscriber::fmt::layer()
81+
.with_writer(trace_appender)
82+
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
83+
.with_filter(LevelFilter::DEBUG)
84+
.with_filter(filter_fn(|metadata| metadata.target() == "Actors"))
85+
});
86+
7187
tracing_subscriber::registry()
7288
.with(audit_layer)
89+
.with(trace_layer)
7390
.with(log_layer)
7491
.init();
7592

crates/core/tedge_actors/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ test-helpers = []
1717
[dependencies]
1818
async-trait = { workspace = true }
1919
futures = { workspace = true }
20-
log = { workspace = true }
2120
thiserror = { workspace = true }
2221
tokio = { workspace = true, default_features = false, features = [
2322
"sync",
2423
"rt",
2524
"macros",
2625
"time",
2726
] }
27+
tracing = { workspace = true }
2828

2929
[dev-dependencies]
3030
tokio = { workspace = true, default_features = false, features = [

crates/core/tedge_actors/src/message_boxes.rs

+16-11
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ use crate::RuntimeRequest;
9393
use async_trait::async_trait;
9494
use futures::channel::mpsc;
9595
use futures::StreamExt;
96-
use log::debug;
9796
use std::fmt::Debug;
9897

9998
#[async_trait]
@@ -160,22 +159,22 @@ impl<Input: Debug> LoggingReceiver<Input> {
160159
}
161160

162161
#[async_trait]
163-
impl<Input: Send + Debug> MessageReceiver<Input> for LoggingReceiver<Input> {
162+
impl<Input: Send + Debug + Sync> MessageReceiver<Input> for LoggingReceiver<Input> {
164163
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
165164
let message = self.receiver.try_recv().await;
166-
debug!(target: &self.name, "recv {:?}", message);
165+
log_message_received(&self.name, &message);
167166
message
168167
}
169168

170169
async fn recv(&mut self) -> Option<Input> {
171170
let message = self.receiver.recv().await;
172-
debug!(target: &self.name, "recv {:?}", message);
171+
log_message_received(&self.name, &message);
173172
message
174173
}
175174

176175
async fn recv_signal(&mut self) -> Option<RuntimeRequest> {
177176
let message = self.receiver.recv_signal().await;
178-
debug!(target: &self.name, "recv {:?}", message);
177+
log_message_received(&self.name, &message);
179178
message
180179
}
181180
}
@@ -208,8 +207,14 @@ impl<Output: Message> Sender<Output> for LoggingSender<Output> {
208207
}
209208
}
210209

211-
pub fn log_message_sent<I: Debug>(target: &str, message: I) {
212-
debug!(target: target, "send {message:?}");
210+
#[inline]
211+
pub fn log_message_received<I: Debug>(actor: &str, message: &I) {
212+
tracing::debug!(target: "Actors", actor, recv = ?message);
213+
}
214+
215+
#[inline]
216+
pub fn log_message_sent<I: Debug>(actor: &str, message: &I) {
217+
tracing::debug!(target: "Actors", actor, send = ?message);
213218
}
214219

215220
/// An unbounded receiver
@@ -251,10 +256,10 @@ impl<Input: Debug> UnboundedLoggingReceiver<Input> {
251256
}
252257

253258
#[async_trait]
254-
impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<Input> {
259+
impl<Input: Send + Debug + Sync> MessageReceiver<Input> for UnboundedLoggingReceiver<Input> {
255260
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
256261
let message = self.next_message().await;
257-
debug!(target: &self.name, "recv {:?}", message);
262+
log_message_received(&self.name, &message);
258263
message
259264
}
260265

@@ -263,13 +268,13 @@ impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<In
263268
Ok(Some(message)) => Some(message),
264269
_ => None,
265270
};
266-
debug!(target: &self.name, "recv {:?}", message);
271+
log_message_received(&self.name, &message);
267272
message
268273
}
269274

270275
async fn recv_signal(&mut self) -> Option<RuntimeRequest> {
271276
let message = self.signal_receiver.next().await;
272-
debug!(target: &self.name, "recv {:?}", message);
277+
log_message_received(&self.name, &message);
273278
message
274279
}
275280
}

crates/core/tedge_actors/src/runtime.rs

+19-19
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ use crate::RuntimeRequestSink;
1111
use futures::channel::mpsc;
1212
use futures::prelude::*;
1313
use futures::stream::FuturesUnordered;
14-
use log::debug;
15-
use log::error;
16-
use log::info;
1714
use std::collections::HashMap;
1815
use std::panic;
1916
use std::time::Duration;
2017
use tokio::task::JoinError;
2118
use tokio::task::JoinHandle;
19+
use tracing::debug;
20+
use tracing::error;
21+
use tracing::info;
2222

2323
/// Actions sent by actors to the runtime
2424
#[derive(Debug)]
@@ -95,7 +95,7 @@ impl Runtime {
9595
/// and all the running tasks have reach completion (successfully or not).
9696
pub async fn run_to_completion(self) -> Result<(), RuntimeError> {
9797
if let Err(err) = Runtime::wait_for_completion(self.bg_task).await {
98-
error!("Aborted due to {err}");
98+
error!(target: "Actors", "Aborted due to {err}");
9999
std::process::exit(1)
100100
}
101101

@@ -138,7 +138,7 @@ impl RuntimeHandle {
138138

139139
/// Send an action to the runtime
140140
async fn send(&mut self, action: RuntimeAction) -> Result<(), ChannelError> {
141-
debug!(target: "Runtime", "schedule {:?}", action);
141+
debug!(target: "Actors", "schedule {:?}", action);
142142
self.actions_sender.send(action).await?;
143143
Ok(())
144144
}
@@ -175,7 +175,7 @@ impl RuntimeActor {
175175
}
176176

177177
async fn run(mut self) -> Result<(), RuntimeError> {
178-
info!(target: "Runtime", "Started");
178+
info!(target: "Actors", "Started");
179179
let mut aborting_error = None;
180180
let mut actors_count: usize = 0;
181181
loop {
@@ -186,7 +186,7 @@ impl RuntimeActor {
186186
match action {
187187
RuntimeAction::Spawn(actor) => {
188188
let running_name = format!("{}-{}", actor.name(), actors_count);
189-
info!(target: "Runtime", "Running {running_name}");
189+
info!(target: "Actors", "Running {running_name}");
190190
self.send_event(RuntimeEvent::Started {
191191
task: running_name.clone(),
192192
})
@@ -196,22 +196,22 @@ impl RuntimeActor {
196196
actors_count += 1;
197197
}
198198
RuntimeAction::Shutdown => {
199-
info!(target: "Runtime", "Shutting down");
199+
info!(target: "Actors", "Shutting down");
200200
shutdown_actors(&mut self.running_actors).await;
201201
break;
202202
}
203203
}
204204
}
205205
None => {
206-
info!(target: "Runtime", "Runtime actions channel closed, runtime stopping");
206+
info!(target: "Actors", "Runtime actions channel closed, runtime stopping");
207207
shutdown_actors(&mut self.running_actors).await;
208208
break;
209209
}
210210
}
211211
},
212212
Some(finished_actor) = self.futures.next() => {
213213
if let Err(error) = self.handle_actor_finishing(finished_actor).await {
214-
info!(target: "Runtime", "Shutting down on error: {error}");
214+
info!(target: "Actors", "Shutting down on error: {error}");
215215
aborting_error = Some(error);
216216
shutdown_actors(&mut self.running_actors).await;
217217
break
@@ -222,12 +222,12 @@ impl RuntimeActor {
222222

223223
tokio::select! {
224224
_ = tokio::time::sleep(self.cleanup_duration) => {
225-
error!(target: "Runtime", "Timeout waiting for all actors to shutdown");
225+
error!(target: "Actors", "Timeout waiting for all actors to shutdown");
226226
for still_running in self.running_actors.keys() {
227-
error!(target: "Runtime", "Failed to shutdown: {still_running}")
227+
error!(target: "Actors", "Failed to shutdown: {still_running}")
228228
}
229229
}
230-
_ = self.wait_for_actors_to_finish() => info!(target: "Runtime", "All actors have finished")
230+
_ = self.wait_for_actors_to_finish() => info!(target: "Actors", "All actors have finished")
231231
}
232232

233233
match aborting_error {
@@ -248,18 +248,18 @@ impl RuntimeActor {
248248
) -> Result<(), RuntimeError> {
249249
match finished_actor {
250250
Err(e) => {
251-
error!(target: "Runtime", "Failed to execute actor: {e}");
251+
error!(target: "Actors", "Failed to execute actor: {e}");
252252
Err(RuntimeError::JoinError(e))
253253
}
254254
Ok(Ok(actor)) => {
255255
self.running_actors.remove(&actor);
256-
info!(target: "Runtime", "Actor has finished: {actor}");
256+
info!(target: "Actors", "Actor has finished: {actor}");
257257
self.send_event(RuntimeEvent::Stopped { task: actor }).await;
258258
Ok(())
259259
}
260260
Ok(Err((actor, error))) => {
261261
self.running_actors.remove(&actor);
262-
error!(target: "Runtime", "Actor {actor} has finished unsuccessfully: {error:?}");
262+
error!(target: "Actors", "Actor {actor} has finished unsuccessfully: {error:?}");
263263
self.send_event(RuntimeEvent::Aborted {
264264
task: actor.clone(),
265265
error: format!("{error}"),
@@ -273,7 +273,7 @@ impl RuntimeActor {
273273
async fn send_event(&mut self, event: RuntimeEvent) {
274274
if let Some(events) = &mut self.events {
275275
if let Err(e) = events.send(event).await {
276-
error!(target: "Runtime", "Failed to send RuntimeEvent: {e}");
276+
error!(target: "Actors", "Failed to send RuntimeEvent: {e}");
277277
}
278278
}
279279
}
@@ -286,10 +286,10 @@ where
286286
for (running_as, sender) in a {
287287
match sender.send(RuntimeRequest::Shutdown).await {
288288
Ok(()) => {
289-
debug!(target: "Runtime", "Successfully sent shutdown request to {running_as}")
289+
debug!(target: "Actors", "Successfully sent shutdown request to {running_as}")
290290
}
291291
Err(e) => {
292-
error!(target: "Runtime", "Failed to send shutdown request to {running_as}: {e:?}")
292+
error!(target: "Actors", "Failed to send shutdown request to {running_as}: {e:?}")
293293
}
294294
}
295295
}

crates/core/tedge_actors/src/servers/message_boxes.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
5555
}
5656
Some(result) = self.running_request_handlers.next() => {
5757
if let Err(err) = result {
58-
log::error!("Fail to run a request to completion: {err}");
58+
tracing::error!(target: "Actors", "Fail to run a request to completion: {err}");
5959
}
6060
}
6161
else => {
@@ -73,7 +73,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
7373
tokio::select! {
7474
Some(result) = self.running_request_handlers.next() => {
7575
if let Err(err) = result {
76-
log::error!("Fail to run a request to completion: {err}");
76+
tracing::error!(target: "Actors", "Fail to run a request to completion: {err}");
7777
}
7878
ControlFlow::Continue(())
7979
},

0 commit comments

Comments
 (0)