From dae81f76b8f0bd494cf87c22c736a51c235c42e0 Mon Sep 17 00:00:00 2001 From: Nwokolo Chukwuemeka Paul Date: Wed, 25 Feb 2026 18:45:40 +0100 Subject: [PATCH 1/4] Background Task Queue & Webhook SystemBackground Task Queue & Webhook System --- Cargo.lock | 15 ++++ core/Cargo.toml | 2 + core/src/jobs.rs | 212 +++++++++++++++++++++++++++++++++++++++++++++++ core/src/main.rs | 76 ++++++++++++++++- 4 files changed, 302 insertions(+), 3 deletions(-) create mode 100644 core/src/jobs.rs diff --git a/Cargo.lock b/Cargo.lock index b2e8737..4435ca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,6 +674,19 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.10.0" @@ -3058,6 +3071,7 @@ dependencies = [ "axum", "base64 0.22.1", "config", + "dashmap", "dotenvy", "ed25519-dalek", "hex", @@ -3078,6 +3092,7 @@ dependencies = [ "tracing-subscriber", "utoipa", "utoipa-swagger-ui", + "uuid", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index 87cfcba..56a16ae 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,4 +30,6 @@ jsonwebtoken = "9" ed25519-dalek = "2" sha2 = "0.10" rand = "0.8" +dashmap = "5.5" moka = { version = "0.12", features = ["future"] } +uuid = { version = "1.0", features = ["v4"] } diff --git a/core/src/jobs.rs b/core/src/jobs.rs new file mode 100644 index 0000000..171e275 --- /dev/null +++ b/core/src/jobs.rs @@ -0,0 +1,212 @@ +use dashmap::DashMap; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::time::{timeout, Duration}; +use uuid::Uuid; + +use crate::errors::AppError; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum JobStatus { + Queued, + Processing, + Completed, + Failed, + Cancelled, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobMetadata { + pub id: String, + pub status: JobStatus, + pub created_at: u64, + pub updated_at: u64, + pub expires_at: Option, + pub callback_url: Option, + pub result: Option, + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubmitJobRequest { + pub callback_url: Option, + pub expires_in_seconds: Option, + pub job_data: serde_json::Value, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobResponse { + pub job_id: String, + pub status: JobStatus, + pub created_at: u64, + pub expires_at: Option, +} + +#[derive(Clone)] +pub struct JobQueue { + jobs: Arc>, +} + +impl JobQueue { + pub fn new() -> Self { + Self { + jobs: Arc::new(DashMap::new()), + } + } + + pub fn submit_job(&self, request: SubmitJobRequest) -> Result { + let job_id = Uuid::new_v4().to_string(); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| AppError::Internal(format!("Time error: {}", e)))? + .as_secs(); + + let expires_at = request.expires_in_seconds.map(|secs| now + secs); + + let job_metadata = JobMetadata { + id: job_id.clone(), + status: JobStatus::Queued, + created_at: now, + updated_at: now, + expires_at, + callback_url: request.callback_url, + result: None, + error: None, + }; + + self.jobs.insert(job_id.clone(), job_metadata); + + Ok(JobResponse { + job_id, + status: JobStatus::Queued, + created_at: now, + expires_at, + }) + } + + pub async fn get_job(&self, job_id: &str) -> Result { + let job = self.jobs.get(job_id).ok_or_else(|| { + AppError::NotFound(format!("Job with ID {} not found", job_id)) + })?; + + Ok(JobResponse { + job_id: job.id.clone(), + status: job.status.clone(), + created_at: job.created_at, + expires_at: job.expires_at, + }) + } + + + + pub fn update_job_status(&self, job_id: &str, status: JobStatus, result: Option, error: Option) -> Result<(), AppError> { + if let Some(mut job) = self.jobs.get_mut(job_id) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| AppError::Internal(format!("Time error: {}", e)))? + .as_secs(); + + job.status = status; + job.updated_at = now; + job.result = result; + job.error = error; + + // If job is completed, trigger webhook if callback URL exists + if matches!(job.status, JobStatus::Completed | JobStatus::Failed) { + if let Some(callback_url) = &job.callback_url { + let job_clone = job.clone(); + tokio::spawn(async move { + Self::trigger_webhook(callback_url, &job_clone).await; + }); + } + } + } else { + return Err(AppError::NotFound(format!("Job with ID {} not found", job_id))); + } + + Ok(()) + } + + pub fn cancel_job(&self, job_id: &str) -> Result<(), AppError> { + self.update_job_status(job_id, JobStatus::Cancelled, None, Some("Job cancelled".to_string())) + } + + pub fn cleanup_expired_jobs(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.jobs.retain(|_, job| { + // Keep job if it doesn't expire or hasn't expired yet + job.expires_at.map_or(true, |exp| exp > now) + }); + } + + pub async fn trigger_webhook(callback_url: &str, job: &JobMetadata) { + let client = reqwest::Client::new(); + let payload = serde_json::json!({ + "job_id": job.id, + "status": format!("{:?}", job.status), + "result": job.result, + "error": job.error, + "completed_at": job.updated_at + }); + + match timeout(Duration::from_secs(30), client.post(callback_url).json(&payload).send()).await { + Ok(Ok(response)) => { + if response.status().is_success() { + tracing::info!("Webhook sent successfully to {} for job {}", callback_url, job.id); + } else { + tracing::warn!("Webhook failed with status {} for job {} at {}", response.status(), job.id, callback_url); + } + } + Ok(Err(e)) => { + tracing::error!("Failed to send webhook for job {}: {}", job.id, e); + } + Err(_) => { + tracing::error!("Webhook request timed out for job {} at {}", job.id, callback_url); + } + } + } + + pub fn get_job_result(&self, job_id: &str) -> Result, AppError> { + let job = self.jobs.get(job_id).ok_or_else(|| { + AppError::NotFound(format!("Job with ID {} not found", job_id)) + })?; + + Ok(job.result.clone()) + } + + pub fn is_job_expired(&self, job_id: &str) -> Result { + let job = self.jobs.get(job_id).ok_or_else(|| { + AppError::NotFound(format!("Job with ID {} not found", job_id)) + })?; + + match job.expires_at { + Some(expiry) => { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| AppError::Internal(format!("Time error: {}", e)))? + .as_secs(); + + Ok(now > expiry) + } + None => Ok(false), + } + } +} + +// Background cleanup task to remove expired jobs +pub async fn start_cleanup_task(job_queue: JobQueue, interval_seconds: u64) { + let job_queue_clone = job_queue.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(interval_seconds)).await; + job_queue_clone.cleanup_expired_jobs(); + tracing::debug!("Expired jobs cleanup completed"); + } + }); +} \ No newline at end of file diff --git a/core/src/main.rs b/core/src/main.rs index ae707a0..21b0a53 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -1,10 +1,12 @@ mod auth; mod benchmarks; mod errors; +mod jobs; mod parser; mod simulation; use crate::errors::AppError; +use crate::jobs::{JobQueue, JobResponse, SubmitJobRequest}; use crate::simulation::{SimulationCache, SimulationEngine, SimulationResult}; use axum::{ extract::{Json, State}, @@ -60,6 +62,7 @@ struct AppState { #[allow(dead_code)] // will be used when RPC simulation is wired into analyze handler engine: SimulationEngine, cache: Arc, + job_queue: Arc, } #[derive(Debug, Deserialize, ToSchema)] @@ -177,17 +180,78 @@ async fn analyze( Ok((headers, Json(to_report(&result)))) } +#[utoipa::path( + post, + path = "/jobs/submit", + request_body = SubmitJobRequest, + responses( + (status = 200, description = "Job submitted successfully", body = JobResponse), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Submission failed") + ), + security( + ("jwt" = []) + ), + tag = "Jobs" +)] +async fn submit_job( + State(state): State>, + Json(payload): Json, +) -> Result, AppError> { + tracing::info!( + callback_url = ?payload.callback_url, + "Received job submission request" + ); + + let response = state.job_queue.submit_job(payload) + .map_err(|e| AppError::Internal(format!("Job submission failed: {}", e)))?; + + Ok(Json(response)) +} + +#[utoipa::path( + get, + path = "/jobs/{id}", + params(("id" = String, description = "Job ID")), + responses( + (status = 200, description = "Job status retrieved successfully", body = JobResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Job not found"), + (status = 500, description = "Retrieval failed") + ), + security( + ("jwt" = []) + ), + tag = "Jobs" +)] +async fn get_job( + State(state): State>, + axum::extract::Path(job_id): axum::extract::Path, +) -> Result, AppError> { + tracing::info!( + job_id = %job_id, + "Received job status request" + ); + + let response = state.job_queue.get_job(&job_id) + .map_err(|e| AppError::Internal(format!("Job retrieval failed: {}", e)))?; + + Ok(Json(response)) +} + #[derive(OpenApi)] #[openapi( - paths(analyze, auth::challenge_handler, auth::verify_handler), + paths(analyze, auth::challenge_handler, auth::verify_handler, submit_job, get_job), components(schemas( AnalyzeRequest, ResourceReport, auth::ChallengeRequest, auth::ChallengeResponse, - auth::VerifyRequest, auth::VerifyResponse + auth::VerifyRequest, auth::VerifyResponse, + jobs::SubmitJobRequest, jobs::JobResponse )), tags( (name = "Analysis", description = "Soroban contract resource analysis endpoints"), - (name = "Auth", description = "SEP-10 wallet authentication") + (name = "Auth", description = "SEP-10 wallet authentication"), + (name = "Jobs", description = "Background job processing endpoints") ), info( title = "SoroScope API", @@ -267,12 +331,18 @@ async fn main() { let app_state = Arc::new(AppState { engine: SimulationEngine::new(config.soroban_rpc_url.clone()), cache: SimulationCache::new(), + job_queue: Arc::new(JobQueue::new()), }); + // Start the background cleanup task for expired jobs + let _cleanup_handle = tokio::spawn(jobs::start_cleanup_task((*app_state.job_queue).clone(), 3600)); + let cors = CorsLayer::new().allow_origin(Any); let protected = Router::new() .route("/analyze", post(analyze)) + .route("/jobs/submit", post(submit_job)) + .route("/jobs/:id", get(get_job)) .route_layer(middleware::from_fn(auth::auth_middleware)); let app = Router::new() From e7353d8d3c044db7662165472c74cf18d6167f3f Mon Sep 17 00:00:00 2001 From: Nwokolo Chukwuemeka Paul Date: Wed, 25 Feb 2026 21:04:08 +0100 Subject: [PATCH 2/4] chore: apply rustfmt formatting --- core/src/jobs.rs | 77 ++++++++++++++++++++++++++++++++++-------------- core/src/main.rs | 13 ++++++-- 2 files changed, 65 insertions(+), 25 deletions(-) diff --git a/core/src/jobs.rs b/core/src/jobs.rs index 171e275..2a46127 100644 --- a/core/src/jobs.rs +++ b/core/src/jobs.rs @@ -57,7 +57,7 @@ impl JobQueue { pub fn submit_job(&self, request: SubmitJobRequest) -> Result { let job_id = Uuid::new_v4().to_string(); - + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|e| AppError::Internal(format!("Time error: {}", e)))? @@ -87,9 +87,10 @@ impl JobQueue { } pub async fn get_job(&self, job_id: &str) -> Result { - let job = self.jobs.get(job_id).ok_or_else(|| { - AppError::NotFound(format!("Job with ID {} not found", job_id)) - })?; + let job = self + .jobs + .get(job_id) + .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?; Ok(JobResponse { job_id: job.id.clone(), @@ -99,9 +100,13 @@ impl JobQueue { }) } - - - pub fn update_job_status(&self, job_id: &str, status: JobStatus, result: Option, error: Option) -> Result<(), AppError> { + pub fn update_job_status( + &self, + job_id: &str, + status: JobStatus, + result: Option, + error: Option, + ) -> Result<(), AppError> { if let Some(mut job) = self.jobs.get_mut(job_id) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -112,7 +117,7 @@ impl JobQueue { job.updated_at = now; job.result = result; job.error = error; - + // If job is completed, trigger webhook if callback URL exists if matches!(job.status, JobStatus::Completed | JobStatus::Failed) { if let Some(callback_url) = &job.callback_url { @@ -123,14 +128,22 @@ impl JobQueue { } } } else { - return Err(AppError::NotFound(format!("Job with ID {} not found", job_id))); + return Err(AppError::NotFound(format!( + "Job with ID {} not found", + job_id + ))); } Ok(()) } pub fn cancel_job(&self, job_id: &str) -> Result<(), AppError> { - self.update_job_status(job_id, JobStatus::Cancelled, None, Some("Job cancelled".to_string())) + self.update_job_status( + job_id, + JobStatus::Cancelled, + None, + Some("Job cancelled".to_string()), + ) } pub fn cleanup_expired_jobs(&self) { @@ -155,35 +168,55 @@ impl JobQueue { "completed_at": job.updated_at }); - match timeout(Duration::from_secs(30), client.post(callback_url).json(&payload).send()).await { + match timeout( + Duration::from_secs(30), + client.post(callback_url).json(&payload).send(), + ) + .await + { Ok(Ok(response)) => { if response.status().is_success() { - tracing::info!("Webhook sent successfully to {} for job {}", callback_url, job.id); + tracing::info!( + "Webhook sent successfully to {} for job {}", + callback_url, + job.id + ); } else { - tracing::warn!("Webhook failed with status {} for job {} at {}", response.status(), job.id, callback_url); + tracing::warn!( + "Webhook failed with status {} for job {} at {}", + response.status(), + job.id, + callback_url + ); } } Ok(Err(e)) => { tracing::error!("Failed to send webhook for job {}: {}", job.id, e); } Err(_) => { - tracing::error!("Webhook request timed out for job {} at {}", job.id, callback_url); + tracing::error!( + "Webhook request timed out for job {} at {}", + job.id, + callback_url + ); } } } pub fn get_job_result(&self, job_id: &str) -> Result, AppError> { - let job = self.jobs.get(job_id).ok_or_else(|| { - AppError::NotFound(format!("Job with ID {} not found", job_id)) - })?; + let job = self + .jobs + .get(job_id) + .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?; Ok(job.result.clone()) } pub fn is_job_expired(&self, job_id: &str) -> Result { - let job = self.jobs.get(job_id).ok_or_else(|| { - AppError::NotFound(format!("Job with ID {} not found", job_id)) - })?; + let job = self + .jobs + .get(job_id) + .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?; match job.expires_at { Some(expiry) => { @@ -191,7 +224,7 @@ impl JobQueue { .duration_since(UNIX_EPOCH) .map_err(|e| AppError::Internal(format!("Time error: {}", e)))? .as_secs(); - + Ok(now > expiry) } None => Ok(false), @@ -209,4 +242,4 @@ pub async fn start_cleanup_task(job_queue: JobQueue, interval_seconds: u64) { tracing::debug!("Expired jobs cleanup completed"); } }); -} \ No newline at end of file +} diff --git a/core/src/main.rs b/core/src/main.rs index 21b0a53..b620110 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -203,7 +203,9 @@ async fn submit_job( "Received job submission request" ); - let response = state.job_queue.submit_job(payload) + let response = state + .job_queue + .submit_job(payload) .map_err(|e| AppError::Internal(format!("Job submission failed: {}", e)))?; Ok(Json(response)) @@ -233,7 +235,9 @@ async fn get_job( "Received job status request" ); - let response = state.job_queue.get_job(&job_id) + let response = state + .job_queue + .get_job(&job_id) .map_err(|e| AppError::Internal(format!("Job retrieval failed: {}", e)))?; Ok(Json(response)) @@ -335,7 +339,10 @@ async fn main() { }); // Start the background cleanup task for expired jobs - let _cleanup_handle = tokio::spawn(jobs::start_cleanup_task((*app_state.job_queue).clone(), 3600)); + let _cleanup_handle = tokio::spawn(jobs::start_cleanup_task( + (*app_state.job_queue).clone(), + 3600, + )); let cors = CorsLayer::new().allow_origin(Any); From ce69f1541984d2aa15550290b780203970aff560 Mon Sep 17 00:00:00 2001 From: Nwokolo Chukwuemeka Paul Date: Thu, 26 Feb 2026 06:13:08 +0100 Subject: [PATCH 3/4] Update main.rs Remove used and wrong file network_config --- core/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main.rs b/core/src/main.rs index 307b052..6d89ad3 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -9,7 +9,6 @@ mod simulation; use crate::errors::AppError; use crate::jobs::{JobQueue, JobResponse, SubmitJobRequest}; use crate::rpc_provider::{ProviderRegistry, RpcProvider}; -use crate::network_config::NetworkConfig; use crate::simulation::{SimulationCache, SimulationEngine, SimulationResult}; use axum::{ extract::{Json, State}, From a3c07854fdceb9bd3c9e7b8ba657ccf9f8488bf3 Mon Sep 17 00:00:00 2001 From: Nwokolo Chukwuemeka Paul Date: Thu, 26 Feb 2026 13:10:15 +0100 Subject: [PATCH 4/4] fixing the cli issue --- .github/workflows/ci.yml | 33 ++++++++++++++++++++++++++++++++- Cargo.lock | 7 +++++++ core/Cargo.toml | 2 +- core/src/jobs.rs | 17 ++++++++++------- core/src/main.rs | 1 + 5 files changed, 51 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9482a5b..0a8e0e3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,6 +8,10 @@ on: env: CARGO_TERM_COLOR: always + # Prevent utoipa-swagger-ui build script from hitting the network. + # The "vendored" Cargo feature bundles assets; this env var is a belt-and- + # suspenders guard in case any older path still attempts a fetch. + SWAGGER_UI_DOWNLOAD_URL: "" jobs: # ------------------------------------------------------------------ @@ -22,6 +26,15 @@ jobs: uses: dtolnay/rust-toolchain@stable with: components: rustfmt + - name: Cache Cargo registry + uses: actions/cache@v3 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-fmt-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo-fmt- - name: Check Formatting run: cargo fmt --all -- --check @@ -38,6 +51,15 @@ jobs: with: components: clippy target: wasm32-unknown-unknown + - name: Cache Cargo registry + uses: actions/cache@v3 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo-clippy- - name: Install System Dependencies run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev - name: Install Soroban CLI @@ -57,12 +79,21 @@ jobs: uses: dtolnay/rust-toolchain@stable with: target: wasm32-unknown-unknown + - name: Cache Cargo registry + uses: actions/cache@v3 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo-test- - name: Install System Dependencies run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev - name: Install Soroban CLI run: cargo install soroban-cli - name: Run Tests - run: cargo test + run: cargo test --workspace # ------------------------------------------------------------------ # Job 4: Frontend (Lint & Typecheck) diff --git a/Cargo.lock b/Cargo.lock index 4435ca9..84e9475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3745,9 +3745,16 @@ dependencies = [ "serde_json", "url", "utoipa", + "utoipa-swagger-ui-vendored", "zip", ] +[[package]] +name = "utoipa-swagger-ui-vendored" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d" + [[package]] name = "uuid" version = "1.21.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 56a16ae..0154fd2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,7 +21,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tower = "0.4" tower-http = { version = "0.5", features = ["cors", "trace"] } utoipa = { version = "4", features = ["axum_extras"] } -utoipa-swagger-ui = { version = "7", features = ["axum"] } +utoipa-swagger-ui = { version = "7", features = ["axum", "vendored"] } reqwest = { version = "0.12", features = ["json"] } base64 = "0.22" hex = "0.4" diff --git a/core/src/jobs.rs b/core/src/jobs.rs index 2a46127..bb020d2 100644 --- a/core/src/jobs.rs +++ b/core/src/jobs.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{timeout, Duration}; +use utoipa::ToSchema; use uuid::Uuid; use crate::errors::AppError; @@ -28,14 +29,14 @@ pub struct JobMetadata { pub error: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct SubmitJobRequest { pub callback_url: Option, pub expires_in_seconds: Option, pub job_data: serde_json::Value, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct JobResponse { pub job_id: String, pub status: JobStatus, @@ -120,10 +121,12 @@ impl JobQueue { // If job is completed, trigger webhook if callback URL exists if matches!(job.status, JobStatus::Completed | JobStatus::Failed) { - if let Some(callback_url) = &job.callback_url { - let job_clone = job.clone(); + if let Some(ref callback_url) = job.callback_url.clone() { + // Clone into fully-owned values so the async task is 'static + let owned_url: String = callback_url.clone(); + let job_snapshot: JobMetadata = job.clone(); tokio::spawn(async move { - Self::trigger_webhook(callback_url, &job_clone).await; + Self::trigger_webhook(owned_url, job_snapshot).await; }); } } @@ -158,7 +161,7 @@ impl JobQueue { }); } - pub async fn trigger_webhook(callback_url: &str, job: &JobMetadata) { + pub async fn trigger_webhook(callback_url: String, job: JobMetadata) { let client = reqwest::Client::new(); let payload = serde_json::json!({ "job_id": job.id, @@ -170,7 +173,7 @@ impl JobQueue { match timeout( Duration::from_secs(30), - client.post(callback_url).json(&payload).send(), + client.post(&callback_url).json(&payload).send(), ) .await { diff --git a/core/src/main.rs b/core/src/main.rs index 6d89ad3..d9dab03 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -293,6 +293,7 @@ async fn get_job( let response = state .job_queue .get_job(&job_id) + .await .map_err(|e| AppError::Internal(format!("Job retrieval failed: {}", e)))?; Ok(Json(response))