From 5a880657c6dba6b43c6752819c4c025f95ec15b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Casta=C3=B1o=20Arteaga?= Date: Thu, 24 Oct 2024 14:14:31 +0200 Subject: [PATCH] Improve support for multiple organizations (#272) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergio CastaƱo Arteaga --- clowarden-server/src/jobs.rs | 83 +++++++++++++++++++++++++++++++----- clowarden-server/src/main.rs | 2 +- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/clowarden-server/src/jobs.rs b/clowarden-server/src/jobs.rs index c25b59b..721ffc3 100644 --- a/clowarden-server/src/jobs.rs +++ b/clowarden-server/src/jobs.rs @@ -17,9 +17,10 @@ use clowarden_core::{ multierror::MultiError, services::{BaseRefConfigStatus, ChangesApplied, ChangesSummary, DynServiceHandler, ServiceName}, }; +use futures::future::{self, JoinAll}; use octorust::types::{ChecksCreateRequestConclusion, JobStatus, PullRequestData}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use tokio::{ sync::{broadcast, mpsc}, task::JoinHandle, @@ -47,6 +48,16 @@ pub(crate) enum Job { Validate(ValidateInput), } +impl Job { + /// Get the name of the organization this job is related to. + pub(crate) fn org_name(&self) -> &str { + match self { + Job::Reconcile(input) => &input.org.name, + Job::Validate(input) => &input.org.name, + } + } +} + /// Information required to process a reconcile job. #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] pub(crate) struct ReconcileInput { @@ -118,20 +129,68 @@ impl Handler { gh: DynGH, ghc: core::github::DynGH, services: HashMap, - ) -> Self { - Self { + ) -> Arc { + Arc::new(Self { db, gh, ghc, services, - } + }) } - /// Spawn a new task to process jobs received on the jobs channel. The task - /// will stop when notified on the stop channel provided. + /// Spawn some tasks to process jobs received on the jobs channel. We will + /// create one worker per organization, plus an additional task to route + /// jobs to the corresponding organization worker. All tasks will stop when + /// notified on the stop channel provided. pub(crate) fn start( - self, + self: Arc, mut jobs_rx: mpsc::UnboundedReceiver, + stop_tx: &broadcast::Sender<()>, + orgs: Vec, + ) -> JoinAll> { + let mut handles = Vec::with_capacity(orgs.len() + 1); + let mut orgs_jobs_tx_channels = HashMap::new(); + + // Create a worker for each organization + for org in orgs { + let (org_jobs_tx, org_jobs_rx) = mpsc::unbounded_channel(); + orgs_jobs_tx_channels.insert(org.name, org_jobs_tx); + let org_worker = self.clone().organization_worker(org_jobs_rx, stop_tx.subscribe()); + handles.push(org_worker); + } + + // Create a worker to route jobs to the corresponding org worker + let mut stop_rx = stop_tx.subscribe(); + let jobs_router = tokio::spawn(async move { + loop { + tokio::select! { + biased; + + // Pick next job from the queue and send it to the corresponding org worker + Some(job) = jobs_rx.recv() => { + if let Some(org_jobs_tx) = orgs_jobs_tx_channels.get(job.org_name()) { + _ = org_jobs_tx.send(job); + } + } + + // Exit if the handler has been asked to stop + _ = stop_rx.recv() => { + break + } + } + } + }); + handles.push(jobs_router); + + future::join_all(handles) + } + + /// Spawn a worker that will take care of processing jobs for a given + /// organization. The worker will stop when notified on the stop channel + /// provided. + fn organization_worker( + self: Arc, + mut org_jobs_rx: mpsc::UnboundedReceiver, mut stop_rx: broadcast::Receiver<()>, ) -> JoinHandle<()> { tokio::spawn(async move { @@ -140,7 +199,7 @@ impl Handler { biased; // Pick next job from the queue and process it - Some(job) = jobs_rx.recv() => { + Some(job) = org_jobs_rx.recv() => { match job { Job::Reconcile(input) => _ = self.handle_reconcile_job(input).await, Job::Validate(input) => _ = self.handle_validate_job(input).await, @@ -296,8 +355,8 @@ pub(crate) fn scheduler( jobs_tx: mpsc::UnboundedSender, mut stop_rx: broadcast::Receiver<()>, orgs: Vec, -) -> JoinHandle<()> { - tokio::spawn(async move { +) -> JoinAll> { + let scheduler = tokio::spawn(async move { let reconcile_frequency = time::Duration::from_secs(RECONCILE_FREQUENCY); let mut reconcile = time::interval(reconcile_frequency); reconcile.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -322,5 +381,7 @@ pub(crate) fn scheduler( }, } } - }) + }); + + future::join_all(vec![scheduler]) } diff --git a/clowarden-server/src/main.rs b/clowarden-server/src/main.rs index 82e6285..5dd6362 100644 --- a/clowarden-server/src/main.rs +++ b/clowarden-server/src/main.rs @@ -90,7 +90,7 @@ async fn main() -> Result<()> { let (jobs_tx, jobs_rx) = mpsc::unbounded_channel(); let jobs_handler = jobs::Handler::new(db.clone(), gh.clone(), ghc.clone(), services); let jobs_workers_done = future::join_all([ - jobs_handler.start(jobs_rx, stop_tx.subscribe()), + jobs_handler.start(jobs_rx, &stop_tx, cfg.get("organizations")?), jobs::scheduler(jobs_tx.clone(), stop_tx.subscribe(), cfg.get("organizations")?), ]);