Skip to content

Commit

Permalink
Some refactoring in jobs module
Browse files Browse the repository at this point in the history
Signed-off-by: Sergio Castaño Arteaga <[email protected]>
  • Loading branch information
tegioz committed Nov 26, 2024
1 parent ba122f6 commit ee63960
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 99 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ tokio-postgres = { version = "0.7.12", features = [
"with-serde_json-1",
"with-time-0_3",
] }
tokio-util = { version = "0.7.12", features = ["rt"] }
tower = "0.5.1"
tower-http = { version = "0.6.2", features = ["auth", "fs", "set-header", "trace"] }
tracing = "0.1.40"
Expand Down
6 changes: 3 additions & 3 deletions clowarden-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use clowarden_core::{
},
};

/// Environment variable containing Github token.
const GITHUB_TOKEN: &str = "GITHUB_TOKEN";

#[derive(Parser)]
#[command(
version,
Expand Down Expand Up @@ -83,9 +86,6 @@ struct GenerateArgs {
output_file: PathBuf,
}

/// Environment variable containing Github token.
const GITHUB_TOKEN: &str = "GITHUB_TOKEN";

#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
Expand Down
4 changes: 2 additions & 2 deletions clowarden-core/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module defines some types and traits that service handlers
//! implementations will rely upon.
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};

use anyhow::Result;
use as_any::AsAny;
Expand All @@ -27,7 +27,7 @@ pub trait ServiceHandler {
}

/// Type alias to represent a service handler trait object.
pub type DynServiceHandler = Box<dyn ServiceHandler + Send + Sync>;
pub type DynServiceHandler = Arc<dyn ServiceHandler + Send + Sync>;

/// Represents a summary of changes detected in the service's state as defined
/// in the configuration from the base to the head reference.
Expand Down
1 change: 1 addition & 0 deletions clowarden-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tokio-postgres = { workspace = true }
tokio-util = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions clowarden-server/src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use thiserror::Error;

use clowarden_core::cfg::{GitHubApp, Organization};

/// Name used for the check run in GitHub.
const CHECK_RUN_NAME: &str = "CLOWarden";

/// Trait that defines some operations a GH implementation must support.
#[async_trait]
#[cfg_attr(test, automock)]
Expand Down Expand Up @@ -165,9 +168,6 @@ pub(crate) enum PullRequestEventAction {
Other,
}

/// Name used for the check run in GitHub.
const CHECK_RUN_NAME: &str = "CLOWarden";

/// Helper function to create a new ChecksCreateRequest instance.
pub(crate) fn new_checks_create_request(
head_sha: String,
Expand Down
140 changes: 68 additions & 72 deletions clowarden-server/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module defines the types and functionality needed to schedule and
//! process jobs.
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, time::Duration};

use ::time::OffsetDateTime;
use anyhow::{Error, Result};
Expand All @@ -10,10 +10,11 @@ use futures::future::{self, JoinAll};
use octorust::types::{ChecksCreateRequestConclusion, JobStatus, PullRequestData};
use serde::{Deserialize, Serialize};
use tokio::{
sync::{broadcast, mpsc},
sync::mpsc,
task::JoinHandle,
time::{self, sleep, MissedTickBehavior},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument};

use self::core::github::Source;
Expand All @@ -31,6 +32,9 @@ use crate::{
tmpl,
};

/// How often periodic reconcile jobs should be scheduled (in seconds).
const RECONCILE_FREQUENCY: u64 = 60 * 60; // Every hour

/// Represents a job to be executed.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -117,84 +121,83 @@ impl ValidateInput {
}
}

/// A jobs handler is in charge of executing the received jobs.
pub(crate) struct Handler {
/// A jobs handler is in charge of executing the received jobs. It will create
/// a worker for each organization, plus an additional task to route jobs to
/// the corresponding organization worker. All tasks will stop when the
/// cancellation token is cancelled.
pub(crate) fn handler(
db: &DynDB,
gh: &DynGH,
ghc: &core::github::DynGH,
services: &HashMap<ServiceName, DynServiceHandler>,
mut jobs_rx: mpsc::UnboundedReceiver<Job>,
cancel_token: CancellationToken,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let mut handles = Vec::with_capacity(orgs.len() + 1);
let mut orgs_jobs_tx_channels = HashMap::new();

// Create a worker for each organization
for org in orgs {
let (org_jobs_tx, org_jobs_rx) = mpsc::unbounded_channel();
orgs_jobs_tx_channels.insert(org.name, org_jobs_tx);
let org_worker = OrgWorker::new(db.clone(), gh.clone(), ghc.clone(), services.clone());
handles.push(org_worker.run(org_jobs_rx, cancel_token.clone()));
}

// Create a worker to route jobs to the corresponding org worker
let jobs_router = tokio::spawn(async move {
loop {
tokio::select! {
biased;

// Pick next job from the queue and send it to the corresponding org worker
Some(job) = jobs_rx.recv() => {
if let Some(org_jobs_tx) = orgs_jobs_tx_channels.get(job.org_name()) {
_ = org_jobs_tx.send(job);
}
}

// Exit if the handler has been asked to stop
() = cancel_token.cancelled() => break,
}
}
});
handles.push(jobs_router);

future::join_all(handles)
}

/// An organization worker is in charge of processing jobs for a given
/// organization.
struct OrgWorker {
db: DynDB,
gh: DynGH,
ghc: core::github::DynGH,
services: HashMap<ServiceName, DynServiceHandler>,
}

impl Handler {
/// Create a new handler instance.
pub(crate) fn new(
impl OrgWorker {
/// Create a new organization worker instance.
fn new(
db: DynDB,
gh: DynGH,
ghc: core::github::DynGH,
services: HashMap<ServiceName, DynServiceHandler>,
) -> Arc<Self> {
Arc::new(Self {
) -> Self {
Self {
db,
gh,
ghc,
services,
})
}

/// Spawn some tasks to process jobs received on the jobs channel. We will
/// create one worker per organization, plus an additional task to route
/// jobs to the corresponding organization worker. All tasks will stop when
/// notified on the stop channel provided.
pub(crate) fn start(
self: Arc<Self>,
mut jobs_rx: mpsc::UnboundedReceiver<Job>,
stop_tx: &broadcast::Sender<()>,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let mut handles = Vec::with_capacity(orgs.len() + 1);
let mut orgs_jobs_tx_channels = HashMap::new();

// Create a worker for each organization
for org in orgs {
let (org_jobs_tx, org_jobs_rx) = mpsc::unbounded_channel();
orgs_jobs_tx_channels.insert(org.name, org_jobs_tx);
let org_worker = self.clone().organization_worker(org_jobs_rx, stop_tx.subscribe());
handles.push(org_worker);
}

// Create a worker to route jobs to the corresponding org worker
let mut stop_rx = stop_tx.subscribe();
let jobs_router = tokio::spawn(async move {
loop {
tokio::select! {
biased;

// Pick next job from the queue and send it to the corresponding org worker
Some(job) = jobs_rx.recv() => {
if let Some(org_jobs_tx) = orgs_jobs_tx_channels.get(job.org_name()) {
_ = org_jobs_tx.send(job);
}
}

// Exit if the handler has been asked to stop
_ = stop_rx.recv() => {
break
}
}
}
});
handles.push(jobs_router);

future::join_all(handles)
}

/// Spawn a worker that will take care of processing jobs for a given
/// organization. The worker will stop when notified on the stop channel
/// provided.
fn organization_worker(
self: Arc<Self>,
/// Run organization worker.
fn run(
self,
mut org_jobs_rx: mpsc::UnboundedReceiver<Job>,
mut stop_rx: broadcast::Receiver<()>,
cancel_token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
Expand All @@ -210,9 +213,7 @@ impl Handler {
}

// Exit if the handler has been asked to stop
_ = stop_rx.recv() => {
break
}
() = cancel_token.cancelled() => break,
}
}
})
Expand Down Expand Up @@ -349,14 +350,11 @@ impl Handler {
}
}

/// How often periodic reconcile jobs should be scheduled (in seconds).
const RECONCILE_FREQUENCY: u64 = 60 * 60;

/// A jobs scheduler is in charge of scheduling the execution of some jobs
/// periodically.
pub(crate) fn scheduler(
jobs_tx: mpsc::UnboundedSender<Job>,
mut stop_rx: broadcast::Receiver<()>,
cancel_token: CancellationToken,
orgs: Vec<Organization>,
) -> JoinAll<JoinHandle<()>> {
let scheduler = tokio::spawn(async move {
Expand All @@ -369,9 +367,7 @@ pub(crate) fn scheduler(
biased;

// Exit if the scheduler has been asked to stop
_ = stop_rx.recv() => {
break
}
() = cancel_token.cancelled() => break,

// Schedule reconcile job for each of the registered organizations
_ = reconcile.tick() => {
Expand Down
37 changes: 21 additions & 16 deletions clowarden-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc};
use anyhow::{Context, Result};
use clap::Parser;
use config::{Config, File};
use db::DynDB;
use deadpool_postgres::{Config as DbConfig, Runtime};
use futures::future;
use github::DynGH;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
use tokio::{
net::TcpListener,
signal,
sync::{broadcast, mpsc},
};
use tokio::{net::TcpListener, signal, sync::mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -69,12 +68,12 @@ async fn main() -> Result<()> {
let connector = MakeTlsConnector::new(builder.build());
let db_cfg: DbConfig = cfg.get("db")?;
let pool = db_cfg.create_pool(Some(Runtime::Tokio1), connector)?;
let db = Arc::new(PgDB::new(pool));
let db: DynDB = Arc::new(PgDB::new(pool));

// Setup GitHub clients
let gh_app: core::cfg::GitHubApp = cfg.get("server.githubApp")?;
let gh = Arc::new(github::GHApi::new(&gh_app).context("error setting up github client")?);
let ghc = Arc::new(
let gh: DynGH = Arc::new(github::GHApi::new(&gh_app).context("error setting up github client")?);
let ghc: core::github::DynGH = Arc::new(
core::github::GHApi::new_with_app_creds(&gh_app).context("error setting up core github client")?,
);

Expand All @@ -84,18 +83,24 @@ async fn main() -> Result<()> {
let svc = Arc::new(services::github::service::SvcApi::new_with_app_creds(&gh_app)?);
services.insert(
services::github::SERVICE_NAME,
Box::new(services::github::Handler::new(ghc.clone(), svc)),
Arc::new(services::github::Handler::new(ghc.clone(), svc)),
);
}

// Setup and launch jobs workers
let (stop_tx, _): (broadcast::Sender<()>, _) = broadcast::channel(1);
let cancel_token = CancellationToken::new();
let (jobs_tx, jobs_rx) = mpsc::unbounded_channel();
let jobs_handler = jobs::Handler::new(db.clone(), gh.clone(), ghc.clone(), services);
let jobs_workers_done = future::join_all([
jobs_handler.start(jobs_rx, &stop_tx, cfg.get("organizations")?),
jobs::scheduler(jobs_tx.clone(), stop_tx.subscribe(), cfg.get("organizations")?),
]);
let jobs_handler = jobs::handler(
&db,
&gh,
&ghc,
&services,
jobs_rx,
cancel_token.clone(),
cfg.get("organizations")?,
);
let jobs_scheduler = jobs::scheduler(jobs_tx.clone(), cancel_token.clone(), cfg.get("organizations")?);
let jobs_workers_done = future::join_all([jobs_handler, jobs_scheduler]);

// Setup and launch HTTP server
let router = handlers::setup_router(&cfg, db.clone(), gh.clone(), jobs_tx)
Expand All @@ -110,7 +115,7 @@ async fn main() -> Result<()> {
}

// Ask jobs workers to stop and wait for them to finish
drop(stop_tx);
cancel_token.cancel();
jobs_workers_done.await;
info!("server stopped");

Expand Down

0 comments on commit ee63960

Please sign in to comment.