Skip to content

Commit

Permalink
feat(worker): use backoff to control retry
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 30, 2024
1 parent b62f222 commit 0fdb86a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 39 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio-tungstenite = { version = "0.21.0", features = ["rustls", "rustls-tls-nati
futures-util = "0.3.30"
flume = "0.11.0"
tungstenite = { version = "0.21.0", features = ["rustls"] }
backoff = { version = "0.4", features = ["tokio"] }

[build-dependencies]
vergen = { version = "8.3.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] }
55 changes: 25 additions & 30 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{get_memory_bytes, Args};
use backoff::ExponentialBackoff;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use flume::{unbounded, Receiver, Sender};
use futures_util::{future::try_join3, StreamExt};
use log::{error, info, warn};
use reqwest::Url;
use reqwest::{Client, Url};
use std::{
path::Path,
path::{Path, PathBuf},
process::{Output, Stdio},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -386,7 +387,7 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {

info!("Receiving new messages");

let client = reqwest::Client::builder()
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
Expand Down Expand Up @@ -420,12 +421,12 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
}

async fn poll_server(
client: reqwest::Client,
client: Client,
args: &Args,
req: WorkerPollRequest,
tree_path: std::path::PathBuf,
tree_path: PathBuf,
tx: Sender<Message>,
) -> Result<(), anyhow::Error> {
) -> anyhow::Result<()> {
loop {
if let Some(job) = client
.post(format!("{}/api/worker/poll", args.server))
Expand Down Expand Up @@ -467,32 +468,26 @@ async fn poll_server(
}
}

pub async fn build_worker(args: Args) -> ! {
loop {
info!("Starting build worker");
if let Err(err) = build_worker_inner(&args).await {
warn!("Got error running heartbeat worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
pub async fn build_worker(args: Args) -> anyhow::Result<()> {
backoff::future::retry(ExponentialBackoff::default(), || async {
info!("Starting build worker ...");
Ok(build_worker_inner(&args).await?)
})
.await
}

pub async fn websocket_connect(rx: Receiver<Message>, ws: Url) -> anyhow::Result<()> {
loop {
info!("Starting websocket connect to {:?}", ws);
match connect_async(ws.as_str()).await {
Ok((ws_stream, _)) => {
let (write, _) = ws_stream.split();
let rx = rx.clone().into_stream();
if let Err(e) = rx.map(Ok).forward(write).await {
warn!("{e}");
}
}
Err(err) => {
warn!("Got error connecting to websocket: {}", err);
}
}
backoff::future::retry(ExponentialBackoff::default(), || async {
Ok(websocket_connect_inner(rx.clone(), ws.as_str()).await?)
})
.await
}

tokio::time::sleep(Duration::from_secs(5)).await;
}
async fn websocket_connect_inner(rx: Receiver<Message>, ws: &str) -> anyhow::Result<()> {
info!("Starting websocket connect to {}", ws);
let (ws_stream, _) = connect_async(ws).await?;
let (write, _) = ws_stream.split();
let rx = rx.clone().into_stream();

Ok(rx.map(Ok).forward(write).await?)
}
16 changes: 8 additions & 8 deletions worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{get_memory_bytes, Args};
use backoff::ExponentialBackoff;
use common::WorkerHeartbeatRequest;
use log::{info, warn};
use std::{
Expand Down Expand Up @@ -52,13 +53,12 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
}
}

pub async fn heartbeat_worker(args: Args) -> ! {
pub async fn heartbeat_worker(args: Args) -> anyhow::Result<()> {
tokio::spawn(internet_connectivity_worker());
loop {
info!("Starting heartbeat worker");
if let Err(err) = heartbeat_worker_inner(&args).await {
warn!("Got error running heartbeat worker: {}", err);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}

backoff::future::retry(ExponentialBackoff::default(), || async {
warn!("Retry send heartbeat ...");
Ok(heartbeat_worker_inner(&args).await?)
})
.await
}
2 changes: 1 addition & 1 deletion worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ async fn main() -> anyhow::Result<()> {
s.refresh_memory();

tokio::spawn(heartbeat_worker(args.clone()));
build_worker(args.clone()).await;
build_worker(args.clone()).await?;
Ok(())
}

0 comments on commit 0fdb86a

Please sign in to comment.