Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
allow-unwrap-in-tests = true
10 changes: 6 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ pub fn execute() -> io::Result<()> {
}
result = shutdown.recv_task_finished() => {
match result {
None => panic!("tasks_finished channel unexpectedly closed"),
None => tracing::info!("system.recv_task_finished.channel_closed"),
Some(Err(err)) => {
panic!("Error in partition: {err:?}");
tracing::error!(%err, "system.recv_task_finished.shutdown_error");
},
_ => panic!("Unexpected end of task"),
Some(Ok(_)) => tracing::info!("system.recv_task_finished.end_of_task"),
}
}
}
shutdown.stop().await;
if let Err(err) = shutdown.stop().await {
tracing::error!(%err, "system.shutdown_error");
}
tracing::info!("system.shutdown");
Ok(())
}),
Expand Down
55 changes: 28 additions & 27 deletions src/check_config_provider/redis_config_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,15 @@ impl RedisConfigProvider {
.set(config_payloads.len() as f64);

for config_payload in config_payloads {
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
.map_err(|err| {
tracing::error!(?err, "config_consumer.invalid_config_message");
})
.unwrap();
manager
.get_service(partition.number)
.get_config_store()
.write()
.unwrap()
.add_config(config);
match rmp_serde::from_slice(&config_payload) {
Ok(config) => manager
.get_service(partition.number)
.get_config_store()
.write()
.expect("lock not poisoned")
.add_config(config),
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
}
}
let partition_loading_time = partition_start_loading.elapsed().as_secs_f64();
metrics::histogram!(
Expand Down Expand Up @@ -252,22 +250,22 @@ impl RedisConfigProvider {
.await;

for config_payload in config_payloads {
let config: CheckConfig = rmp_serde::from_slice(&config_payload)
.map_err(|err| {
tracing::error!(?err, "config_consumer.invalid_config_message");
})
.unwrap();
tracing::debug!(
partition = partition.number,
subscription_id = %config.subscription_id,
"redis_config_provider.upserting_config"
);
manager
.get_service(partition.number)
.get_config_store()
.write()
.unwrap()
.add_config(config);
match rmp_serde::from_slice::<CheckConfig>(&config_payload) {
Ok(config) => {
tracing::debug!(
partition = partition.number,
subscription_id = %config.subscription_id,
"redis_config_provider.upserting_config"
);
manager
.get_service(partition.number)
.get_config_store()
.write()
.expect("lock not poisoned")
.add_config(config);
}
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
}
}
let partition_update_duration = partition_update_start.elapsed().as_secs_f64();
metrics::histogram!(
Expand Down Expand Up @@ -329,6 +327,9 @@ pub fn run_config_provider(
})
}

// This function is allowed to panic, as an incorrect checker number represents a fatal
// logic error for the uptime checker.
#[allow(clippy::panic)]
pub fn determine_owned_partitions(config: &Config) -> HashSet<u16> {
// Determines which partitions this checker owns based on number of partitions,
// number of checkers and checker number
Expand Down
21 changes: 11 additions & 10 deletions src/check_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,19 @@ async fn executor_loop(
job_producer,
conf.region,
);
if conf.record_task_metrics {
let check_task_result = if conf.record_task_metrics {
if conf.checker_parallel {
tokio::spawn(metrics_monitor.instrument(check_fut))
.await
.expect("The check task should not fail");
tokio::spawn(metrics_monitor.instrument(check_fut)).await
} else {
metrics_monitor.instrument(check_fut).await;
Ok(())
}
} else {
tokio::spawn(check_fut)
.await
.expect("The check task should not fail");
tokio::spawn(check_fut).await
};
Comment on lines +278 to +287

This comment was marked as outdated.


if let Err(err) = check_task_result {
tracing::error!(%err, "executor.check_task_failed");
}
let num_running_val = num_running.fetch_sub(1, Ordering::Relaxed) - 1;
num_running_gauge.set(num_running_val as f64);
Expand Down Expand Up @@ -325,9 +326,9 @@ pub(crate) async fn do_check(
{
Some(check_result) => check_result,
None => {
scheduled_check
.record_result(None)
.expect("Check recording channel should exist");
if let Err(err) = scheduled_check.record_result(None) {
tracing::error!(%err, "executor.do_check.robots_record_results_error");
}
return;
}
},
Expand Down
38 changes: 18 additions & 20 deletions src/checker/reqwest_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,7 @@ async fn do_request(
check_config: &CheckConfig,
sentry_trace: &str,
) -> Result<(Response, RequestId), reqwest::Error> {
let timeout = check_config
.timeout
.to_std()
.expect("Timeout duration should be representable as a duration");
let timeout = check_config.timeout.to_std().unwrap_or_default();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: A negative check_config.timeout value causes to_std() to fail, and unwrap_or_default() converts that failure into Duration::ZERO, making all affected HTTP requests time out immediately.
Severity: CRITICAL | Confidence: High

🔍 Detailed Analysis

The change from expect() to unwrap_or_default() introduces a silent failure. Negative check_config.timeout values, which are not validated upstream, can exist in production. When a negative TimeDelta is converted using to_std(), the conversion returns an error. The new unwrap_or_default() call handles this by returning Duration::ZERO. This causes the reqwest client to time out all affected HTTP check requests immediately, breaking the timeout functionality without any error or alert. The previous implementation would have panicked, which served as a safeguard against invalid data.

💡 Suggested Fix

Instead of using unwrap_or_default(), handle the Err case from to_std() explicitly. Propagate the error, log it and use a reasonable default timeout, or add validation upstream to prevent negative TimeDelta values from reaching this code path. The previous expect() call should be replaced with proper error handling, not a silent failure.

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/checker/reqwest_checker.rs#L71

Potential issue: The change from `expect()` to `unwrap_or_default()` introduces a silent
failure. Negative `check_config.timeout` values, which are not validated upstream, can
exist in production. When a negative `TimeDelta` is converted using `to_std()`, it
returns an error. The new `unwrap_or_default()` call handles this by returning
`Duration::ZERO`. This causes the `reqwest` client to time out all affected HTTP check requests
immediately, breaking the timeout functionality without any error or alert. The previous
implementation would have panicked, which served as a safeguard against invalid data.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 8184931


let url = check_config.url.as_str();

Expand Down Expand Up @@ -312,15 +309,17 @@ fn run_assertion(
) -> Check {
let comp_assert = assert_cache.get_or_compile(assertion);

if let Err(err) = comp_assert {
tracing::warn!(
"a bad assertion made it to compile from {} : {}",
subscription_id,
err.to_string(),
);
return Check::assert_compile_failure(&err);
}
let assertion = comp_assert.expect("already tested above");
let assertion = match comp_assert {
Ok(assertion) => assertion,
Err(err) => {
tracing::warn!(
"a bad assertion made it to compile from {} : {}",
subscription_id,
err.to_string(),
);
return Check::assert_compile_failure(&err);
}
};

let result = assertion.eval(r.status().as_u16(), r.headers(), body_bytes);

Expand Down Expand Up @@ -388,16 +387,15 @@ fn to_errored_request_infos(
// connection error_ will require some effort, so for now, just bill the full time to the part that
// we failed on, leaving the others at zero.

let request_duration =
TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
let request_duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();
let zero_timing = Timing {
start_us: actual_check_time.timestamp_micros() as u128,
duration_us: 0,
};

let full_duration = Timing {
start_us: actual_check_time.timestamp_micros() as u128,
duration_us: request_duration.num_microseconds().unwrap() as u64,
duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
};

let mut dns_timing = zero_timing;
Expand All @@ -424,7 +422,7 @@ fn to_errored_request_infos(
request_body_size_bytes: check.get_config().request_body.len() as u32,
url: check.get_config().url.clone(),
response_body_size_bytes: 0,
request_duration_us: request_duration.num_microseconds().unwrap() as u64,
request_duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
durations: RequestDurations {
dns_lookup: dns_timing,
tcp_connection: connection_timing,
Expand Down Expand Up @@ -493,9 +491,9 @@ impl Checker for ReqwestChecker {
let check_result = to_check_result(&self.assert_cache, response, check, &body_bytes);

// Our total duration includes the additional processing time, including running the assert.
let duration = TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
let duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();

let final_req = rinfos.last().unwrap().clone();
let request_info = rinfos.last().cloned();

CheckResult {
guid: trace_id,
Expand All @@ -510,7 +508,7 @@ impl Checker for ReqwestChecker {
actual_check_time_us: actual_check_time,
duration: Some(duration),
duration_us: Some(duration),
request_info: Some(final_req),
request_info,
region,
request_info_list: rinfos,
assertion_failure_data: check_result.assert_path,
Expand Down
17 changes: 8 additions & 9 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ pub fn start_endpoint(
let endpoint_port = config.webserver_port;

tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await;

let Ok(listener) = listener else {
tracing::error!(
"Could not listen on webserver endpoint: {}",
listener.err().unwrap().to_string()
);
return;
};
let listener =
match tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await {
Ok(listener) => listener,
Err(err) => {
tracing::error!(%err, "endpoint.listen_error");
return;
}
};
let shutdown_fut = shutdown_signal.cancelled_owned();
let result = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_fut)
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![deny(clippy::unwrap_used, clippy::panic)]
// TODO: We might want to remove this once more stable, but it's just noisy for now.
#![allow(dead_code)]
mod app;
Expand Down
22 changes: 12 additions & 10 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
sync::{Arc, RwLock},
};
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::task::JoinHandle;
use tokio::task::{JoinError, JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -105,7 +105,9 @@ impl PartitionedService {
self.shutdown_signal.cancel();

// Okay to unwrap here, since we're just shutting down.
self.scheduler_join_handle.await.unwrap();
if let Err(err) = self.scheduler_join_handle.await {
tracing::error!(%err, "partionied_service.shutdown_error");
}
tracing::info!(partition = self.partition, "partitioned_service.shutdown");
}
}
Expand All @@ -130,20 +132,20 @@ pub struct ManagerHandle {
}

impl ManagerHandle {
pub async fn stop(self) {
pub async fn stop(self) -> Result<(), JoinError> {
self.shutdown_signal.cancel();
// Unwrapping here because we're just shutting down; it's okay to fail badly
// at this point.

self.endpoint_join_handle.await.unwrap();
self.endpoint_join_handle.await?;

self.consumer_join_handle.await?;

self.consumer_join_handle.await.unwrap();
self.results_worker.await?;

self.results_worker.await.unwrap();
self.services_join_handle.await?;

self.services_join_handle.await.unwrap();
self.executor_join_handle.await?;

self.executor_join_handle.await.unwrap();
Ok(())
}

pub async fn recv_task_finished(&mut self) -> Option<anyhow::Result<()>> {
Expand Down
4 changes: 3 additions & 1 deletion src/producer/dummy_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ impl ResultsProducer for DummyResultsProducer {
let json = serde_json::to_vec(result)?;

// Let's actually blow up here, so that we fail schema validation in tests.
self.schema.validate_json(&json).unwrap();
self.schema
.validate_json(&json)
.expect("invalid json schema in dummy producer");
Ok(())
}
}
14 changes: 10 additions & 4 deletions src/types/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use hyper::rt::ConnectionStats;
use hyper::stats::AbsoluteDuration;
use hyper::stats::RequestStats;
use openssl::asn1::Asn1Time;
use openssl::asn1::TimeDiff;
use openssl::x509::X509;
use sentry::protocol::SpanId;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -125,10 +126,15 @@ pub fn to_request_info_list(stats: &RequestStats, method: RequestMethod) -> Vec<
.get_certificate_bytes()
.and_then(|cert| X509::from_der(cert).ok())
.map(|cert| {
let epoch_start = Asn1Time::from_unix(0).unwrap();

let not_after_diff = epoch_start.diff(cert.not_after()).unwrap();
let not_before_diff = epoch_start.diff(cert.not_before()).unwrap();
let epoch_start =
Asn1Time::from_unix(0).expect("from_unix(0) should never fail");

let not_after_diff = epoch_start
.diff(cert.not_after())
.unwrap_or(TimeDiff { days: 0, secs: 0 });
let not_before_diff = epoch_start
.diff(cert.not_before())
.unwrap_or(TimeDiff { days: 0, secs: 0 });

CertificateInfo {
not_after_timestamp_s: not_after_diff.secs as u64
Expand Down
Loading