Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:

env:
CARGO_TERM_COLOR: always
# Prevent utoipa-swagger-ui build script from hitting the network.
# The "vendored" Cargo feature bundles assets; this env var is a belt-and-
# suspenders guard in case any older path still attempts a fetch.
SWAGGER_UI_DOWNLOAD_URL: ""

jobs:
# ------------------------------------------------------------------
Expand All @@ -22,6 +26,15 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt
- name: Cache Cargo registry
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-fmt-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-fmt-
- name: Check Formatting
run: cargo fmt --all -- --check

Expand All @@ -38,6 +51,15 @@ jobs:
with:
components: clippy
target: wasm32-unknown-unknown
- name: Cache Cargo registry
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-clippy-
- name: Install System Dependencies
run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev
- name: Install Soroban CLI
Expand All @@ -57,12 +79,21 @@ jobs:
uses: dtolnay/rust-toolchain@stable
with:
target: wasm32-unknown-unknown
- name: Cache Cargo registry
uses: actions/cache@v3
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-test-
- name: Install System Dependencies
run: sudo apt-get update && sudo apt-get install -y pkg-config libdbus-1-dev libudev-dev
- name: Install Soroban CLI
run: cargo install soroban-cli
- name: Run Tests
run: cargo test
run: cargo test --workspace

# ------------------------------------------------------------------
# Job 4: Frontend (Lint & Typecheck)
Expand Down
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors", "trace"] }
utoipa = { version = "4", features = ["axum_extras"] }
utoipa-swagger-ui = { version = "7", features = ["axum"] }
utoipa-swagger-ui = { version = "7", features = ["axum", "vendored"] }
reqwest = { version = "0.12", features = ["json"] }
base64 = "0.22"
hex = "0.4"
Expand All @@ -31,4 +31,6 @@ jsonwebtoken = "9"
ed25519-dalek = "2"
sha2 = "0.10"
rand = "0.8"
dashmap = "5.5"
moka = { version = "0.12", features = ["future"] }
uuid = { version = "1.0", features = ["v4"] }
248 changes: 248 additions & 0 deletions core/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{timeout, Duration};
use utoipa::ToSchema;
use uuid::Uuid;

use crate::errors::AppError;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
Queued,
Processing,
Completed,
Failed,
Cancelled,
}

/// Internal record for a single job held in the queue.
///
/// Unlike the public `JobResponse`, this carries the full state: the result
/// payload, the error text, and the webhook callback URL.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetadata {
    // Unique job identifier (UUID v4 rendered as a string).
    pub id: String,
    // Current lifecycle state.
    pub status: JobStatus,
    // Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    // Time of the most recent state change, seconds since the Unix epoch.
    pub updated_at: u64,
    // Absolute expiry time (Unix seconds); `None` means the job never expires.
    pub expires_at: Option<u64>,
    // Webhook URL notified when the job reaches `Completed` or `Failed`.
    pub callback_url: Option<String>,
    // Result payload set by `update_job_status` on completion, if any.
    pub result: Option<serde_json::Value>,
    // Error description set on failure or cancellation, if any.
    pub error: Option<String>,
}

/// Request body for submitting a new job.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct SubmitJobRequest {
    // Webhook URL to POST to when the job completes or fails.
    pub callback_url: Option<String>,
    // Time-to-live in seconds, converted to an absolute expiry on submit.
    pub expires_in_seconds: Option<u64>,
    // Arbitrary job payload.
    // NOTE(review): `job_data` is accepted here but never stored on
    // `JobMetadata` by `submit_job` — confirm whether it should be persisted.
    pub job_data: serde_json::Value,
}

/// Public view of a job returned by the submit/status endpoints.
///
/// Deliberately excludes the result payload, error text, and callback URL
/// held on the internal `JobMetadata`.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct JobResponse {
    // Unique job identifier (UUID v4 string).
    pub job_id: String,
    // Current lifecycle state.
    pub status: JobStatus,
    // Creation time, seconds since the Unix epoch.
    pub created_at: u64,
    // Absolute expiry time (Unix seconds); `None` means no expiry.
    pub expires_at: Option<u64>,
}

/// In-memory job store backed by a concurrent map.
///
/// Cloning a `JobQueue` is cheap and every clone shares the same underlying
/// store, because the `DashMap` sits behind an `Arc`.
#[derive(Clone)]
pub struct JobQueue {
    // job id -> full job record.
    jobs: Arc<DashMap<String, JobMetadata>>,
}

impl JobQueue {
pub fn new() -> Self {
Self {
jobs: Arc::new(DashMap::new()),
}
}

pub fn submit_job(&self, request: SubmitJobRequest) -> Result<JobResponse, AppError> {
let job_id = Uuid::new_v4().to_string();

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
.as_secs();

let expires_at = request.expires_in_seconds.map(|secs| now + secs);

let job_metadata = JobMetadata {
id: job_id.clone(),
status: JobStatus::Queued,
created_at: now,
updated_at: now,
expires_at,
callback_url: request.callback_url,
result: None,
error: None,
};

self.jobs.insert(job_id.clone(), job_metadata);

Ok(JobResponse {
job_id,
status: JobStatus::Queued,
created_at: now,
expires_at,
})
}

pub async fn get_job(&self, job_id: &str) -> Result<JobResponse, AppError> {
let job = self
.jobs
.get(job_id)
.ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;

Ok(JobResponse {
job_id: job.id.clone(),
status: job.status.clone(),
created_at: job.created_at,
expires_at: job.expires_at,
})
}

pub fn update_job_status(
&self,
job_id: &str,
status: JobStatus,
result: Option<serde_json::Value>,
error: Option<String>,
) -> Result<(), AppError> {
if let Some(mut job) = self.jobs.get_mut(job_id) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
.as_secs();

job.status = status;
job.updated_at = now;
job.result = result;
job.error = error;

// If job is completed, trigger webhook if callback URL exists
if matches!(job.status, JobStatus::Completed | JobStatus::Failed) {
if let Some(ref callback_url) = job.callback_url.clone() {
// Clone into fully-owned values so the async task is 'static
let owned_url: String = callback_url.clone();
let job_snapshot: JobMetadata = job.clone();
tokio::spawn(async move {
Self::trigger_webhook(owned_url, job_snapshot).await;
});
}
}
} else {
return Err(AppError::NotFound(format!(
"Job with ID {} not found",
job_id
)));
}

Ok(())
}

pub fn cancel_job(&self, job_id: &str) -> Result<(), AppError> {
self.update_job_status(
job_id,
JobStatus::Cancelled,
None,
Some("Job cancelled".to_string()),
)
}

pub fn cleanup_expired_jobs(&self) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();

self.jobs.retain(|_, job| {
// Keep job if it doesn't expire or hasn't expired yet
job.expires_at.map_or(true, |exp| exp > now)
});
}

pub async fn trigger_webhook(callback_url: String, job: JobMetadata) {
let client = reqwest::Client::new();
let payload = serde_json::json!({
"job_id": job.id,
"status": format!("{:?}", job.status),
"result": job.result,
"error": job.error,
"completed_at": job.updated_at
});

match timeout(
Duration::from_secs(30),
client.post(&callback_url).json(&payload).send(),
)
.await
{
Ok(Ok(response)) => {
if response.status().is_success() {
tracing::info!(
"Webhook sent successfully to {} for job {}",
callback_url,
job.id
);
} else {
tracing::warn!(
"Webhook failed with status {} for job {} at {}",
response.status(),
job.id,
callback_url
);
}
}
Ok(Err(e)) => {
tracing::error!("Failed to send webhook for job {}: {}", job.id, e);
}
Err(_) => {
tracing::error!(
"Webhook request timed out for job {} at {}",
job.id,
callback_url
);
}
}
}

pub fn get_job_result(&self, job_id: &str) -> Result<Option<serde_json::Value>, AppError> {
let job = self
.jobs
.get(job_id)
.ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;

Ok(job.result.clone())
}

pub fn is_job_expired(&self, job_id: &str) -> Result<bool, AppError> {
let job = self
.jobs
.get(job_id)
.ok_or_else(|| AppError::NotFound(format!("Job with ID {} not found", job_id)))?;

match job.expires_at {
Some(expiry) => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| AppError::Internal(format!("Time error: {}", e)))?
.as_secs();

Ok(now > expiry)
}
None => Ok(false),
}
}
}

// Background cleanup task to remove expired jobs
/// Spawns a detached background task that purges expired jobs every
/// `interval_seconds` seconds, forever.
///
/// Returns as soon as the task is spawned; the `JoinHandle` is intentionally
/// dropped, detaching the loop. `job_queue` is an owned parameter, so it is
/// moved straight into the task — the original extra `.clone()` was redundant.
pub async fn start_cleanup_task(job_queue: JobQueue, interval_seconds: u64) {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(interval_seconds)).await;
            job_queue.cleanup_expired_jobs();
            tracing::debug!("Expired jobs cleanup completed");
        }
    });
}
Loading
Loading