Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Required the update of tokio-retry to tokio-retry2 which is a fork of the original (which is now unmaintained?)

The backoff algorithm is now

sleep = random(0, base * 2 ** attempt)
so they will approximate 200ms, 400ms, 800ms, 1600ms

Previously it was

sleep = random(0, base ** attempt)
which was 200ms, 40000ms, 8000000ms, 1600000000ms
  • Loading branch information
chris-smith-zocdoc committed Feb 15, 2025
1 parent dd87b85 commit 3edbb69
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
9 changes: 4 additions & 5 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ tempfile = { workspace = true }
testutil_mock = { package = "mock", path = "testutil/mock" }
time = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
tokio-retry = { workspace = true }
tokio-retry2 = { workspace = true }
tokio-util = { workspace = true, features = ["io"] }
tryfuture = { path = "tryfuture" }
ui = { path = "ui" }
Expand Down Expand Up @@ -319,7 +319,7 @@ tempfile = "3.5.0"
terminal_size = "0.1.15"
time = "0.3.30"
tokio = "1.32"
tokio-retry = "0.3"
tokio-retry2 = "0.5"
tokio-rustls = "0.26"
tokio-stream = "0.1"
tokio-util = "0.7"
Expand Down
54 changes: 29 additions & 25 deletions src/rust/engine/src/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use std::time::Duration;
use async_trait::async_trait;
use bytes::{BufMut, Bytes};
use futures::stream::StreamExt;
use futures::TryFutureExt;
use hashing::Digest;
use humansize::{file_size_opts, FileSize};
use reqwest::header::{HeaderMap, HeaderName};
use reqwest::Error;
use store::Store;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::RetryIf;
use tokio_retry2::{strategy::ExponentialFactorBackoff, Retry, RetryError};
use url::Url;

use workunit_store::{in_workunit, Level};
Expand Down Expand Up @@ -216,6 +216,10 @@ async fn attempt_download(
Ok((digest, bytewriter.writer.into_inner().freeze()))
}

pub fn jitter(duration: Duration) -> Duration {
duration.mul_f64(rand::random::<f64>())
}

pub async fn download(
http_client: &reqwest::Client,
store: Store,
Expand All @@ -238,30 +242,30 @@ pub async fn download(
.unwrap()
)),
|_workunit| async move {
let retry_strategy = ExponentialBackoff::from_millis(error_delay.as_millis() as u64)
.map(jitter)
.take(max_attempts.get() - 1);
RetryIf::spawn(
retry_strategy,
|| {
attempt_number += 1;
log::debug!("Downloading {} (attempt #{})", &url, &attempt_number);

attempt_download(
http_client,
&url,
&auth_headers,
file_name.clone(),
expected_digest,
)
},
|err: &StreamingError| {
let is_retryable = matches!(err, StreamingError::Retryable(_));
let retry_strategy =
ExponentialFactorBackoff::from_millis(error_delay.as_millis() as u64, 2.0)
.map(jitter)
.take(max_attempts.get() - 1);

return Retry::spawn(retry_strategy, || {
attempt_number += 1;
log::debug!("Downloading {} (attempt #{})", &url, &attempt_number);
attempt_download(
http_client,
&url,
&auth_headers,
file_name.clone(),
expected_digest,
)
.map_err(|err| {
log::debug!("Error while downloading {}: {}", &url, err);
is_retryable
},
)
.await
match err {
StreamingError::Retryable(msg) => RetryError::transient(msg),
StreamingError::Permanent(msg) => RetryError::permanent(msg),
}
})
})
.await;
}
)
.await?;
Expand Down

0 comments on commit 3edbb69

Please sign in to comment.