diff --git a/Cargo.lock b/Cargo.lock index 1a76c579..15da7265 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4736,7 +4736,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uptime-checker" -version = "25.12.0" +version = "25.12.1" dependencies = [ "anyhow", "associative-cache", diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 00000000..154626ef --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +allow-unwrap-in-tests = true diff --git a/src/app.rs b/src/app.rs index dd06f4ca..65e7742b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -42,15 +42,17 @@ pub fn execute() -> io::Result<()> { } result = shutdown.recv_task_finished() => { match result { - None => panic!("tasks_finished channel unexpectedly closed"), + None => tracing::info!("system.recv_task_finished.channel_closed"), Some(Err(err)) => { - panic!("Error in partition: {err:?}"); + tracing::error!(%err, "system.recv_task_finished.shutdown_error"); }, - _ => panic!("Unexpected end of task"), + Some(Ok(_)) => tracing::info!("system.recv_task_finished.end_of_task"), } } } - shutdown.stop().await; + if let Err(err) = shutdown.stop().await { + tracing::error!(%err, "system.shutdown_error"); + } tracing::info!("system.shutdown"); Ok(()) }), diff --git a/src/check_config_provider/redis_config_provider.rs b/src/check_config_provider/redis_config_provider.rs index 71f067c8..4cb85962 100644 --- a/src/check_config_provider/redis_config_provider.rs +++ b/src/check_config_provider/redis_config_provider.rs @@ -158,17 +158,15 @@ impl RedisConfigProvider { .set(config_payloads.len() as f64); for config_payload in config_payloads { - let config: CheckConfig = rmp_serde::from_slice(&config_payload) - .map_err(|err| { - tracing::error!(?err, "config_consumer.invalid_config_message"); - }) - .unwrap(); - manager - .get_service(partition.number) - .get_config_store() - .write() - .unwrap() - .add_config(config); + match rmp_serde::from_slice(&config_payload) { + Ok(config) => manager + .get_service(partition.number) + .get_config_store() + .write() + .expect("lock not poisoned") + .add_config(config), + Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"), + } } let partition_loading_time = partition_start_loading.elapsed().as_secs_f64(); metrics::histogram!( @@ -252,22 +250,22 @@ impl RedisConfigProvider { .await; for config_payload in config_payloads { - let config: CheckConfig = rmp_serde::from_slice(&config_payload) - .map_err(|err| { - tracing::error!(?err, "config_consumer.invalid_config_message"); - }) - .unwrap(); - tracing::debug!( - partition = partition.number, - subscription_id = %config.subscription_id, - "redis_config_provider.upserting_config" - ); - manager - .get_service(partition.number) - .get_config_store() - .write() - .unwrap() - .add_config(config); + match rmp_serde::from_slice::(&config_payload) { + Ok(config) => { + tracing::debug!( + partition = partition.number, + subscription_id = %config.subscription_id, + "redis_config_provider.upserting_config" + ); + manager + .get_service(partition.number) + .get_config_store() + .write() + .expect("lock not poisoned") + .add_config(config); + } + Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"), + } } let partition_update_duration = partition_update_start.elapsed().as_secs_f64(); metrics::histogram!( @@ -329,6 +327,9 @@ pub fn run_config_provider( }) } +// This function is allowed to panic, as an incorrect checker number represents a fatal +// logic error for the uptime checker. +#[allow(clippy::panic)] pub fn determine_owned_partitions(config: &Config) -> HashSet { // Determines which partitions this checker owns based on number of partitions, // number of checkers and checker number diff --git a/src/check_executor.rs b/src/check_executor.rs index 7c411898..75cc6d52 100644 --- a/src/check_executor.rs +++ b/src/check_executor.rs @@ -275,18 +275,19 @@ async fn executor_loop( job_producer, conf.region, ); - if conf.record_task_metrics { + let check_task_result = if conf.record_task_metrics { if conf.checker_parallel { - tokio::spawn(metrics_monitor.instrument(check_fut)) - .await - .expect("The check task should not fail"); + tokio::spawn(metrics_monitor.instrument(check_fut)).await } else { metrics_monitor.instrument(check_fut).await; + Ok(()) } } else { - tokio::spawn(check_fut) - .await - .expect("The check task should not fail"); + tokio::spawn(check_fut).await + }; + + if let Err(err) = check_task_result { + tracing::error!(%err, "executor.check_task_failed"); } let num_running_val = num_running.fetch_sub(1, Ordering::Relaxed) - 1; num_running_gauge.set(num_running_val as f64); @@ -325,9 +326,9 @@ pub(crate) async fn do_check( { Some(check_result) => check_result, None => { - scheduled_check - .record_result(None) - .expect("Check recording channel should exist"); + if let Err(err) = scheduled_check.record_result(None) { + tracing::error!(%err, "executor.do_check.robots_record_results_error"); + } return; } }, diff --git a/src/checker/reqwest_checker.rs b/src/checker/reqwest_checker.rs index 26643fc8..e46813c0 100644 --- a/src/checker/reqwest_checker.rs +++ b/src/checker/reqwest_checker.rs @@ -68,10 +68,7 @@ async fn do_request( check_config: &CheckConfig, sentry_trace: &str, ) -> Result<(Response, RequestId), reqwest::Error> { - let timeout = check_config - .timeout - .to_std() - .expect("Timeout duration should be representable as a duration"); + let timeout = check_config.timeout.to_std().unwrap_or_default(); let url = check_config.url.as_str(); @@ -312,15 +309,17 @@ fn run_assertion( ) -> Check { let comp_assert = assert_cache.get_or_compile(assertion); - if let Err(err) = comp_assert { - tracing::warn!( - "a bad assertion made it to compile from {} : {}", - subscription_id, - err.to_string(), - ); - return Check::assert_compile_failure(&err); - } - let assertion = comp_assert.expect("already tested above"); + let assertion = match comp_assert { + Ok(assertion) => assertion, + Err(err) => { + tracing::warn!( + "a bad assertion made it to compile from {} : {}", + subscription_id, + err.to_string(), + ); + return Check::assert_compile_failure(&err); + } + }; let result = assertion.eval(r.status().as_u16(), r.headers(), body_bytes); @@ -388,8 +387,7 @@ fn to_errored_request_infos( // connection error_ will require some effort, so for now, just bill the full time to the part that // we failed on, leaving the others at zero. - let request_duration = - TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large"); + let request_duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default(); let zero_timing = Timing { start_us: actual_check_time.timestamp_micros() as u128, duration_us: 0, @@ -397,7 +395,7 @@ fn to_errored_request_infos( let full_duration = Timing { start_us: actual_check_time.timestamp_micros() as u128, - duration_us: request_duration.num_microseconds().unwrap() as u64, + duration_us: request_duration.num_microseconds().unwrap_or(0) as u64, }; let mut dns_timing = zero_timing; @@ -424,7 +422,7 @@ fn to_errored_request_infos( request_body_size_bytes: check.get_config().request_body.len() as u32, url: check.get_config().url.clone(), response_body_size_bytes: 0, - request_duration_us: request_duration.num_microseconds().unwrap() as u64, + request_duration_us: request_duration.num_microseconds().unwrap_or(0) as u64, durations: RequestDurations { dns_lookup: dns_timing, tcp_connection: connection_timing, @@ -493,9 +491,9 @@ impl Checker for ReqwestChecker { let check_result = to_check_result(&self.assert_cache, response, check, &body_bytes); // Our total duration includes the additional processing time, including running the assert. - let duration = TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large"); + let duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default(); - let final_req = rinfos.last().unwrap().clone(); + let request_info = rinfos.last().cloned(); CheckResult { guid: trace_id, @@ -510,7 +508,7 @@ impl Checker for ReqwestChecker { actual_check_time_us: actual_check_time, duration: Some(duration), duration_us: Some(duration), - request_info: Some(final_req), + request_info, region, request_info_list: rinfos, assertion_failure_data: check_result.assert_path, diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 3f06755a..75b8e5fb 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -32,15 +32,14 @@ pub fn start_endpoint( let endpoint_port = config.webserver_port; tokio::spawn(async move { - let listener = tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await; - - let Ok(listener) = listener else { - tracing::error!( - "Could not listen on webserver endpoint: {}", - listener.err().unwrap().to_string() - ); - return; - }; + let listener = + match tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await { + Ok(listener) => listener, + Err(err) => { + tracing::error!(%err, "endpoint.listen_error"); + return; + } + }; let shutdown_fut = shutdown_signal.cancelled_owned(); let result = axum::serve(listener, app) .with_graceful_shutdown(shutdown_fut) diff --git a/src/main.rs b/src/main.rs index 1c20d477..94dfae27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![deny(clippy::unwrap_used, clippy::panic)] // TODO: We might want to remove this once more stable, but it's just noisy for now. #![allow(dead_code)] mod app; diff --git a/src/manager.rs b/src/manager.rs index a1583283..e89ea951 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -7,7 +7,7 @@ use std::{ sync::{Arc, RwLock}, }; use tokio::sync::mpsc::{self, UnboundedSender}; -use tokio::task::JoinHandle; +use tokio::task::{JoinError, JoinHandle}; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::sync::CancellationToken; @@ -105,7 +105,9 @@ impl PartitionedService { self.shutdown_signal.cancel(); // Okay to unwrap here, since we're just shutting down. - self.scheduler_join_handle.await.unwrap(); + if let Err(err) = self.scheduler_join_handle.await { + tracing::error!(%err, "partionied_service.shutdown_error"); + } tracing::info!(partition = self.partition, "partitioned_service.shutdown"); } } @@ -130,20 +132,20 @@ pub struct ManagerHandle { } impl ManagerHandle { - pub async fn stop(self) { + pub async fn stop(self) -> Result<(), JoinError> { self.shutdown_signal.cancel(); - // Unwrapping here because we're just shutting down; it's okay to fail badly - // at this point. - self.endpoint_join_handle.await.unwrap(); + self.endpoint_join_handle.await?; + + self.consumer_join_handle.await?; - self.consumer_join_handle.await.unwrap(); + self.results_worker.await?; - self.results_worker.await.unwrap(); + self.services_join_handle.await?; - self.services_join_handle.await.unwrap(); + self.executor_join_handle.await?; - self.executor_join_handle.await.unwrap(); + Ok(()) } pub async fn recv_task_finished(&mut self) -> Option> { diff --git a/src/producer/dummy_producer.rs b/src/producer/dummy_producer.rs index 239be954..c5e21041 100644 --- a/src/producer/dummy_producer.rs +++ b/src/producer/dummy_producer.rs @@ -19,7 +19,9 @@ impl ResultsProducer for DummyResultsProducer { let json = serde_json::to_vec(result)?; // Let's actually blow up here, so that we fail schema validation in tests. - self.schema.validate_json(&json).unwrap(); + self.schema + .validate_json(&json) + .expect("invalid json schema in dummy producer"); Ok(()) } } diff --git a/src/types/result.rs b/src/types/result.rs index 1b69bb9a..6f68cf9e 100644 --- a/src/types/result.rs +++ b/src/types/result.rs @@ -6,6 +6,7 @@ use hyper::rt::ConnectionStats; use hyper::stats::AbsoluteDuration; use hyper::stats::RequestStats; use openssl::asn1::Asn1Time; +use openssl::asn1::TimeDiff; use openssl::x509::X509; use sentry::protocol::SpanId; use serde::{Deserialize, Serialize}; @@ -125,10 +126,15 @@ pub fn to_request_info_list(stats: &RequestStats, method: RequestMethod) -> Vec< .get_certificate_bytes() .and_then(|cert| X509::from_der(cert).ok()) .map(|cert| { - let epoch_start = Asn1Time::from_unix(0).unwrap(); - - let not_after_diff = epoch_start.diff(cert.not_after()).unwrap(); - let not_before_diff = epoch_start.diff(cert.not_before()).unwrap(); + let epoch_start = + Asn1Time::from_unix(0).expect("from_unix(0) should never fail"); + + let not_after_diff = epoch_start + .diff(cert.not_after()) + .unwrap_or(TimeDiff { days: 0, secs: 0 }); + let not_before_diff = epoch_start + .diff(cert.not_before()) + .unwrap_or(TimeDiff { days: 0, secs: 0 }); CertificateInfo { not_after_timestamp_s: not_after_diff.secs as u64