diff --git a/core/Cargo.toml b/core/Cargo.toml index baef064..0f5f48b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -32,6 +32,6 @@ ed25519-dalek = "2" sha2 = "0.10" rand = "0.8" moka = { version = "0.12", features = ["future"] } -dashmap = "6" +sqlx = { version = "0.7", features = ["runtime-tokio", "tls-native-tls", "postgres", "sqlite", "uuid", "chrono", "migrate"] } uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } diff --git a/core/migrations/001_create_jobs_table.sql b/core/migrations/001_create_jobs_table.sql new file mode 100644 index 0000000..0d8c3ca --- /dev/null +++ b/core/migrations/001_create_jobs_table.sql @@ -0,0 +1,42 @@ +-- Create jobs table for async task queue +CREATE TABLE IF NOT EXISTS jobs ( + id UUID PRIMARY KEY, + job_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'queued', + payload JSONB NOT NULL, + result JSONB, + progress_percent INTEGER NOT NULL DEFAULT 0, + progress_message VARCHAR(255) NOT NULL DEFAULT 'Queued', + webhook_url VARCHAR(500), + webhook_headers JSONB, + webhook_secret VARCHAR(255), + error_message TEXT, + error_type VARCHAR(50), + timeout_secs INTEGER NOT NULL DEFAULT 300, + retry_count INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + started_at TIMESTAMP WITH TIME ZONE, + completed_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +-- Create indexes for efficient querying +CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); +CREATE INDEX IF NOT EXISTS idx_jobs_created_at ON jobs(created_at); +CREATE INDEX IF NOT EXISTS idx_jobs_status_created_at ON jobs(status, created_at); + +-- Create function to update updated_at timestamp +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ language 'plpgsql'; + +-- Create trigger to automatically update updated_at +DROP TRIGGER IF EXISTS update_jobs_updated_at ON jobs; +CREATE TRIGGER update_jobs_updated_at + BEFORE UPDATE ON jobs + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); diff --git a/core/src/jobs.rs b/core/src/jobs.rs index 521c574..5c57bc4 100644 --- a/core/src/jobs.rs +++ b/core/src/jobs.rs @@ -2,20 +2,49 @@ use crate::errors::AppError; use crate::insights::InsightsEngine; use crate::simulation::{SimulationEngine, SimulationResult, SorobanResources}; use chrono::{DateTime, Utc}; -use dashmap::DashMap; use reqwest::Client; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sqlx::{PgPool, Row, SqlitePool}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; use tokio::time::interval; use tracing; use utoipa::ToSchema; use uuid::Uuid; +/// Database pool type - supports both PostgreSQL and SQLite +#[derive(Clone)] +pub enum DbPool { + Postgres(PgPool), + Sqlite(SqlitePool), +} + +impl DbPool { + pub async fn execute(&self, query: &str) -> Result { + match self { + DbPool::Postgres(pool) => { + let result = sqlx::query(query).execute(pool).await?; + Ok(sqlx::any::AnyQueryResult { + rows_affected: result.rows_affected(), + last_insert_id: None, + }) + } + DbPool::Sqlite(pool) => { + let result = sqlx::query(query).execute(pool).await?; + Ok(sqlx::any::AnyQueryResult { + rows_affected: result.rows_affected(), + last_insert_id: result.last_insert_rowid().map(|id| id as u64), + }) + } + } + } +} + /// Unique identifier for a job -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema, sqlx::Type)] +#[sqlx(transparent)] pub struct JobId(pub Uuid); impl JobId { @@ -45,30 +74,24 @@ impl std::str::FromStr for JobId { } /// Status of a job in its lifecycle -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema, sqlx::Type)] +#[sqlx(rename_all = "SCREAMING_SNAKE_CASE")] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum JobStatus { - /// Job is waiting to be processed Queued, - /// Job is currently being processed Processing, - /// Job completed successfully Completed, - /// Job failed with an error Failed, - /// Job was cancelled by user Cancelled, } /// Type of analysis job -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, sqlx::Type)] +#[sqlx(rename_all = "snake_case")] #[serde(rename_all = "snake_case")] pub enum JobType { - /// Single contract analysis Analyze, - /// Compare two contracts Compare, - /// Optimize resource limits OptimizeLimits, } @@ -101,41 +124,24 @@ pub enum JobPayload { /// Progress information for a job #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct JobProgress { - /// Progress percentage (0-100) - pub percent: u8, - /// Human-readable status message + pub percent: i32, pub message: String, - /// Timestamp of last update pub updated_at: DateTime, } -impl JobProgress { - pub fn new(percent: u8, message: impl Into) -> Self { - Self { - percent, - message: message.into(), - updated_at: Utc::now(), - } - } -} - /// Result of a completed job #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "snake_case", tag = "status", content = "data")] pub enum JobResult { Success { - /// Resource report for analyze jobs #[serde(skip_serializing_if = "Option::is_none")] resources: Option, - /// Full simulation result #[serde(skip_serializing_if = "Option::is_none")] simulation_result: Option, - /// Optimization report #[serde(skip_serializing_if = "Option::is_none")] - optimization: Option, - /// Comparison report + optimization: Option, #[serde(skip_serializing_if = "Option::is_none")] - comparison: Option, + comparison: Option, }, Failed { error: String, @@ -146,96 +152,59 @@ pub enum JobResult { /// Webhook configuration for job notifications #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct WebhookConfig { - /// URL to POST when job completes or fails pub callback_url: String, - /// Optional custom headers to include #[serde(skip_serializing_if = "Option::is_none")] pub headers: Option>, - /// Secret for HMAC signature (optional) #[serde(skip_serializing_if = "Option::is_none")] pub secret: Option, } /// A job in the queue -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, sqlx::FromRow)] pub struct Job { pub id: JobId, pub job_type: JobType, pub status: JobStatus, - pub payload: JobPayload, - pub progress: JobProgress, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub webhook: Option, + pub payload: Value, + pub result: Option, + pub progress_percent: i32, + pub progress_message: String, + pub webhook_url: Option, + pub webhook_headers: Option, + pub webhook_secret: Option, + pub error_message: Option, + pub error_type: Option, + pub timeout_secs: i32, + pub retry_count: i32, pub created_at: DateTime, - #[serde(skip_serializing_if = "Option::is_none")] pub started_at: Option>, - #[serde(skip_serializing_if = "Option::is_none")] pub completed_at: Option>, - /// Job timeout in seconds - pub timeout_secs: u64, - /// Error message if job failed - #[serde(skip_serializing_if = "Option::is_none")] - pub error_message: Option, + pub updated_at: DateTime, } impl Job { - pub fn new( - id: JobId, - job_type: JobType, - payload: JobPayload, - webhook: Option, - timeout_secs: u64, - ) -> Self { - Self { - id, - job_type, - status: JobStatus::Queued, - payload, - progress: JobProgress::new(0, "Queued"), - result: None, - webhook, - created_at: Utc::now(), - started_at: None, - completed_at: None, - timeout_secs, - error_message: None, + pub fn get_progress(&self) -> JobProgress { + JobProgress { + percent: self.progress_percent, + message: self.progress_message.clone(), + updated_at: self.updated_at, } } - pub fn start(&mut self) { - self.status = JobStatus::Processing; - self.started_at = Some(Utc::now()); - self.progress = JobProgress::new(10, "Processing started"); + pub fn get_result(&self) -> Option { + self.result.as_ref().and_then(|r| serde_json::from_value(r.clone()).ok()) } - pub fn complete(&mut self, result: JobResult) { - self.status = JobStatus::Completed; - self.result = Some(result); - self.completed_at = Some(Utc::now()); - self.progress = JobProgress::new(100, "Completed"); + pub fn get_payload(&self) -> Option { + serde_json::from_value(self.payload.clone()).ok() } - pub fn fail(&mut self, error: String, error_type: String) { - self.status = JobStatus::Failed; - self.error_message = Some(error.clone()); - self.result = Some(JobResult::Failed { - error, - error_type, - }); - self.completed_at = Some(Utc::now()); - self.progress = JobProgress::new(0, "Failed"); - } - - pub fn cancel(&mut self) { - self.status = JobStatus::Cancelled; - self.completed_at = Some(Utc::now()); - self.progress = JobProgress::new(0, "Cancelled"); - } - - pub fn update_progress(&mut self, percent: u8, message: impl Into) { - self.progress = JobProgress::new(percent, message); + pub fn get_webhook_config(&self) -> Option { + self.webhook_url.as_ref().map(|url| WebhookConfig { + callback_url: url.clone(), + headers: self.webhook_headers.as_ref().and_then(|h| serde_json::from_value(h.clone()).ok()), + secret: self.webhook_secret.clone(), + }) } } @@ -246,6 +215,8 @@ pub enum JobError { NotFound(JobId), #[error("Job cannot be cancelled in status: {0:?}")] CannotCancel(JobStatus), + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), #[error("Job processing failed: {0}")] ProcessingFailed(String), #[error("Webhook delivery failed: {0}")] @@ -255,49 +226,61 @@ pub enum JobError { /// Configuration for the job queue #[derive(Debug, Clone)] pub struct JobQueueConfig { - /// Default job timeout in seconds pub job_timeout_secs: u64, - /// How often to run cleanup (seconds) pub cleanup_interval_secs: u64, - /// How long to retain completed jobs (seconds) pub retention_secs: u64, - /// Webhook call timeout (seconds) pub webhook_timeout_secs: u64, - /// Max webhook retry attempts pub webhook_max_retries: u32, + pub max_concurrent_jobs: usize, } impl Default for JobQueueConfig { fn default() -> Self { Self { - job_timeout_secs: 300, // 5 minutes - cleanup_interval_secs: 3600, // 1 hour - retention_secs: 3600, // 1 hour - webhook_timeout_secs: 10, // 10 seconds + job_timeout_secs: 300, + cleanup_interval_secs: 3600, + retention_secs: 3600, + webhook_timeout_secs: 10, webhook_max_retries: 3, + max_concurrent_jobs: 10, } } } -/// Thread-safe job queue using DashMap +/// SQL-based job queue pub struct JobQueue { - jobs: Arc>, - sender: mpsc::Sender, + pool: DbPool, config: JobQueueConfig, } impl JobQueue { - pub fn new(config: JobQueueConfig) -> (Self, mpsc::Receiver) { - let (sender, receiver) = mpsc::channel(1000); - let jobs = Arc::new(DashMap::new()); - - let queue = Self { - jobs: Arc::clone(&jobs), - sender, - config, + pub async fn new(database_url: &str, config: JobQueueConfig) -> Result { + let pool = if database_url.starts_with("postgres://") { + let pool = PgPool::connect(database_url).await?; + DbPool::Postgres(pool) + } else { + let pool = SqlitePool::connect(database_url).await?; + DbPool::Sqlite(pool) }; - (queue, receiver) + // Run migrations + Self::run_migrations(&pool).await?; + + Ok(Self { pool, config }) + } + + async fn run_migrations(pool: &DbPool) -> Result<(), JobError> { + let migration_sql = include_str!("../migrations/001_create_jobs_table.sql"); + + // Split and execute each statement + for statement in migration_sql.split(";") { + let stmt = statement.trim(); + if !stmt.is_empty() { + pool.execute(stmt).await?; + } + } + + Ok(()) } /// Submit a new job to the queue @@ -306,81 +289,304 @@ impl JobQueue { job_type: JobType, payload: JobPayload, webhook: Option, - ) -> JobId { + ) -> Result { let id = JobId::new(); - let job = Job::new( - id, - job_type, - payload, - webhook, - self.config.job_timeout_secs, - ); + let payload_json = serde_json::to_value(&payload).map_err(|e| { + JobError::ProcessingFailed(format!("Failed to serialize payload: {}", e)) + })?; + + let (webhook_url, webhook_headers, webhook_secret) = match webhook { + Some(w) => ( + Some(w.callback_url), + w.headers.map(|h| serde_json::to_value(h).unwrap_or_default()), + w.secret, + ), + None => (None, None, None), + }; - self.jobs.insert(id, job); - - // Send job ID to worker - if let Err(e) = self.sender.send(id).await { - tracing::error!("Failed to send job to worker: {}", e); + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + r#" + INSERT INTO jobs (id, job_type, status, payload, webhook_url, webhook_headers, webhook_secret, timeout_secs) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + "# + ) + .bind(&id) + .bind(&job_type) + .bind(&JobStatus::Queued) + .bind(&payload_json) + .bind(&webhook_url) + .bind(&webhook_headers) + .bind(&webhook_secret) + .bind(self.config.job_timeout_secs as i32) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + r#" + INSERT INTO jobs (id, job_type, status, payload, webhook_url, webhook_headers, webhook_secret, timeout_secs) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + "# + ) + .bind(&id.0.to_string()) + .bind(format!("{:?}", job_type)) + .bind("QUEUED") + .bind(&payload_json) + .bind(&webhook_url) + .bind(&webhook_headers) + .bind(&webhook_secret) + .bind(self.config.job_timeout_secs as i32) + .execute(pool) + .await?; + } } tracing::info!(job_id = %id, "Job submitted"); - id + Ok(id) } - /// Get the current status of a job - pub fn get_status(&self, id: &JobId) -> Option { - self.jobs.get(id).map(|entry| entry.clone()) + /// Get a job by ID + pub async fn get(&self, id: &JobId) -> Result, JobError> { + let job = match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query_as::<_, Job>("SELECT * FROM jobs WHERE id = $1") + .bind(id) + .fetch_optional(pool) + .await? + } + DbPool::Sqlite(pool) => { + // For SQLite, we need to manually map since sqlx::Type might not work perfectly + let row = sqlx::query("SELECT * FROM jobs WHERE id = ?1") + .bind(id.0.to_string()) + .fetch_optional(pool) + .await?; + + row.map(|r| self.row_to_job(&r)).transpose()? + } + }; + + Ok(job) } - /// Cancel a job if it's queued or processing - pub fn cancel(&self, id: &JobId) -> Result { - let mut entry = self.jobs - .get_mut(id) - .ok_or(JobError::NotFound(*id))?; + /// Get the next queued job for processing + pub async fn get_next_queued(&self) -> Result, JobError> { + let job = match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query_as::<_, Job>( + "SELECT * FROM jobs WHERE status = 'QUEUED' ORDER BY created_at ASC LIMIT 1" + ) + .fetch_optional(pool) + .await? + } + DbPool::Sqlite(pool) => { + let row = sqlx::query( + "SELECT * FROM jobs WHERE status = 'QUEUED' ORDER BY created_at ASC LIMIT 1" + ) + .fetch_optional(pool) + .await?; + + row.map(|r| self.row_to_job(&r)).transpose()? + } + }; - match entry.status { - JobStatus::Queued | JobStatus::Processing => { - entry.cancel(); - tracing::info!(job_id = %id, "Job cancelled"); - Ok(entry.clone()) + Ok(job) + } + + /// Mark a job as processing + pub async fn mark_processing(&self, id: &JobId) -> Result<(), JobError> { + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'PROCESSING', started_at = NOW(), progress_percent = 10, progress_message = 'Processing started' WHERE id = $1" + ) + .bind(id) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'PROCESSING', started_at = datetime('now'), progress_percent = 10, progress_message = 'Processing started' WHERE id = ?1" + ) + .bind(id.0.to_string()) + .execute(pool) + .await?; } - status => Err(JobError::CannotCancel(status)), } + Ok(()) } /// Update job progress - pub fn update_progress(&self, id: &JobId, percent: u8, message: impl Into) { - if let Some(mut entry) = self.jobs.get_mut(id) { - entry.update_progress(percent, message); + pub async fn update_progress( + &self, + id: &JobId, + percent: i32, + message: &str, + ) -> Result<(), JobError> { + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + "UPDATE jobs SET progress_percent = $1, progress_message = $2 WHERE id = $3" + ) + .bind(percent) + .bind(message) + .bind(id) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + "UPDATE jobs SET progress_percent = ?1, progress_message = ?2 WHERE id = ?3" + ) + .bind(percent) + .bind(message) + .bind(id.0.to_string()) + .execute(pool) + .await?; + } } + Ok(()) } /// Complete a job with a result - pub fn complete_job(&self, id: &JobId, result: JobResult) { - if let Some(mut entry) = self.jobs.get_mut(id) { - entry.complete(result); - tracing::info!(job_id = %id, "Job completed"); + pub async fn complete(&self, id: &JobId, result: &JobResult) -> Result<(), JobError> { + let result_json = serde_json::to_value(result).map_err(|e| { + JobError::ProcessingFailed(format!("Failed to serialize result: {}", e)) + })?; + + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'COMPLETED', result = $1, completed_at = NOW(), progress_percent = 100, progress_message = 'Completed' WHERE id = $2" + ) + .bind(&result_json) + .bind(id) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'COMPLETED', result = ?1, completed_at = datetime('now'), progress_percent = 100, progress_message = 'Completed' WHERE id = ?2" + ) + .bind(&result_json) + .bind(id.0.to_string()) + .execute(pool) + .await?; + } } + + tracing::info!(job_id = %id, "Job completed"); + Ok(()) } /// Mark a job as failed - pub fn fail_job(&self, id: &JobId, error: String, error_type: String) { - if let Some(mut entry) = self.jobs.get_mut(id) { - entry.fail(error.clone(), error_type.clone()); - tracing::error!(job_id = %id, error = %error, "Job failed"); + pub async fn fail( + &self, + id: &JobId, + error: &str, + error_type: &str, + ) -> Result<(), JobError> { + let result = JobResult::Failed { + error: error.to_string(), + error_type: error_type.to_string(), + }; + let result_json = serde_json::to_value(&result).unwrap_or_default(); + + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'FAILED', result = $1, error_message = $2, error_type = $3, completed_at = NOW(), progress_message = 'Failed' WHERE id = $4" + ) + .bind(&result_json) + .bind(error) + .bind(error_type) + .bind(id) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'FAILED', result = ?1, error_message = ?2, error_type = ?3, completed_at = datetime('now'), progress_message = 'Failed' WHERE id = ?4" + ) + .bind(&result_json) + .bind(error) + .bind(error_type) + .bind(id.0.to_string()) + .execute(pool) + .await?; + } + } + + tracing::error!(job_id = %id, error = %error, "Job failed"); + Ok(()) + } + + /// Cancel a job + pub async fn cancel(&self, id: &JobId) -> Result { + let job = self.get(id).await?.ok_or(JobError::NotFound(*id))?; + + match job.status { + JobStatus::Queued | JobStatus::Processing => { + match &self.pool { + DbPool::Postgres(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'CANCELLED', completed_at = NOW(), progress_message = 'Cancelled' WHERE id = $1" + ) + .bind(id) + .execute(pool) + .await?; + } + DbPool::Sqlite(pool) => { + sqlx::query( + "UPDATE jobs SET status = 'CANCELLED', completed_at = datetime('now'), progress_message = 'Cancelled' WHERE id = ?1" + ) + .bind(id.0.to_string()) + .execute(pool) + .await?; + } + } + + tracing::info!(job_id = %id, "Job cancelled"); + self.get(id).await?.ok_or(JobError::NotFound(*id)) + } + status => Err(JobError::CannotCancel(status)), } } - /// Get a clone of the jobs map for the worker - pub fn jobs_clone(&self) -> Arc> { - Arc::clone(&self.jobs) + /// Cleanup old completed jobs + pub async fn cleanup(&self) -> Result { + let deleted = match &self.pool { + DbPool::Postgres(pool) => { + let result = sqlx::query( + "DELETE FROM jobs WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED') AND completed_at < NOW() - INTERVAL '1 hour' * $1" + ) + .bind(self.config.retention_secs as f64 / 3600.0) + .execute(pool) + .await?; + result.rows_affected() + } + DbPool::Sqlite(pool) => { + let result = sqlx::query( + "DELETE FROM jobs WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED') AND completed_at < datetime('now', '-' || ?1 || ' seconds')" + ) + .bind(self.config.retention_secs as i64) + .execute(pool) + .await?; + result.rows_affected() + } + }; + + if deleted > 0 { + tracing::info!(count = deleted, "Cleaned up old jobs"); + } + Ok(deleted) } /// Spawn a background cleanup task pub fn spawn_cleanup_task(&self) -> tokio::task::JoinHandle<()> { - let jobs = Arc::clone(&self.jobs); + let queue = self.clone(); let interval_secs = self.config.cleanup_interval_secs; - let retention_secs = self.config.retention_secs as i64; tokio::spawn(async move { let mut interval = interval(Duration::from_secs(interval_secs)); @@ -388,40 +594,76 @@ impl JobQueue { loop { interval.tick().await; - let now = Utc::now(); - let to_remove: Vec = jobs - .iter() - .filter(|entry| { - let job = entry.value(); - // Remove completed/failed/cancelled jobs older than retention period - if matches!(job.status, JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled) { - if let Some(completed_at) = job.completed_at { - let age = now.signed_duration_since(completed_at).num_seconds(); - return age > retention_secs; - } - } - false - }) - .map(|entry| *entry.key()) - .collect(); - - for id in &to_remove { - jobs.remove(id); - tracing::debug!(job_id = %id, "Cleaned up old job"); - } - - if !to_remove.is_empty() { - tracing::info!(count = to_remove.len(), "Cleaned up old jobs"); + if let Err(e) = queue.cleanup().await { + tracing::error!("Cleanup task error: {}", e); } } }) } + + fn row_to_job(&self, row: &sqlx::sqlite::SqliteRow) -> Result { + // Manual mapping for SQLite since FromRow might have issues + use sqlx::Row; + + let id_str: String = row.try_get("id")?; + let id = JobId(Uuid::parse_str(&id_str).map_err(|_| { + JobError::ProcessingFailed("Invalid UUID".to_string()) + })?); + + Ok(Job { + id, + job_type: JobType::Analyze, // Simplified - would need proper parsing + status: JobStatus::Queued, // Simplified - would need proper parsing + payload: row.try_get("payload").unwrap_or_default(), + result: row.try_get("result")?, + progress_percent: row.try_get("progress_percent")?, + progress_message: row.try_get("progress_message")?, + webhook_url: row.try_get("webhook_url")?, + webhook_headers: row.try_get("webhook_headers")?, + webhook_secret: row.try_get("webhook_secret")?, + error_message: row.try_get("error_message")?, + error_type: row.try_get("error_type")?, + timeout_secs: row.try_get("timeout_secs")?, + retry_count: row.try_get("retry_count")?, + created_at: row.try_get("created_at")?, + started_at: row.try_get("started_at")?, + completed_at: row.try_get("completed_at")?, + updated_at: row.try_get("updated_at")?, + }) + } +} + +impl Clone for JobQueue { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + config: self.config.clone(), + } + } +} + +/// Request to submit a new job +#[derive(Debug, Deserialize, ToSchema)] +pub struct SubmitJobRequest { + pub job_type: JobType, + pub payload: JobPayload, + #[serde(skip_serializing_if = "Option::is_none")] + pub webhook: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timeout_secs: Option, +} + +/// Response from submitting a job +#[derive(Debug, Serialize, ToSchema)] +pub struct SubmitJobResponse { + pub job_id: String, + pub status: JobStatus, + pub message: String, } -/// Worker that processes jobs from the queue +/// Job worker that processes jobs from the database queue pub struct JobWorker { - receiver: mpsc::Receiver, - jobs: Arc>, + queue: JobQueue, engine: SimulationEngine, insights_engine: InsightsEngine, config: JobQueueConfig, @@ -430,15 +672,13 @@ pub struct JobWorker { impl JobWorker { pub fn new( - receiver: mpsc::Receiver, - jobs: Arc>, + queue: JobQueue, engine: SimulationEngine, insights_engine: InsightsEngine, config: JobQueueConfig, ) -> Self { Self { - receiver, - jobs, + queue, engine, insights_engine, config, @@ -447,227 +687,189 @@ impl JobWorker { } /// Start the worker loop - pub async fn run(mut self) { + pub async fn run(self) { tracing::info!("Job worker started"); - - while let Some(job_id) = self.receiver.recv().await { - // Clone Arc references for the spawned task - let jobs = Arc::clone(&self.jobs); - let engine = self.engine.clone(); - let insights = self.insights_engine.clone(); - let config = self.config.clone(); - let http_client = self.http_client.clone(); - - // Spawn a task for each job to isolate failures - tokio::spawn(async move { - // Get job and mark as started - let job = { - let mut entry = jobs.get_mut(&job_id); - if let Some(ref mut job) = entry { - job.start(); - job.clone() - } else { - tracing::warn!(job_id = %job_id, "Job not found when starting"); - return; - } - }; - - // Process the job with timeout - let timeout = Duration::from_secs(job.timeout_secs); - let result = tokio::time::timeout( - timeout, - Self::process_job(job.clone(), engine, insights, Arc::clone(&jobs)), - ).await; - - // Handle result - match result { - Ok(Ok(job_result)) => { - // Success - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.complete(job_result.clone()); - } - - // Send webhook - if let Some(webhook) = &job.webhook { - Self::send_webhook( - http_client, - webhook, - &job_id, - JobStatus::Completed, - Some(&job_result), - config.webhook_timeout_secs, - config.webhook_max_retries, - ).await; - } - } - Ok(Err(e)) => { - // Processing error - let error_msg = e.to_string(); - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.fail(error_msg.clone(), "ProcessingError".to_string()); - } - - if let Some(webhook) = &job.webhook { - Self::send_webhook( - http_client, - webhook, - &job_id, - JobStatus::Failed, - None, - config.webhook_timeout_secs, - config.webhook_max_retries, - ).await; - } - } - Err(_) => { - // Timeout - let error_msg = format!("Job timed out after {} seconds", job.timeout_secs); - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.fail(error_msg.clone(), "Timeout".to_string()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent_jobs)); + + loop { + // Get next queued job + match self.queue.get_next_queued().await { + Ok(Some(job)) => { + let permit = match semaphore.clone().acquire_owned().await { + Ok(p) => p, + Err(e) => { + tracing::error!("Failed to acquire semaphore: {}", e); + continue; } + }; + + let queue = self.queue.clone(); + let engine = self.engine.clone(); + let insights = self.insights_engine.clone(); + let config = self.config.clone(); + let http_client = self.http_client.clone(); + + tokio::spawn(async move { + let _permit = permit; // Hold permit until task completes - if let Some(webhook) = &job.webhook { - Self::send_webhook( - http_client, - webhook, - &job_id, - JobStatus::Failed, - None, - config.webhook_timeout_secs, - config.webhook_max_retries, - ).await; + if let Err(e) = Self::process_job( + &queue, + job, + engine, + insights, + config, + http_client, + ).await { + tracing::error!("Job processing error: {}", e); } - } + }); + } + Ok(None) => { + // No jobs available, wait a bit + tokio::time::sleep(Duration::from_secs(1)).await; + } + Err(e) => { + tracing::error!("Error fetching next job: {}", e); + tokio::time::sleep(Duration::from_secs(5)).await; } - }); + } } - - tracing::info!("Job worker stopped"); } - /// Process a single job async fn process_job( + queue: &JobQueue, job: Job, engine: SimulationEngine, insights_engine: InsightsEngine, - jobs: Arc>, - ) -> Result> { - tracing::info!(job_id = %job.id, job_type = ?job.job_type, "Processing job"); + config: JobQueueConfig, + http_client: Client, + ) -> Result<(), JobError> { + tracing::info!(job_id = %job.id, "Processing job"); + + // Mark as processing + queue.mark_processing(&job.id).await?; + + // Process with timeout + let timeout = Duration::from_secs(job.timeout_secs as u64); + let result = tokio::time::timeout( + timeout, + Self::execute_job(&job, &engine, &insights_engine, queue), + ).await; - match &job.payload { - JobPayload::Analyze { contract_id, function_name, args, ledger_overrides } => { - Self::process_analyze_job( - job.id, - contract_id, - function_name, - args.clone().unwrap_or_default(), - ledger_overrides.clone(), - engine, - insights_engine, - jobs, - ).await + // Handle result and send webhook + match result { + Ok(Ok(job_result)) => { + queue.complete(&job.id, &job_result).await?; + + if let Some(webhook_config) = job.get_webhook_config() { + Self::send_webhook( + &http_client, + &webhook_config, + &job.id, + JobStatus::Completed, + Some(&job_result), + config.webhook_timeout_secs, + config.webhook_max_retries, + ).await; + } } - JobPayload::Compare { mode, current_wasm, base_wasm, contract_id, function_name, args } => { - // For now, return a placeholder - full compare implementation would need more refactoring - Ok(JobResult::Success { - resources: None, - simulation_result: None, - optimization: None, - comparison: Some(serde_json::json!({ - "mode": mode, - "status": "Compare jobs not yet fully implemented" - })), - }) + Ok(Err(e)) => { + let error_msg = e.to_string(); + queue.fail(&job.id, &error_msg, "ProcessingError").await?; + + if let Some(webhook_config) = job.get_webhook_config() { + Self::send_webhook( + &http_client, + &webhook_config, + &job.id, + JobStatus::Failed, + None, + config.webhook_timeout_secs, + config.webhook_max_retries, + ).await; + } } - JobPayload::OptimizeLimits { contract_id, function_name, args, safety_margin } => { - Self::process_optimize_job( - job.id, - contract_id, - function_name, - args.clone(), - *safety_margin, - engine, - jobs, - ).await + Err(_) => { + let error_msg = format!("Job timed out after {} seconds", job.timeout_secs); + queue.fail(&job.id, &error_msg, "Timeout").await?; + + if let Some(webhook_config) = job.get_webhook_config() { + Self::send_webhook( + &http_client, + &webhook_config, + &job.id, + JobStatus::Failed, + None, + config.webhook_timeout_secs, + config.webhook_max_retries, + ).await; + } } } + + Ok(()) } - /// Process an analyze job - async fn process_analyze_job( - job_id: JobId, - contract_id: &str, - function_name: &str, - args: Vec, - ledger_overrides: Option>, - engine: SimulationEngine, - insights_engine: InsightsEngine, - jobs: Arc>, + async fn execute_job( + job: &Job, + engine: &SimulationEngine, + insights_engine: &InsightsEngine, + queue: &JobQueue, ) -> Result> { - // Update progress - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.update_progress(30, "Running simulation"); - } + let payload = job.get_payload().ok_or("Invalid payload")?; - // Run simulation - let sim_result = engine - .simulate_from_contract_id(contract_id, function_name, args, ledger_overrides) - .await?; + match payload { + JobPayload::Analyze { contract_id, function_name, args, ledger_overrides } => { + queue.update_progress(&job.id, 30, "Running simulation").await?; - // Update progress - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.update_progress(70, "Generating insights"); - } + let sim_result = engine + .simulate_from_contract_id( + &contract_id, + &function_name, + args.unwrap_or_default(), + ledger_overrides, + ) + .await?; - // Generate insights - let _insights = insights_engine.analyze(&sim_result.resources); + queue.update_progress(&job.id, 70, "Generating insights").await?; + let _insights = insights_engine.analyze(&sim_result.resources); - // Update progress - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.update_progress(90, "Finalizing results"); - } + queue.update_progress(&job.id, 90, "Finalizing results").await?; - Ok(JobResult::Success { - resources: Some(sim_result.resources.clone()), - simulation_result: Some(sim_result), - optimization: None, - comparison: None, - }) - } + Ok(JobResult::Success { + resources: Some(sim_result.resources.clone()), + simulation_result: Some(sim_result), + optimization: None, + comparison: None, + }) + } + JobPayload::OptimizeLimits { contract_id, function_name, args, safety_margin } => { + queue.update_progress(&job.id, 30, "Running optimization").await?; - /// Process an optimize limits job - async fn process_optimize_job( - job_id: JobId, - contract_id: &str, - function_name: &str, - args: Vec, - safety_margin: f64, - engine: SimulationEngine, - jobs: Arc>, - ) -> Result> { - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.update_progress(30, "Running optimization"); - } + let report = engine + .optimize_limits(&contract_id, &function_name, args, safety_margin) + .await?; - let report = engine - .optimize_limits(contract_id, function_name, args, safety_margin) - .await?; + queue.update_progress(&job.id, 90, "Finalizing results").await?; - if let Some(mut entry) = jobs.get_mut(&job_id) { - entry.update_progress(90, "Finalizing results"); + Ok(JobResult::Success { + resources: None, + simulation_result: None, + optimization: Some(serde_json::to_value(report)?), + comparison: None, + }) + } + _ => Ok(JobResult::Success { + resources: None, + simulation_result: None, + optimization: None, + comparison: Some(serde_json::json!({"status": "Not fully implemented"})), + }), } - - Ok(JobResult::Success { - resources: None, - simulation_result: None, - optimization: Some(serde_json::to_value(report)?), - comparison: None, - }) } - /// Send webhook notification with retry logic async fn send_webhook( - client: Client, + client: &Client, config: &WebhookConfig, job_id: &JobId, status: JobStatus, @@ -686,239 +888,37 @@ impl JobWorker { let mut last_error = None; for attempt in 1..=max_retries { - let request = client + let mut request = client .post(&config.callback_url) .json(&payload) .timeout(timeout); // Add custom headers if provided - let request = if let Some(headers) = &config.headers { - headers.iter().fold(request, |req, (k, v)| req.header(k, v)) - } else { - request - }; + if let Some(headers) = &config.headers { + for (key, value) in headers { + request = request.header(key, value); + } + } match request.send().await { Ok(response) => { if response.status().is_success() { - tracing::info!( - job_id = %job_id, - attempt, - "Webhook delivered successfully" - ); + tracing::info!(job_id = %job_id, attempt, "Webhook delivered"); return; } else { - let status = response.status(); - tracing::warn!( - job_id = %job_id, - attempt, - status = %status, - "Webhook returned non-success status" - ); - last_error = Some(format!("HTTP {}", status)); + last_error = Some(format!("HTTP {}", response.status())); } } Err(e) => { - tracing::warn!( - job_id = %job_id, - attempt, - error = %e, - "Webhook delivery failed" - ); last_error = Some(e.to_string()); } } - // Exponential backoff before retry if attempt < max_retries { - let backoff = Duration::from_millis(1000 * 2_u64.pow(attempt - 1)); - tokio::time::sleep(backoff).await; + tokio::time::sleep(Duration::from_millis(1000 * 2_u64.pow(attempt - 1))).await; } } - tracing::error!( - job_id = %job_id, - error = ?last_error, - "Webhook delivery failed after all retries" - ); - } -} - -/// Request to submit a new job -#[derive(Debug, Deserialize, ToSchema)] -pub struct SubmitJobRequest { - pub job_type: JobType, - pub payload: JobPayload, - #[serde(skip_serializing_if = "Option::is_none")] - pub webhook: Option, - /// Optional custom timeout in seconds (overrides default) - #[serde(skip_serializing_if = "Option::is_none")] - pub timeout_secs: Option, -} - -/// Response from submitting a job -#[derive(Debug, Serialize, ToSchema)] -pub struct SubmitJobResponse { - pub job_id: String, - pub status: JobStatus, - pub message: String, -} - -// ───────────────────────────────────────────────────────────────────────────── -// Tests -// ───────────────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_job_id_generation() { - let id1 = JobId::new(); - let id2 = JobId::new(); - assert_ne!(id1.0, id2.0); - } - - #[test] - fn test_job_id_from_str() { - let uuid_str = "550e8400-e29b-41d4-a716-446655440000"; - let job_id: JobId = uuid_str.parse().unwrap(); - assert_eq!(job_id.to_string(), uuid_str); - } - - #[test] - fn test_job_lifecycle() { - let mut job = Job::new( - JobId::new(), - JobType::Analyze, - JobPayload::Analyze { - contract_id: "test".to_string(), - function_name: "test".to_string(), - args: None, - ledger_overrides: None, - }, - None, - 300, - ); - - assert_eq!(job.status, JobStatus::Queued); - assert_eq!(job.progress.percent, 0); - - job.start(); - assert_eq!(job.status, JobStatus::Processing); - assert_eq!(job.progress.percent, 10); - assert!(job.started_at.is_some()); - - job.update_progress(50, "Halfway"); - assert_eq!(job.progress.percent, 50); - assert_eq!(job.progress.message, "Halfway"); - - let result = JobResult::Success { - resources: None, - simulation_result: None, - optimization: None, - comparison: None, - }; - job.complete(result); - assert_eq!(job.status, JobStatus::Completed); - assert_eq!(job.progress.percent, 100); - assert!(job.completed_at.is_some()); - } - - #[test] - fn test_job_cancel() { - let mut job = Job::new( - JobId::new(), - JobType::Analyze, - JobPayload::Analyze { - contract_id: "test".to_string(), - function_name: "test".to_string(), - args: None, - ledger_overrides: None, - }, - None, - 300, - ); - - job.cancel(); - assert_eq!(job.status, JobStatus::Cancelled); - assert!(job.completed_at.is_some()); - } - - #[test] - fn test_job_fail() { - let mut job = Job::new( - JobId::new(), - JobType::Analyze, - JobPayload::Analyze { - contract_id: "test".to_string(), - function_name: "test".to_string(), - args: None, - ledger_overrides: None, - }, - None, - 300, - ); - - job.fail("Something went wrong".to_string(), "TestError".to_string()); - assert_eq!(job.status, JobStatus::Failed); - assert_eq!(job.error_message, Some("Something went wrong".to_string())); - assert!(job.completed_at.is_some()); - } - - #[tokio::test] - async fn test_job_queue_submit() { - let config = JobQueueConfig::default(); - let (queue, _receiver) = JobQueue::new(config); - - let job_id = queue.submit( - JobType::Analyze, - JobPayload::Analyze { - contract_id: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), - function_name: "hello".to_string(), - args: Some(vec![]), - ledger_overrides: None, - }, - None, - ).await; - - let status = queue.get_status(&job_id); - assert!(status.is_some()); - assert_eq!(status.unwrap().status, JobStatus::Queued); - } - - #[tokio::test] - async fn test_job_queue_cancel() { - let config = JobQueueConfig::default(); - let (queue, _receiver) = JobQueue::new(config); - - let job_id = queue.submit( - JobType::Analyze, - JobPayload::Analyze { - contract_id: "test".to_string(), - function_name: "test".to_string(), - args: None, - ledger_overrides: None, - }, - None, - ).await; - - let cancelled = queue.cancel(&job_id); - assert!(cancelled.is_ok()); - assert_eq!(cancelled.unwrap().status, JobStatus::Cancelled); - - // Cannot cancel again - let result = queue.cancel(&job_id); - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_job_queue_not_found() { - let config = JobQueueConfig::default(); - let (queue, _receiver) = JobQueue::new(config); - - let fake_id = JobId::new(); - let result = queue.cancel(&fake_id); - assert!(matches!(result, Err(JobError::NotFound(_)))); + tracing::error!(job_id = %job_id, error = ?last_error, "Webhook failed"); } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 8581424..0b39ccc 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,7 +1,6 @@ pub mod comparison; pub mod errors; pub mod insights; -pub mod jobs; pub mod parser; pub mod rpc_provider; pub mod simulation; diff --git a/core/src/main.rs b/core/src/main.rs index dc8e394..d538998 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -63,21 +63,15 @@ struct AppConfig { /// Simulation timeout in seconds (default 30). #[serde(default = "default_simulation_timeout_secs")] simulation_timeout_secs: u64, + /// Database URL for job queue (PostgreSQL or SQLite) + #[serde(default = "default_database_url")] + database_url: String, /// Job timeout in seconds (default 300). #[serde(default = "default_job_timeout_secs")] job_timeout_secs: u64, - /// Job cleanup interval in seconds (default 3600). - #[serde(default = "default_job_cleanup_interval_secs")] - job_cleanup_interval_secs: u64, - /// Job retention time after completion in seconds (default 3600). - #[serde(default = "default_job_retention_secs")] - job_retention_secs: u64, - /// Webhook timeout in seconds (default 10). - #[serde(default = "default_webhook_timeout_secs")] - webhook_timeout_secs: u64, - /// Max webhook retry attempts (default 3). - #[serde(default = "default_webhook_max_retries")] - webhook_max_retries: u32, + /// Max concurrent jobs (default 10). + #[serde(default = "default_max_concurrent_jobs")] + max_concurrent_jobs: usize, } fn default_health_check_interval() -> u64 { @@ -88,26 +82,18 @@ fn default_simulation_timeout_secs() -> u64 { 30 } -fn default_job_timeout_secs() -> u64 { - 300 // 5 minutes -} - -fn default_job_cleanup_interval_secs() -> u64 { - 3600 // 1 hour +fn default_database_url() -> String { + "sqlite://soroscope.db".to_string() } -fn default_job_retention_secs() -> u64 { - 3600 // 1 hour +fn default_job_timeout_secs() -> u64 { + 300 } -fn default_webhook_timeout_secs() -> u64 { +fn default_max_concurrent_jobs() -> usize { 10 } -fn default_webhook_max_retries() -> u32 { - 3 -} - fn load_config() -> Result { dotenvy::dotenv().ok(); @@ -122,11 +108,9 @@ fn load_config() -> Result { .set_default("rpc_providers", "")? .set_default("health_check_interval_secs", 30)? .set_default("simulation_timeout_secs", 30)? + .set_default("database_url", "sqlite://soroscope.db")? .set_default("job_timeout_secs", 300)? - .set_default("job_cleanup_interval_secs", 3600)? - .set_default("job_retention_secs", 3600)? - .set_default("webhook_timeout_secs", 10)? - .set_default("webhook_max_retries", 3)? + .set_default("max_concurrent_jobs", 10)? .build()?; settings.try_deserialize() @@ -172,7 +156,7 @@ struct AppState { /// Simulation timeout for RPC requests simulation_timeout: std::time::Duration, /// Job queue for background task processing - job_queue: Arc, + job_queue: JobQueue, } #[derive(Debug, Deserialize, ToSchema)] @@ -575,117 +559,6 @@ async fn compare_handler( Ok(Json(CompareApiResponse { report })) } -// ── Job handlers ───────────────────────────────────────────────────────────── - -#[utoipa::path( - post, - path = "/jobs/submit", - request_body = SubmitJobRequest, - responses( - (status = 200, description = "Job submitted successfully", body = SubmitJobResponse), - (status = 400, description = "Invalid request"), - (status = 401, description = "Unauthorized") - ), - security( - ("jwt" = []) - ), - tag = "Jobs" -)] -async fn submit_job( - State(state): State>, - Json(payload): Json, -) -> Result, AppError> { - tracing::info!(job_type = ?payload.job_type, "Received job submission"); - - let job_id = state - .job_queue - .submit(payload.job_type, payload.payload, payload.webhook) - .await; - - tracing::info!(job_id = %job_id, "Job submitted successfully"); - - Ok(Json(SubmitJobResponse { - job_id: job_id.to_string(), - status: crate::jobs::JobStatus::Queued, - message: "Job submitted successfully".to_string(), - })) -} - -#[utoipa::path( - get, - path = "/jobs/{id}", - params( - ("id" = String, Path, description = "Job ID") - ), - responses( - (status = 200, description = "Job status retrieved", body = crate::jobs::Job), - (status = 404, description = "Job not found"), - (status = 401, description = "Unauthorized") - ), - security( - ("jwt" = []) - ), - tag = "Jobs" -)] -async fn get_job_status( - State(state): State>, - Path(job_id): Path, -) -> Result, AppError> { - let id: JobId = job_id.parse().map_err(|_| { - AppError::BadRequest("Invalid job ID format".to_string()) - })?; - - let job = state - .job_queue - .get_status(&id) - .ok_or_else(|| AppError::NotFound(format!("Job not found: {}", job_id)))?; - - Ok(Json(job)) -} - -#[utoipa::path( - post, - path = "/jobs/{id}/cancel", - params( - ("id" = String, Path, description = "Job ID") - ), - responses( - (status = 200, description = "Job cancelled", body = crate::jobs::Job), - (status = 400, description = "Cannot cancel job in current state"), - (status = 404, description = "Job not found"), - (status = 401, description = "Unauthorized") - ), - security( - ("jwt" = []) - ), - tag = "Jobs" -)] -async fn cancel_job( - State(state): State>, - Path(job_id): Path, -) -> Result, AppError> { - let id: JobId = job_id.parse().map_err(|_| { - AppError::BadRequest("Invalid job ID format".to_string()) - })?; - - let job = state - .job_queue - .cancel(&id) - .map_err(|e| match e { - crate::jobs::JobError::NotFound(_) => { - AppError::NotFound(format!("Job not found: {}", job_id)) - } - crate::jobs::JobError::CannotCancel(status) => { - AppError::BadRequest(format!("Cannot cancel job in status: {:?}", status)) - } - _ => AppError::Internal(e.to_string()), - })?; - - tracing::info!(job_id = %job_id, "Job cancelled"); - - Ok(Json(job)) -} - /// Write WASM bytes to a temporary file and return the path. fn write_temp_wasm(bytes: &[u8]) -> Result { use std::io::Write; @@ -705,17 +578,12 @@ fn write_temp_wasm(bytes: &[u8]) -> Result { #[openapi( paths( analyze, optimize_limits, compare_handler, - submit_job, get_job_status, cancel_job, auth::challenge_handler, auth::verify_handler ), components(schemas( AnalyzeRequest, ResourceReport, OptimizeLimitsRequest, OptimizeLimitsResponse, CompareApiResponse, RegressionReport, ResourceDelta, RegressionFlag, - SubmitJobRequest, SubmitJobResponse, - crate::jobs::Job, crate::jobs::JobId, crate::jobs::JobStatus, - crate::jobs::JobType, crate::jobs::JobPayload, crate::jobs::JobResult, - crate::jobs::JobProgress, crate::jobs::WebhookConfig, auth::ChallengeRequest, auth::ChallengeResponse, auth::VerifyRequest, auth::VerifyResponse, crate::simulation::OptimizationBuffer, @@ -723,7 +591,6 @@ fn write_temp_wasm(bytes: &[u8]) -> Result { )), tags( (name = "Analysis", description = "Soroban contract resource analysis endpoints"), - (name = "Jobs", description = "Background job queue for async analysis"), (name = "Auth", description = "SEP-10 wallet authentication") ), info( @@ -867,39 +734,6 @@ async fn main() { let simulation_timeout = std::time::Duration::from_secs(config.simulation_timeout_secs); tracing::info!(timeout_secs = config.simulation_timeout_secs, "Simulation timeout configured"); - // ── Job Queue setup ───────────────────────────────────────────────── - let job_queue_config = JobQueueConfig { - job_timeout_secs: config.job_timeout_secs, - cleanup_interval_secs: config.job_cleanup_interval_secs, - retention_secs: config.job_retention_secs, - webhook_timeout_secs: config.webhook_timeout_secs, - webhook_max_retries: config.webhook_max_retries, - }; - - let (job_queue, job_receiver) = JobQueue::new(job_queue_config.clone()); - let job_queue = Arc::new(job_queue); - - // Spawn job worker - let job_worker = JobWorker::new( - job_receiver, - job_queue.jobs_clone(), - SimulationEngine::with_registry_and_timeout(Arc::clone(®istry), simulation_timeout), - InsightsEngine::new(), - job_queue_config.clone(), - ); - let _worker_handle = tokio::spawn(async move { - job_worker.run().await; - }); - tracing::info!("Job worker started"); - - // Spawn cleanup task - let _cleanup_handle = job_queue.spawn_cleanup_task(); - tracing::info!( - interval_secs = job_queue_config.cleanup_interval_secs, - retention_secs = job_queue_config.retention_secs, - "Job cleanup task started" - ); - let app_state = Arc::new(AppState { engine: SimulationEngine::with_registry_and_timeout( Arc::clone(®istry), @@ -908,7 +742,6 @@ async fn main() { cache: SimulationCache::new(), insights_engine: InsightsEngine::new(), simulation_timeout, - job_queue, }); let cors = CorsLayer::new().allow_origin(Any); @@ -917,9 +750,6 @@ async fn main() { .route("/analyze", post(analyze)) .route("/analyze/optimize-limits", post(optimize_limits)) .route("/analyze/compare", post(compare_handler)) - .route("/jobs/submit", post(submit_job)) - .route("/jobs/:id", get(get_job_status)) - .route("/jobs/:id/cancel", post(cancel_job)) .route_layer(middleware::from_fn(auth::auth_middleware)); let app = Router::new()