Skip to content

Commit

Permalink
fix: fix interval usage in various components
Browse files Browse the repository at this point in the history
Partially resolves #17
  • Loading branch information
0xLE committed Sep 1, 2024
1 parent 4dee94e commit e6e0943
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
10 changes: 5 additions & 5 deletions crates/kriger_fetcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use kriger_common::{messaging, models};
use tokio::task::JoinSet;
use tokio::time::MissedTickBehavior;
use tokio::{select, time};
use tracing::{error, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn};

pub async fn main(runtime: AppRuntime) -> Result<()> {
info!("starting data fetcher");
Expand Down Expand Up @@ -45,13 +45,11 @@ pub async fn main(runtime: AppRuntime) -> Result<()> {

let fetcher = config.into_fetcher();

// TODO: Un-hardcode this
// TODO: Un-hardcode this and align the start to the start of a tick
let tick_duration = time::Duration::from_secs(20);
let mut interval = time::interval_at(time::Instant::now(), tick_duration);
let mut interval = time::interval(tick_duration);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

interval.tick().await; // The first tick will immediately complete

let options = FetchOptions {
require_hints: true,
};
Expand All @@ -62,6 +60,8 @@ pub async fn main(runtime: AppRuntime) -> Result<()> {
return Ok(())
}
}

debug!("fetcher tick");
let data = match fetcher.fetch(&options, &services).await {
Ok(data) => data,
Err(error) => {
Expand Down
10 changes: 5 additions & 5 deletions crates/kriger_scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn handle_scheduling(
} else {
info!(
"the competition starts in {:} s",
time_since_start.num_seconds()
-time_since_start.num_seconds()
);
}

Expand All @@ -115,10 +115,10 @@ async fn handle_scheduling(
let mut interval = interval_at(instant, tick);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// The first tick completes immediately.
interval.tick().await;

// TODO: Add scheduling for services with hints
if time_since_start > chrono::Duration::seconds(0) {
// The first tick completes immediately if the interval has started.
interval.tick().await;
}

loop {
select! {
Expand Down
11 changes: 4 additions & 7 deletions crates/kriger_submitter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use kriger_common::models;
use kriger_common::server::runtime::AppRuntime;
use std::time::Duration;
use tokio::select;
use tokio::time::{interval_at, Instant, MissedTickBehavior};
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, instrument, warn};

pub async fn main(runtime: AppRuntime) -> eyre::Result<()> {
Expand Down Expand Up @@ -72,21 +72,18 @@ pub async fn main(runtime: AppRuntime) -> eyre::Result<()> {

let submitter = config.inner.into_submitter();

let mut interval = interval_at(Instant::now(), Duration::from_secs(config.interval));
let mut interval = tokio::time::interval(Duration::from_secs(config.interval));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// The first tick completes immediately
interval.tick().await;

loop {
debug!("submitter tick");
select! {
_ = interval.tick() => {}
_ = runtime.cancellation_token.cancelled() => {
return Ok(());
}
_ = interval.tick() => {}
}

debug!("submitter tick");
// TODO: Is `PollPending` the best solution here, or should we just use the consumer's pending count?
// TODO: Handle backpressure somehow
let requests = PollPending::new(&mut flag_submissions, config.batch).await;
Expand Down

0 comments on commit e6e0943

Please sign in to comment.