diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9482a5b..0a8e0e3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,6 +8,10 @@ on:
 env:
   CARGO_TERM_COLOR: always
+  # Prevent utoipa-swagger-ui build script from hitting the network.
+  # The "vendored" Cargo feature bundles assets; this env var is a belt-and-
+  # suspenders guard in case any older path still attempts a fetch.
+  SWAGGER_UI_DOWNLOAD_URL: ""
 
 jobs:
   # ------------------------------------------------------------------
@@ -22,6 +26,15 @@ jobs:
         uses: dtolnay/rust-toolchain@stable
         with:
           components: rustfmt
+      - name: Cache Cargo registry
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-fmt-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: ${{ runner.os }}-cargo-fmt-
       - name: Check Formatting
         run: cargo fmt --all -- --check
@@ -38,6 +51,15 @@
         with:
           components: clippy
           target: wasm32-unknown-unknown
+      - name: Cache Cargo registry
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: ${{ runner.os }}-cargo-clippy-
       - name: Install System Dependencies
         run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev
       - name: Install Soroban CLI
@@ -57,12 +79,21 @@
         uses: dtolnay/rust-toolchain@stable
         with:
           target: wasm32-unknown-unknown
+      - name: Cache Cargo registry
+        uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cargo/registry
+            ~/.cargo/git
+            target
+          key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
+          restore-keys: ${{ runner.os }}-cargo-test-
       - name: Install System Dependencies
         run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev
       - name: Install Soroban CLI
         run: cargo install soroban-cli
       - name: Run Tests
-        run: cargo test
+        run: cargo test --workspace
   #
------------------------------------------------------------------ # Job 4: Frontend (Lint & Typecheck) diff --git a/Cargo.lock b/Cargo.lock index 5ad13a5..a810190 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -675,6 +675,19 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.10.0" @@ -3076,6 +3089,7 @@ dependencies = [ "axum", "base64 0.22.1", "config", + "dashmap", "dotenvy", "ed25519-dalek", "hex", @@ -3097,6 +3111,7 @@ dependencies = [ "tracing-subscriber", "utoipa", "utoipa-swagger-ui", + "uuid", ] [[package]] @@ -3749,9 +3764,16 @@ dependencies = [ "serde_json", "url", "utoipa", + "utoipa-swagger-ui-vendored", "zip", ] +[[package]] +name = "utoipa-swagger-ui-vendored" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d" + [[package]] name = "uuid" version = "1.21.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 319f57b..518d812 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,7 +22,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tower = "0.4" tower-http = { version = "0.5", features = ["cors", "trace"] } utoipa = { version = "4", features = ["axum_extras"] } -utoipa-swagger-ui = { version = "7", features = ["axum"] } +utoipa-swagger-ui = { version = "7", features = ["axum", "vendored"] } reqwest = { version = "0.12", features = ["json"] } base64 = "0.22" hex = "0.4" @@ -31,4 +31,6 @@ jsonwebtoken = "9" ed25519-dalek = "2" sha2 = "0.10" rand = "0.8" +dashmap = "5.5" moka = { version = "0.12", features = ["future"] } +uuid = { version = "1.0", features = ["v4"] } diff --git 
a/core/src/jobs.rs b/core/src/jobs.rs
new file mode 100644
index 0000000..bb020d2
--- /dev/null
+++ b/core/src/jobs.rs
@@ -0,0 +1,248 @@
+use dashmap::DashMap;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::time::{timeout, Duration};
+use utoipa::ToSchema;
+use uuid::Uuid;
+
+use crate::errors::AppError;
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+pub enum JobStatus {
+    Queued,
+    Processing,
+    Completed,
+    Failed,
+    Cancelled,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct JobMetadata {
+    pub id: String,
+    pub status: JobStatus,
+    pub created_at: u64,
+    pub updated_at: u64,
+    pub expires_at: Option<u64>,
+    pub callback_url: Option<String>,
+    pub result: Option<serde_json::Value>,
+    pub error: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+pub struct SubmitJobRequest {
+    pub callback_url: Option<String>,
+    pub expires_in_seconds: Option<u64>,
+    pub job_data: serde_json::Value,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
+pub struct JobResponse {
+    pub job_id: String,
+    pub status: JobStatus,
+    pub created_at: u64,
+    pub expires_at: Option<u64>,
+}
+
+#[derive(Clone)]
+pub struct JobQueue {
+    jobs: Arc<DashMap<String, JobMetadata>>,
+}
+
+impl JobQueue {
+    pub fn new() -> Self {
+        Self {
+            jobs: Arc::new(DashMap::new()),
+        }
+    }
+
+    pub fn submit_job(&self, request: SubmitJobRequest) -> Result<JobResponse, AppError> {
+        let job_id = Uuid::new_v4().to_string();
+
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
+            .as_secs();
+
+        let expires_at = request.expires_in_seconds.map(|secs| now + secs);
+
+        let job_metadata = JobMetadata {
+            id: job_id.clone(),
+            status: JobStatus::Queued,
+            created_at: now,
+            updated_at: now,
+            expires_at,
+            callback_url: request.callback_url,
+            result: None,
+            error: None,
+        };
+
+        self.jobs.insert(job_id.clone(), job_metadata);
+
+        Ok(JobResponse {
+            job_id,
+            status: JobStatus::Queued,
+            created_at: now,
+            expires_at,
+        })
+    }
+
+    pub async fn get_job(&self, job_id: &str) -> Result<JobResponse, AppError> {
+        let job = self
+            .jobs
+            .get(job_id)
+            .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;
+
+        Ok(JobResponse {
+            job_id: job.id.clone(),
+            status: job.status.clone(),
+            created_at: job.created_at,
+            expires_at: job.expires_at,
+        })
+    }
+
+    pub fn update_job_status(
+        &self,
+        job_id: &str,
+        status: JobStatus,
+        result: Option<serde_json::Value>,
+        error: Option<String>,
+    ) -> Result<(), AppError> {
+        if let Some(mut job) = self.jobs.get_mut(job_id) {
+            let now = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
+ .as_secs(); + + job.status = status; + job.updated_at = now; + job.result = result; + job.error = error; + + // If job is completed, trigger webhook if callback URL exists + if matches!(job.status, JobStatus::Completed | JobStatus::Failed) { + if let Some(ref callback_url) = job.callback_url.clone() { + // Clone into fully-owned values so the async task is 'static + let owned_url: String = callback_url.clone(); + let job_snapshot: JobMetadata = job.clone(); + tokio::spawn(async move { + Self::trigger_webhook(owned_url, job_snapshot).await; + }); + } + } + } else { + return Err(AppError::NotFound(format!( + "Job with ID {} not found", + job_id + ))); + } + + Ok(()) + } + + pub fn cancel_job(&self, job_id: &str) -> Result<(), AppError> { + self.update_job_status( + job_id, + JobStatus::Cancelled, + None, + Some("Job cancelled".to_string()), + ) + } + + pub fn cleanup_expired_jobs(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.jobs.retain(|_, job| { + // Keep job if it doesn't expire or hasn't expired yet + job.expires_at.map_or(true, |exp| exp > now) + }); + } + + pub async fn trigger_webhook(callback_url: String, job: JobMetadata) { + let client = reqwest::Client::new(); + let payload = serde_json::json!({ + "job_id": job.id, + "status": format!("{:?}", job.status), + "result": job.result, + "error": job.error, + "completed_at": job.updated_at + }); + + match timeout( + Duration::from_secs(30), + client.post(&callback_url).json(&payload).send(), + ) + .await + { + Ok(Ok(response)) => { + if response.status().is_success() { + tracing::info!( + "Webhook sent successfully to {} for job {}", + callback_url, + job.id + ); + } else { + tracing::warn!( + "Webhook failed with status {} for job {} at {}", + response.status(), + job.id, + callback_url + ); + } + } + Ok(Err(e)) => { + tracing::error!("Failed to send webhook for job {}: {}", job.id, e); + } + Err(_) => { + tracing::error!( + "Webhook request 
timed out for job {} at {}",
+                    job.id,
+                    callback_url
+                );
+            }
+        }
+    }
+
+    pub fn get_job_result(&self, job_id: &str) -> Result<Option<serde_json::Value>, AppError> {
+        let job = self
+            .jobs
+            .get(job_id)
+            .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;
+
+        Ok(job.result.clone())
+    }
+
+    pub fn is_job_expired(&self, job_id: &str) -> Result<bool, AppError> {
+        let job = self
+            .jobs
+            .get(job_id)
+            .ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;
+
+        match job.expires_at {
+            Some(expiry) => {
+                let now = SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
+                    .as_secs();
+
+                Ok(now > expiry)
+            }
+            None => Ok(false),
+        }
+    }
+}
+
+// Background cleanup task to remove expired jobs
+pub async fn start_cleanup_task(job_queue: JobQueue, interval_seconds: u64) {
+    let job_queue_clone = job_queue.clone();
+    tokio::spawn(async move {
+        loop {
+            tokio::time::sleep(Duration::from_secs(interval_seconds)).await;
+            job_queue_clone.cleanup_expired_jobs();
+            tracing::debug!("Expired jobs cleanup completed");
+        }
+    });
+}
diff --git a/core/src/main.rs b/core/src/main.rs
index c326d73..a3312b7 100644
--- a/core/src/main.rs
+++ b/core/src/main.rs
@@ -2,14 +2,16 @@ mod auth;
 mod benchmarks;
 mod comparison;
 mod errors;
 pub mod insights;
+mod jobs;
 mod parser;
 pub mod rpc_provider;
 mod simulation;
 
 use crate::comparison::{CompareMode, RegressionFlag, RegressionReport, ResourceDelta};
 use crate::errors::AppError;
 use crate::insights::InsightsEngine;
+use crate::jobs::{JobQueue, JobResponse, SubmitJobRequest};
 use crate::rpc_provider::{ProviderRegistry, RpcProvider};
 use crate::simulation::{SimulationCache, SimulationEngine, SimulationResult};
 use axum::{
@@ -119,7 +121,8 @@ struct AppState {
     #[allow(dead_code)] // will be used when RPC simulation is wired into analyze handler
     engine: SimulationEngine,
     cache: Arc<SimulationCache>,
+    job_queue: Arc<JobQueue>,
     insights_engine: InsightsEngine,
 }
 
 #[derive(Debug, Deserialize, ToSchema)]
@@ -492,27 +495,93 @@ fn write_temp_wasm(bytes: &[u8]) -> Result {
     Ok(path)
 }
 
+#[utoipa::path(
+    post,
+    path = "/jobs/submit",
+    request_body = SubmitJobRequest,
+    responses(
+        (status = 200, description = "Job submitted successfully", body = JobResponse),
+        (status = 401, description = "Unauthorized"),
+        (status = 500, description = "Submission failed")
+    ),
+    security(
+        ("jwt" = [])
+    ),
+    tag = "Jobs"
+)]
+async fn submit_job(
+    State(state): State<Arc<AppState>>,
+    Json(payload): Json<SubmitJobRequest>,
+) -> Result<Json<JobResponse>, AppError> {
+    tracing::info!(
+        callback_url = ?payload.callback_url,
+        "Received job submission request"
+    );
+
+    let response = state
+        .job_queue
+        .submit_job(payload)
+        .map_err(|e| AppError::Internal(format!("Job submission failed: {}", e)))?;
+
+    Ok(Json(response))
+}
+
+#[utoipa::path(
+    get,
+    path = "/jobs/{id}",
+    params(("id" = String, description = "Job ID")),
+    responses(
+        (status = 200, description = "Job status retrieved successfully", body = JobResponse),
+        (status = 401, description = "Unauthorized"),
+        (status = 404, description = "Job not found"),
+        (status = 500, description = "Retrieval failed")
+    ),
+    security(
+        ("jwt" = [])
+    ),
+    tag = "Jobs"
+)]
+async fn get_job(
+    State(state): State<Arc<AppState>>,
+    axum::extract::Path(job_id): axum::extract::Path<String>,
+) -> Result<Json<JobResponse>, AppError> {
+    tracing::info!(
+        job_id = %job_id,
+        "Received job status request"
+    );
+
+    let response = state
+        .job_queue
+        .get_job(&job_id)
+        .await
+        .map_err(|e| AppError::Internal(format!("Job retrieval failed: {}", e)))?;
+
+    Ok(Json(response))
+}
+
 #[derive(OpenApi)]
 #[openapi(
-    paths(analyze, optimize_limits, compare_handler, auth::challenge_handler, auth::verify_handler),
+    paths(analyze, optimize_limits, compare_handler, auth::challenge_handler, auth::verify_handler, submit_job, get_job),
     components(schemas(
         AnalyzeRequest,
         ResourceReport,
         OptimizeLimitsRequest,
         OptimizeLimitsResponse,
         CompareApiResponse,
         RegressionReport,
         ResourceDelta,
         RegressionFlag,
         auth::ChallengeRequest,
         auth::ChallengeResponse,
         auth::VerifyRequest,
         auth::VerifyResponse,
+        jobs::SubmitJobRequest, jobs::JobResponse,
         crate::simulation::OptimizationBuffer, crate::simulation::SorobanResources
     )),
     tags(
         (name = "Analysis", description = "Soroban contract resource analysis endpoints"),
-        (name = "Auth", description = "SEP-10 wallet authentication")
+        (name = "Auth", description = "SEP-10 wallet authentication"),
+        (name = "Jobs", description = "Background job processing endpoints")
     ),
     info(
         title = "SoroScope API",
@@ -649,15 +718,24 @@
     let app_state = Arc::new(AppState {
         engine: SimulationEngine::with_registry(Arc::clone(&registry)),
         cache: SimulationCache::new(),
+        job_queue: Arc::new(JobQueue::new()),
         insights_engine: InsightsEngine::new(),
     });
 
+    // Start the background cleanup task for expired jobs
+    let _cleanup_handle = tokio::spawn(jobs::start_cleanup_task(
+        (*app_state.job_queue).clone(),
+        3600,
+    ));
+
     let cors = CorsLayer::new().allow_origin(Any);
 
     let protected = Router::new()
         .route("/analyze", post(analyze))
+        .route("/jobs/submit", post(submit_job))
+        .route("/jobs/:id", get(get_job))
         .route("/analyze/optimize-limits", post(optimize_limits))
         .route("/analyze/compare", post(compare_handler))
         .route_layer(middleware::from_fn(auth::auth_middleware));
 
 let app = Router::new()