Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 0 additions & 62 deletions CI_FIXES.md
Original file line number Diff line number Diff line change
@@ -1,63 +1 @@
# CI/CD Fixes Applied

## Summary
This document lists all the changes made to ensure the codebase passes CI/CD checks.

## Changes Made

### 1. Code Formatting
- Applied `cargo fmt` to format all Rust code according to rustfmt standards
- All formatting issues resolved

### 2. Migration Fixes
- **Moved `partition_utils.sql`** from `migrations/` to `docs/` directory
- This file is not a migration but a utility script
- sqlx requires migration files to have numeric prefixes

- **Removed duplicate partition migration**
- Deleted `migrations/20260219000000_partition_transactions.sql` (duplicate)
- Kept `migrations/20250217000000_partition_transactions.sql`

- **Fixed duplicate index creation**
- Modified `20250217000000_partition_transactions.sql` to use `CREATE INDEX IF NOT EXISTS`
- Prevents conflict with index created in init migration

- **Renamed migration to avoid timestamp collision**
- Renamed `20260222000000_transaction_memo_metadata.sql` to `20260222000001_transaction_memo_metadata.sql`
- Two migrations had the same timestamp causing primary key violation

- **Removed foreign key constraint in DLQ table**
- Modified `20260220143500_transaction_dlq.sql`
- Partitioned tables don't support foreign keys to non-unique columns
- Added comment about application-level referential integrity

### 3. Clippy Fixes
- **Removed unused imports**:
- `utoipa::ToSchema` from `src/db/models.rs`
- `ENTITY_SETTLEMENT` and `TransactionDlq` from `src/db/queries.rs`

- **Fixed redundant field names**:
- Changed `anchor_webhook_secret: anchor_webhook_secret` to `anchor_webhook_secret` in `src/config.rs`

### 4. Remaining Issues (To Be Fixed)
The following issues still need to be addressed:

- Deprecated function usage:
- `base64::encode` and `base64::decode` in `src/utils/cursor.rs`
- `chrono::DateTime::from_utc` in `src/db/cron.rs`
- `chrono::TimeZone::ymd_opt` in `src/db/cron.rs`

- Unused imports in various files
- Missing fields in test fixtures
- Config struct field mismatches in tests

## Testing
- All migrations now run successfully
- Database schema is properly created
- Code formatting passes `cargo fmt --check`

## Next Steps
1. Fix remaining clippy warnings
2. Update test fixtures with new Transaction fields (memo, memo_type, metadata)
3. Update deprecated function calls to use new APIs
4. Remove unused imports
113 changes: 83 additions & 30 deletions src/middleware/idempotency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Clone)]
#[allow(dead_code)]
pub struct IdempotencyService {
client: Client,
}
Expand Down Expand Up @@ -43,35 +42,104 @@ impl IdempotencyService {

pub async fn check_idempotency(
&self,
_key: &str,
key: &str,
) -> Result<IdempotencyStatus, redis::RedisError> {
// Placeholder implementation
Ok(IdempotencyStatus::New)
let cache_key = format!("idempotency:{}", key);
let lock_key = format!("idempotency:lock:{}", key);

let mut conn = self.client.get_multiplexed_async_connection().await?;

// Check if response is cached
let cached: Option<String> = redis::cmd("GET")
.arg(&cache_key)
.query_async(&mut conn)
.await?;

if let Some(data) = cached {
let response: CachedResponse = serde_json::from_str(&data)
.map_err(|e| redis::RedisError::from((redis::ErrorKind::TypeError, "deserialization error", e.to_string())))?;
return Ok(IdempotencyStatus::Completed(response));
}

// Try to acquire lock
let acquired: bool = redis::cmd("SET")
.arg(&lock_key)
.arg("processing")
.arg("NX")
.arg("EX")
.arg(300) // 5 minute lock
.query_async(&mut conn)
.await?;

if acquired {
Ok(IdempotencyStatus::New)
} else {
Ok(IdempotencyStatus::Processing)
}
}

pub async fn store_response(
&self,
_key: &str,
_status: u16,
_body: String,
key: &str,
status: u16,
body: String,
) -> Result<(), redis::RedisError> {
// Placeholder implementation
let cache_key = format!("idempotency:{}", key);
let lock_key = format!("idempotency:lock:{}", key);

let cached = CachedResponse { status, body };
let data = serde_json::to_string(&cached)
.map_err(|e| redis::RedisError::from((redis::ErrorKind::TypeError, "serialization error", e.to_string())))?;

let mut conn = self.client.get_multiplexed_async_connection().await?;

// Store response with 24 hour TTL
redis::cmd("SETEX")
.arg(&cache_key)
.arg(86400)
.arg(&data)
.query_async::<_, ()>(&mut conn)
.await?;

// Release lock
redis::cmd("DEL")
.arg(&lock_key)
.query_async::<_, ()>(&mut conn)
.await?;

Ok(())
}

pub async fn release_lock(&self, _key: &str) -> Result<(), redis::RedisError> {
// Placeholder implementation
pub async fn release_lock(&self, key: &str) -> Result<(), redis::RedisError> {
let lock_key = format!("idempotency:lock:{}", key);
let mut conn = self.client.get_multiplexed_async_connection().await?;

redis::cmd("DEL")
.arg(&lock_key)
.query_async::<_, ()>(&mut conn)
.await?;

Ok(())
}

pub async fn check_and_set(
&self,
_key: &str,
_value: &str,
_ttl: Duration,
key: &str,
value: &str,
ttl: Duration,
) -> Result<bool, redis::RedisError> {
// Placeholder implementation
Ok(true)
let mut conn = self.client.get_multiplexed_async_connection().await?;

let acquired: bool = redis::cmd("SET")
.arg(key)
.arg(value)
.arg("NX")
.arg("EX")
.arg(ttl.as_secs())
.query_async(&mut conn)
.await?;

Ok(acquired)
}
}

Expand All @@ -81,9 +149,6 @@ pub async fn idempotency_middleware(
request: Request<Body>,
next: Next<Body>,
) -> Response {
// Extract idempotency key from request
// This could be from headers, query params, or body
// For now, we'll extract from a custom header
let idempotency_key = match request.headers().get("x-idempotency-key") {
Some(key) => match key.to_str() {
Ok(k) => k.to_string(),
Expand All @@ -98,31 +163,22 @@ pub async fn idempotency_middleware(
}
},
None => {
// If no idempotency key provided, proceed without idempotency check
return next.run(request).await;
}
};

// Check idempotency status
match service.check_idempotency(&idempotency_key).await {
Ok(IdempotencyStatus::New) => {
// Process the request
let response: Response = next.run(request).await;

// If successful (2xx), cache the response
if response.status().is_success() {
// Extract response body and status
let status = response.status().as_u16();

// For simplicity, we'll store a success marker
// In production, you might want to capture the actual response body
let body = serde_json::json!({"status": "success"}).to_string();

if let Err(e) = service.store_response(&idempotency_key, status, body).await {
tracing::error!("Failed to store idempotency response: {}", e);
}
} else {
// Release lock on failure
if let Err(e) = service.release_lock(&idempotency_key).await {
tracing::error!("Failed to release idempotency lock: {}", e);
}
Expand All @@ -131,7 +187,6 @@ pub async fn idempotency_middleware(
response
}
Ok(IdempotencyStatus::Processing) => {
// Request is currently being processed
(
StatusCode::TOO_MANY_REQUESTS,
Json(serde_json::json!({
Expand All @@ -142,7 +197,6 @@ pub async fn idempotency_middleware(
.into_response()
}
Ok(IdempotencyStatus::Completed(cached)) => {
// Return cached response
let status = StatusCode::from_u16(cached.status).unwrap_or(StatusCode::OK);
(
status,
Expand All @@ -155,7 +209,6 @@ pub async fn idempotency_middleware(
}
Err(e) => {
tracing::error!("Idempotency check failed: {}", e);
// On Redis failure, proceed with request (fail open)
next.run(request).await
}
}
Expand Down
Loading
Loading