Skip to content

Commit ab7335e

Browse files
multi: refactor tokio runtime handling, listen for multiple signals (#21)
* multi: refactor tokio runtime handling, listen for multiple signals The motivation for this is being able to pick up multiple different error sources that need to be propagated out from the main function. Prior to this commit, for example being unable to start the RPC server would not end in a fatal error. * lib: fix clippy issues * main: move egui function
1 parent 76c922e commit ab7335e

File tree

6 files changed

+66
-66
lines changed

6 files changed

+66
-66
lines changed

Cargo.lock

Lines changed: 0 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ serde = "1.0.179"
3131
serde_json = "1.0.113"
3232
thiserror = "2.0.11"
3333
tiny-bip39 = "2.0.0"
34-
tokio = { version = "1.29.1", default-features = false }
34+
tokio = { version = "1.29.1", default-features = false, features = ["signal"] }
3535
tokio-util = "0.7.10"
3636
tonic = "0.12.3"
3737
tracing = "0.1.40"

app/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ anyhow = { workspace = true }
1111
bincode = { workspace = true }
1212
bitcoin = { workspace = true, features = ["serde"] }
1313
clap = { workspace = true, features = ["derive"] }
14-
ctrlc = "3.4.0"
1514
dirs = "5.0.1"
1615
eframe = "0.30.0"
1716
futures = { workspace = true }

app/main.rs

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
#![feature(let_chains)]
22

3-
use std::{path::Path, sync::mpsc};
3+
use std::path::Path;
44

55
use clap::Parser as _;
66

7+
use tokio::{signal::ctrl_c, sync::oneshot};
78
use tracing_subscriber::{
89
filter as tracing_filter, fmt::format, layer::SubscriberExt, Layer,
910
};
@@ -140,6 +141,26 @@ fn set_tracing_subscriber(
140141
Ok((line_buffer, rolling_log_guard))
141142
}
142143

144+
fn run_egui_app(
145+
config: &crate::cli::Config,
146+
line_buffer: LineBuffer,
147+
app: Result<crate::app::App, crate::app::Error>,
148+
) -> Result<(), eframe::Error> {
149+
let native_options = eframe::NativeOptions::default();
150+
eframe::run_native(
151+
"Thunder",
152+
native_options,
153+
Box::new(move |cc| {
154+
Ok(Box::new(gui::EguiApp::new(
155+
app.ok(),
156+
cc,
157+
line_buffer,
158+
config.rpc_addr,
159+
)))
160+
}),
161+
)
162+
}
163+
143164
fn main() -> anyhow::Result<()> {
144165
let cli = cli::Cli::parse();
145166
let config = cli.get_config()?;
@@ -148,55 +169,45 @@ fn main() -> anyhow::Result<()> {
148169
config.log_level,
149170
config.log_level_file,
150171
)?;
151-
let app: Result<app::App, app::Error> = app::App::new(&config)
152-
.inspect(|app| {
153-
// spawn rpc server
154-
app.runtime.spawn({
155-
let app = app.clone();
156-
async move {
157-
rpc_server::run_server(app, config.rpc_addr).await.unwrap()
172+
173+
let (app_tx, app_rx) = oneshot::channel::<anyhow::Error>();
174+
175+
let app = app::App::new(&config).inspect(|app| {
176+
// spawn rpc server
177+
app.runtime.spawn({
178+
let app = app.clone();
179+
async move {
180+
tracing::info!("starting RPC server at `{}`", config.rpc_addr);
181+
if let Err(err) =
182+
rpc_server::run_server(app, config.rpc_addr).await
183+
{
184+
app_tx.send(err).expect("failed to send error to app");
158185
}
159-
});
160-
})
161-
.inspect_err(|err| {
162-
tracing::error!("application error: {:?}", err);
186+
}
163187
});
188+
});
164189

165-
if config.headless {
166-
tracing::info!("Running in headless mode");
167-
drop(line_buffer);
168-
let _app = app?;
169-
// wait for ctrlc signal
170-
let (tx, rx) = mpsc::channel();
171-
ctrlc::set_handler(move || {
172-
tx.send(()).unwrap();
173-
})
174-
.expect("Error setting Ctrl-C handler");
175-
rx.recv().unwrap();
176-
tracing::info!("Received Ctrl-C signal, exiting...");
177-
} else {
178-
let native_options = eframe::NativeOptions::default();
179-
let app: Option<_> = app.map_or_else(
180-
|err| {
181-
let err = anyhow::Error::from(err);
182-
tracing::error!("{err:#}");
183-
None
184-
},
185-
Some,
186-
);
187-
eframe::run_native(
188-
"Thunder",
189-
native_options,
190-
Box::new(move |cc| {
191-
Ok(Box::new(gui::EguiApp::new(
192-
app,
193-
cc,
194-
line_buffer,
195-
config.rpc_addr,
196-
)))
197-
}),
198-
)
199-
.map_err(|err| anyhow::anyhow!("failed to launch egui app: {err}"))?
190+
if !config.headless {
191+
// For GUI mode we want the GUI to start, even if the app fails to start.
192+
return run_egui_app(&config, line_buffer, app)
193+
.map_err(|e| anyhow::anyhow!("failed to run egui app: {e:#}"));
200194
}
201-
Ok(())
195+
196+
tracing::info!("Running in headless mode");
197+
drop(line_buffer);
198+
199+
// If we're headless, we want to exit hard if the app fails to start.
200+
let app = app?;
201+
202+
app.runtime.block_on(async move {
203+
tokio::select! {
204+
Ok(_) = ctrl_c() => {
205+
tracing::info!("Shutting down due to process interruption");
206+
Ok(())
207+
}
208+
Ok(err) = app_rx => {
209+
Err(anyhow::anyhow!("received error from RPC server: {err:#} ({err:?})"))
210+
}
211+
}
212+
})
202213
}

app/rpc_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ pub async fn run_server(
259259
let server = Server::builder().build(rpc_addr).await?;
260260

261261
let addr = server.local_addr()?;
262-
tracing::info!("RPC server listening on {}", addr);
263262

264263
let handle = server.start(RpcServerImpl { app }.into_rpc());
265264

lib/authorization.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,11 @@ pub fn verify_authorized_transaction(
105105
transaction: &AuthorizedTransaction,
106106
) -> Result<(), Error> {
107107
let tx_bytes_canonical = borsh::to_vec(&transaction.transaction)?;
108-
let messages: Vec<_> = std::iter::repeat(tx_bytes_canonical.as_slice())
109-
.take(transaction.authorizations.len())
110-
.collect();
108+
let messages: Vec<_> = std::iter::repeat_n(
109+
tx_bytes_canonical.as_slice(),
110+
transaction.authorizations.len(),
111+
)
112+
.collect();
111113
let (verifying_keys, signatures): (Vec<VerifyingKey>, Vec<Signature>) =
112114
transaction
113115
.authorizations
@@ -137,7 +139,7 @@ pub fn verify_authorizations(body: &Body) -> Result<(), Error> {
137139
serialized_transactions.iter().map(Vec::as_slice);
138140
let messages = input_numbers.zip(serialized_transactions).flat_map(
139141
|(input_number, serialized_transaction)| {
140-
std::iter::repeat(serialized_transaction).take(input_number)
142+
std::iter::repeat_n(serialized_transaction, input_number)
141143
},
142144
);
143145

0 commit comments

Comments
 (0)