2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

10 changes: 6 additions & 4 deletions src/app.rs
@@ -42,15 +42,17 @@ pub fn execute() -> io::Result<()> {
             }
             result = shutdown.recv_task_finished() => {
                 match result {
-                    None => panic!("tasks_finished channel unexpectedly closed"),
+                    None => tracing::info!("system.recv_task_finished.channel_closed"),
                     Some(Err(err)) => {
-                        panic!("Error in partition: {err:?}");
+                        tracing::error!(%err, "system.recv_task_finished.shutdown_error");
                     },
-                    _ => panic!("Unexpected end of task"),
+                    Some(Ok(_)) => tracing::info!("system.recv_task_finished.end_of_task"),
                 }
             }
         }
-        shutdown.stop().await;
+        if let Err(err) = shutdown.stop().await {
+            tracing::error!(%err, "system.shutdown_error");
+        }
         tracing::info!("system.shutdown");
         Ok(())
     }),
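A side note on the `None` arm above: if `recv_task_finished` wraps a tokio mpsc receiver, then `recv().await` returning `None` simply means every sender has been dropped, which during shutdown is expected rather than fatal — hence the downgrade from `panic!` to `tracing::info!`. A minimal standalone sketch of that semantics (names here are illustrative, not from this codebase):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Result<(), String>>(8);
    drop(tx); // all senders gone, e.g. every partition task has exited

    // recv() yields None once the channel is closed: a normal
    // end-of-work signal, not a reason to panic.
    assert!(rx.recv().await.is_none());
}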
55 changes: 28 additions & 27 deletions src/check_config_provider/redis_config_provider.rs
@@ -158,17 +158,15 @@ impl RedisConfigProvider {
             .set(config_payloads.len() as f64);

         for config_payload in config_payloads {
-            let config: CheckConfig = rmp_serde::from_slice(&config_payload)
-                .map_err(|err| {
-                    tracing::error!(?err, "config_consumer.invalid_config_message");
-                })
-                .unwrap();
-            manager
-                .get_service(partition.number)
-                .get_config_store()
-                .write()
-                .unwrap()
-                .add_config(config);
+            match rmp_serde::from_slice(&config_payload) {
+                Ok(config) => manager
+                    .get_service(partition.number)
+                    .get_config_store()
+                    .write()
+                    .expect("lock not poisoned")
+                    .add_config(config),
+                Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
+            }
         }
         let partition_loading_time = partition_start_loading.elapsed().as_secs_f64();
         metrics::histogram!(
@@ -252,22 +250,22 @@ impl RedisConfigProvider {
             .await;

         for config_payload in config_payloads {
-            let config: CheckConfig = rmp_serde::from_slice(&config_payload)
-                .map_err(|err| {
-                    tracing::error!(?err, "config_consumer.invalid_config_message");
-                })
-                .unwrap();
-            tracing::debug!(
-                partition = partition.number,
-                subscription_id = %config.subscription_id,
-                "redis_config_provider.upserting_config"
-            );
-            manager
-                .get_service(partition.number)
-                .get_config_store()
-                .write()
-                .unwrap()
-                .add_config(config);
+            match rmp_serde::from_slice::<CheckConfig>(&config_payload) {
+                Ok(config) => {
+                    tracing::debug!(
+                        partition = partition.number,
+                        subscription_id = %config.subscription_id,
+                        "redis_config_provider.upserting_config"
+                    );
+                    manager
+                        .get_service(partition.number)
+                        .get_config_store()
+                        .write()
+                        .expect("lock not poisoned")
+                        .add_config(config);
+                }
+                Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
+            }
         }
         let partition_update_duration = partition_update_start.elapsed().as_secs_f64();
         metrics::histogram!(
@@ -329,6 +327,9 @@ pub fn run_config_provider(
     })
 }

+// This function is allowed to panic, as an incorrect checker number represents a fatal
+// logic error for the uptime checker.
+#[allow(clippy::panic)]
 pub fn determine_owned_partitions(config: &Config) -> HashSet<u16> {
     // Determines which partitions this checker owns based on number of partitions,
     // number of checkers and checker number
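Since the body of determine_owned_partitions is collapsed in this diff, here is a hedged sketch of the modulo-style ownership its doc comment describes, and of the panic the new #[allow(clippy::panic)] permits. The field names (checker_number, total_checkers, total_partitions) are assumptions, not confirmed by this PR:

use std::collections::HashSet;

struct Config {
    checker_number: u16,
    total_checkers: u16,
    total_partitions: u16,
}

#[allow(clippy::panic)]
fn determine_owned_partitions(config: &Config) -> HashSet<u16> {
    if config.checker_number >= config.total_checkers {
        // A mis-configured checker number is a fatal logic error for the
        // uptime checker, so panicking here is explicitly allowed.
        panic!(
            "checker number {} out of range for {} checkers",
            config.checker_number, config.total_checkers
        );
    }
    // Round-robin assignment: each checker owns every Nth partition.
    (0..config.total_partitions)
        .filter(|partition| partition % config.total_checkers == config.checker_number)
        .collect()
}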
21 changes: 11 additions & 10 deletions src/check_executor.rs
@@ -275,18 +275,19 @@ async fn executor_loop(
             job_producer,
             conf.region,
         );
-        if conf.record_task_metrics {
+        let check_task_result = if conf.record_task_metrics {
             if conf.checker_parallel {
-                tokio::spawn(metrics_monitor.instrument(check_fut))
-                    .await
-                    .expect("The check task should not fail");
+                tokio::spawn(metrics_monitor.instrument(check_fut)).await
             } else {
                 metrics_monitor.instrument(check_fut).await;
+                Ok(())
             }
         } else {
-            tokio::spawn(check_fut)
-                .await
-                .expect("The check task should not fail");
+            tokio::spawn(check_fut).await
+        };
+
+        if let Err(err) = check_task_result {
+            tracing::error!(%err, "executor.check_task_failed");
         }
         let num_running_val = num_running.fetch_sub(1, Ordering::Relaxed) - 1;
         num_running_gauge.set(num_running_val as f64);
@@ -325,9 +326,9 @@ pub(crate) async fn do_check(
     {
         Some(check_result) => check_result,
        None => {
-            scheduled_check
-                .record_result(None)
-                .expect("Check recording channel should exist");
+            if let Err(err) = scheduled_check.record_result(None) {
+                tracing::error!(%err, "executor.do_check.robots_record_results_error");
+            }
             return;
         }
     },
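Worth spelling out why this refactor works: tokio::spawn(fut).await evaluates to Result<T, JoinError>, which is Err when the spawned task panicked or was cancelled, so binding it to check_task_result lets the executor loop log a failed check task instead of unwinding. A minimal standalone illustration (not code from this PR):

use tokio::task::JoinError;

#[tokio::main]
async fn main() {
    // The panic inside the spawned task is captured as a JoinError
    // rather than crashing the caller.
    let result: Result<(), JoinError> = tokio::spawn(async {
        panic!("simulated check task failure");
    })
    .await;

    if let Err(err) = result {
        eprintln!("task failed: {err}");
    }
}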
38 changes: 18 additions & 20 deletions src/checker/reqwest_checker.rs
@@ -68,10 +68,7 @@ async fn do_request(
     check_config: &CheckConfig,
     sentry_trace: &str,
 ) -> Result<(Response, RequestId), reqwest::Error> {
-    let timeout = check_config
-        .timeout
-        .to_std()
-        .expect("Timeout duration should be representable as a duration");
+    let timeout = check_config.timeout.to_std().unwrap_or_default();
Review comment on src/checker/reqwest_checker.rs line 71:

Bug: A negative check_config.timeout value causes to_std() to fail, and unwrap_or_default() converts it to Duration::ZERO, making all HTTP requests time out immediately.
Severity: CRITICAL | Confidence: High

🔍 Detailed Analysis

The change from expect() to unwrap_or_default() introduces a silent failure. Negative check_config.timeout values, which are not validated upstream, can exist in production. When a negative TimeDelta is converted with to_std(), it returns an error, and the new unwrap_or_default() call turns that error into Duration::ZERO. The reqwest client then times out every HTTP check request immediately, breaking the timeout functionality without any error or alert. The previous implementation would have panicked, which at least served as a safeguard against invalid data.

💡 Suggested Fix

Instead of using unwrap_or_default(), handle the Err case from to_std() explicitly: propagate the error, or log it and fall back to a reasonable default timeout, or add validation upstream so negative TimeDelta values never reach this code path. The expect() call should be replaced with proper error handling, not a silent failure.
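A minimal sketch of the suggested fix, assuming check_config.timeout is a chrono TimeDelta; DEFAULT_TIMEOUT is a hypothetical fallback constant, not something defined in this PR:

use std::time::Duration;

// Hypothetical fallback; pick whatever default the service considers sane.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

let timeout = match check_config.timeout.to_std() {
    Ok(timeout) => timeout,
    Err(err) => {
        // A negative TimeDelta cannot be represented as a std Duration.
        // Log loudly and fall back instead of timing every request out at zero.
        tracing::error!(%err, "reqwest_checker.invalid_timeout");
        DEFAULT_TIMEOUT
    }
};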


     let url = check_config.url.as_str();

@@ -312,15 +309,17 @@ fn run_assertion(
 ) -> Check {
     let comp_assert = assert_cache.get_or_compile(assertion);

-    if let Err(err) = comp_assert {
-        tracing::warn!(
-            "a bad assertion made it to compile from {} : {}",
-            subscription_id,
-            err.to_string(),
-        );
-        return Check::assert_compile_failure(&err);
-    }
-    let assertion = comp_assert.expect("already tested above");
+    let assertion = match comp_assert {
+        Ok(assertion) => assertion,
+        Err(err) => {
+            tracing::warn!(
+                "a bad assertion made it to compile from {} : {}",
+                subscription_id,
+                err.to_string(),
+            );
+            return Check::assert_compile_failure(&err);
+        }
+    };

     let result = assertion.eval(r.status().as_u16(), r.headers(), body_bytes);

@@ -388,16 +387,15 @@ fn to_errored_request_infos(
 // connection error_ will require some effort, so for now, just bill the full time to the part that
 // we failed on, leaving the others at zero.

-    let request_duration =
-        TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
+    let request_duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();
     let zero_timing = Timing {
         start_us: actual_check_time.timestamp_micros() as u128,
         duration_us: 0,
     };

     let full_duration = Timing {
         start_us: actual_check_time.timestamp_micros() as u128,
-        duration_us: request_duration.num_microseconds().unwrap() as u64,
+        duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
     };

     let mut dns_timing = zero_timing;
@@ -424,7 +422,7 @@ fn to_errored_request_infos(
         request_body_size_bytes: check.get_config().request_body.len() as u32,
         url: check.get_config().url.clone(),
         response_body_size_bytes: 0,
-        request_duration_us: request_duration.num_microseconds().unwrap() as u64,
+        request_duration_us: request_duration.num_microseconds().unwrap_or(0) as u64,
         durations: RequestDurations {
             dns_lookup: dns_timing,
             tcp_connection: connection_timing,
@@ -493,9 +491,9 @@ impl Checker for ReqwestChecker {
         let check_result = to_check_result(&self.assert_cache, response, check, &body_bytes);

         // Our total duration includes the additional processing time, including running the assert.
-        let duration = TimeDelta::from_std(start.elapsed()).expect("duration shouldn't be large");
+        let duration = TimeDelta::from_std(start.elapsed()).unwrap_or_default();

-        let final_req = rinfos.last().unwrap().clone();
+        let request_info = rinfos.last().cloned();

         CheckResult {
             guid: trace_id,
@@ -510,7 +508,7 @@ impl Checker for ReqwestChecker {
             actual_check_time_us: actual_check_time,
             duration: Some(duration),
             duration_us: Some(duration),
-            request_info: Some(final_req),
+            request_info,
             region,
             request_info_list: rinfos,
             assertion_failure_data: check_result.assert_path,
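One detail worth calling out in the CheckResult change: Vec::last() returns Option<&T>, and Option::cloned turns that borrowed Option<&T> into an owned Option<T>, so an empty rinfos list now produces request_info: None instead of a panic. A standalone illustration (not code from this PR):

fn main() {
    let rinfos: Vec<String> = Vec::new();
    // last() -> Option<&String>, cloned() -> Option<String>.
    let request_info: Option<String> = rinfos.last().cloned();
    assert!(request_info.is_none()); // an empty list no longer panics
}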
17 changes: 8 additions & 9 deletions src/endpoint/mod.rs
@@ -32,15 +32,14 @@ pub fn start_endpoint(
     let endpoint_port = config.webserver_port;

     tokio::spawn(async move {
-        let listener = tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await;
-
-        let Ok(listener) = listener else {
-            tracing::error!(
-                "Could not listen on webserver endpoint: {}",
-                listener.err().unwrap().to_string()
-            );
-            return;
-        };
+        let listener =
+            match tokio::net::TcpListener::bind(format!("localhost:{}", endpoint_port)).await {
+                Ok(listener) => listener,
+                Err(err) => {
+                    tracing::error!(%err, "endpoint.listen_error");
+                    return;
+                }
+            };
         let shutdown_fut = shutdown_signal.cancelled_owned();
         let result = axum::serve(listener, app)
             .with_graceful_shutdown(shutdown_fut)
1 change: 1 addition & 0 deletions src/main.rs
@@ -1,3 +1,4 @@
+#![deny(clippy::unwrap_used, clippy::panic)]
 // TODO: We might want to remove this once more stable, but it's just noisy for now.
 #![allow(dead_code)]
 mod app;
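This crate-level deny is the mechanism behind every other change in the PR: with clippy::unwrap_used and clippy::panic denied, any remaining unwrap() or panic! fails clippy unless a scope opts out with #[allow(...)], as determine_owned_partitions does. A standalone sketch of what the lints reject and what passes (illustrative only):

#![deny(clippy::unwrap_used, clippy::panic)]

fn parse_port(raw: &str) -> u16 {
    // Denied: `raw.parse().unwrap()` would now fail the clippy lint.
    match raw.parse() {
        Ok(port) => port,
        Err(err) => {
            eprintln!("invalid port {raw}: {err}");
            8080 // hypothetical fallback default
        }
    }
}

fn main() {
    assert_eq!(parse_port("8443"), 8443);
    assert_eq!(parse_port("not-a-port"), 8080);
}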
22 changes: 12 additions & 10 deletions src/manager.rs
@@ -7,7 +7,7 @@ use std::{
     sync::{Arc, RwLock},
 };
 use tokio::sync::mpsc::{self, UnboundedSender};
-use tokio::task::JoinHandle;
+use tokio::task::{JoinError, JoinHandle};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tokio_util::sync::CancellationToken;

@@ -105,7 +105,9 @@ impl PartitionedService {
         self.shutdown_signal.cancel();

         // Okay to unwrap here, since we're just shutting down.
-        self.scheduler_join_handle.await.unwrap();
+        if let Err(err) = self.scheduler_join_handle.await {
+            tracing::error!(%err, "partitioned_service.shutdown_error");
+        }
         tracing::info!(partition = self.partition, "partitioned_service.shutdown");
     }
 }

@@ -130,20 +132,20 @@ pub struct ManagerHandle {
 }

 impl ManagerHandle {
-    pub async fn stop(self) {
+    pub async fn stop(self) -> Result<(), JoinError> {
         self.shutdown_signal.cancel();
-        // Unwrapping here because we're just shutting down; it's okay to fail badly
-        // at this point.
-        self.endpoint_join_handle.await.unwrap();
+
+        self.endpoint_join_handle.await?;

-        self.consumer_join_handle.await.unwrap();
+        self.consumer_join_handle.await?;

-        self.results_worker.await.unwrap();
+        self.results_worker.await?;

-        self.services_join_handle.await.unwrap();
+        self.services_join_handle.await?;

-        self.executor_join_handle.await.unwrap();
+        self.executor_join_handle.await?;
+        Ok(())
     }

     pub async fn recv_task_finished(&mut self) -> Option<anyhow::Result<()>> {
4 changes: 3 additions & 1 deletion src/producer/dummy_producer.rs
@@ -19,7 +19,9 @@ impl ResultsProducer for DummyResultsProducer {
         let json = serde_json::to_vec(result)?;

         // Let's actually blow up here, so that we fail schema validation in tests.
-        self.schema.validate_json(&json).unwrap();
+        self.schema
+            .validate_json(&json)
+            .expect("invalid json schema in dummy producer");
         Ok(())
     }
 }
14 changes: 10 additions & 4 deletions src/types/result.rs
@@ -6,6 +6,7 @@ use hyper::rt::ConnectionStats;
 use hyper::stats::AbsoluteDuration;
 use hyper::stats::RequestStats;
 use openssl::asn1::Asn1Time;
+use openssl::asn1::TimeDiff;
 use openssl::x509::X509;
 use sentry::protocol::SpanId;
 use serde::{Deserialize, Serialize};
@@ -125,10 +126,15 @@ pub fn to_request_info_list(stats: &RequestStats, method: RequestMethod) -> Vec<
         .get_certificate_bytes()
         .and_then(|cert| X509::from_der(cert).ok())
         .map(|cert| {
-            let epoch_start = Asn1Time::from_unix(0).unwrap();
-
-            let not_after_diff = epoch_start.diff(cert.not_after()).unwrap();
-            let not_before_diff = epoch_start.diff(cert.not_before()).unwrap();
+            let epoch_start =
+                Asn1Time::from_unix(0).expect("from_unix(0) should never fail");
+
+            let not_after_diff = epoch_start
+                .diff(cert.not_after())
+                .unwrap_or(TimeDiff { days: 0, secs: 0 });
+            let not_before_diff = epoch_start
+                .diff(cert.not_before())
+                .unwrap_or(TimeDiff { days: 0, secs: 0 });

             CertificateInfo {
                 not_after_timestamp_s: not_after_diff.secs as u64
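For context on the zeroed TimeDiff fallback above: openssl's TimeDiff is a { days, secs } pair, and the certificate validity timestamps are evidently rebuilt as seconds since the Unix epoch, so a zero diff degrades to timestamp 0 rather than panicking. A hedged sketch of that arithmetic — the exact expression is collapsed in this diff, and to_unix_seconds is a hypothetical helper:

use openssl::asn1::TimeDiff;

// Convert a diff measured from Asn1Time::from_unix(0) into a unix
// timestamp in seconds. Assumes the certificate dates are not pre-1970,
// so days and secs are non-negative.
fn to_unix_seconds(diff: &TimeDiff) -> u64 {
    diff.secs as u64 + (diff.days as u64) * 24 * 60 * 60
}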