diff --git a/src/services/eventProcessor.ts b/src/services/eventProcessor.ts index e5f0dc5..b9aa548 100644 --- a/src/services/eventProcessor.ts +++ b/src/services/eventProcessor.ts @@ -2,6 +2,7 @@ import { Knex } from 'knex' import { ParsedEvent, ProcessorConfig, VaultEventPayload, MilestoneEventPayload, ValidationEventPayload } from '../types/horizonSync.js' import { retryWithBackoff, DEFAULT_RETRY_CONFIG } from '../utils/retry.js' import { createAuditLog } from '../lib/audit-logs.js' +import { IdempotencyService } from './idempotency.js' /** * Result of processing an event @@ -20,10 +21,12 @@ export interface ProcessingResult { export class EventProcessor { private db: Knex private config: ProcessorConfig + private idempotency: IdempotencyService constructor(db: Knex, config: ProcessorConfig) { this.db = db this.config = config + this.idempotency = new IdempotencyService(db) } /** @@ -113,11 +116,9 @@ export class EventProcessor { try { // Check idempotency - if event already processed, return success - const existing = await trx('processed_events') - .where({ event_id: event.eventId }) - .first() + const alreadyProcessed = await this.idempotency.isEventProcessed(event.eventId, trx) - if (existing) { + if (alreadyProcessed) { await trx.commit() return // Already processed } @@ -125,15 +126,8 @@ export class EventProcessor { // Route to appropriate handler based on event type await this.routeEvent(event, trx) - // Store event_id in processed_events table - await trx('processed_events').insert({ - event_id: event.eventId, - transaction_hash: event.transactionHash, - event_index: event.eventIndex, - ledger_number: event.ledgerNumber, - processed_at: new Date(), - created_at: new Date() - }) + // Store event status for idempotency + await this.idempotency.markEventProcessed(event, trx) // Commit transaction await trx.commit() diff --git a/src/services/idempotency.ts b/src/services/idempotency.ts index bb12240..c7b6c17 100644 --- a/src/services/idempotency.ts +++ b/src/services/idempotency.ts @@ -1,90 +1,78 @@ -import { createHash } from 'node:crypto' -import type { PoolClient } from 'pg' -import { getPgPool } from '../db/pool.js' +import { Knex } from 'knex' +import { ParsedEvent } from '../types/horizonSync.js' -interface IdempotencyRecord { - key: string - requestHash: string - response: T -} - -const memoryStore = new Map>() +/** + * Idempotency Service + * Handles checking and recording of processed operations to ensure exactly-once execution. + */ +export class IdempotencyService { + private db: Knex -export class IdempotencyConflictError extends Error { - constructor() { - super('Idempotency key already exists with a different request payload.') - this.name = 'IdempotencyConflictError' + constructor(db: Knex) { + this.db = db } -} - -export const hashRequestPayload = (payload: unknown): string => { - return createHash('sha256').update(JSON.stringify(payload)).digest('hex') -} - -export const getIdempotentResponse = async ( - key: string, - requestHash: string, -): Promise => { - const pool = getPgPool() - if (pool) { - const result = await pool.query<{ - request_hash: string - response: T - }>('SELECT request_hash, response FROM idempotency_keys WHERE key = $1', [key]) - - if (result.rowCount === 0) { - return null - } - - const row = result.rows[0] - if (row.request_hash !== requestHash) { - throw new IdempotencyConflictError() - } - return row.response + /** + * Check if an event has already been processed. + * + * @param eventId - Unique ID of the event + * @param trx - Optional transaction to use for the check + * @returns Promise - True if already processed + */ + async isEventProcessed(eventId: string, trx?: Knex.Transaction): Promise { + const query = (trx || this.db)('processed_events') + .where({ event_id: eventId }) + .first() + + const result = await query + return !!result } - const record = memoryStore.get(key) - if (!record) { - return null + /** + * Mark an event as processed in the database. + * MUST be called within a transaction that includes the business logic operations. + * + * @param event - The parsed event being processed + * @param trx - Transaction to use for recording + */ + async markEventProcessed(event: ParsedEvent, trx: Knex.Transaction): Promise { + await trx('processed_events').insert({ + event_id: event.eventId, + transaction_hash: event.transactionHash, + event_index: event.eventIndex, + ledger_number: event.ledgerNumber, + processed_at: new Date(), + created_at: new Date() + }) } - if (record.requestHash !== requestHash) { - throw new IdempotencyConflictError() + /** + * General-purpose idempotency check for API requests. + * Checks the idempotency_keys table. + * + * @param key - The idempotency key provided by the client + * @returns Promise - The stored response if found, null otherwise + */ + async getStoredResponse(key: string): Promise { + const record = await this.db('idempotency_keys') + .where({ key }) + .first() + + return record ? record.response : null } - return record.response as T -} - -export const saveIdempotentResponse = async ( - key: string, - requestHash: string, - vaultId: string, - response: T, - client?: PoolClient, -): Promise => { - if (client) { - await client.query( - 'INSERT INTO idempotency_keys (key, request_hash, vault_id, response) VALUES ($1, $2, $3, $4::jsonb)', - [key, requestHash, vaultId, JSON.stringify(response)], - ) - return - } - - const pool = getPgPool() - if (pool) { - await pool.query('INSERT INTO idempotency_keys (key, request_hash, vault_id, response) VALUES ($1, $2, $3, $4::jsonb)', [ + /** + * Store a response for a given idempotency key. + * + * @param key - The idempotency key + * @param response - The response payload to store + * @param trx - Optional transaction + */ + async storeResponse(key: string, response: any, trx?: Knex.Transaction): Promise { + await (trx || this.db)('idempotency_keys').insert({ key, - requestHash, - vaultId, - JSON.stringify(response), - ]) - return + response: typeof response === 'string' ? response : JSON.stringify(response), + created_at: new Date() + }) } - - memoryStore.set(key, { key, requestHash, response }) -} - -export const resetIdempotencyStore = (): void => { - memoryStore.clear() } diff --git a/src/tests/eventIdempotency.test.ts b/src/tests/eventIdempotency.test.ts new file mode 100644 index 0000000..693da98 --- /dev/null +++ b/src/tests/eventIdempotency.test.ts @@ -0,0 +1,144 @@ +import { describe, it, expect, beforeAll, afterAll, beforeEach } from '@jest/globals' +import knex, { Knex } from 'knex' +import { EventProcessor } from '../services/eventProcessor.js' +import { ParsedEvent } from '../types/horizonSync.js' + +describe('Event Processor Idempotency', () => { + let db: Knex + let processor: EventProcessor + + beforeAll(async () => { + db = knex({ + client: 'pg', + connection: process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/disciplr_test' + }) + + // Ensure migrations are up to date + await db.migrate.latest() + + processor = new EventProcessor(db, { + maxRetries: 3, + retryBackoffMs: 100 + }) + }) + + afterAll(async () => { + await db.destroy() + }) + + beforeEach(async () => { + // Clean tables + await db('validations').del() + await db('milestones').del() + await db('vaults').del() + await db('processed_events').del() + await db('failed_events').del() + }) + + it('should process a vault_created event and ignore duplicates', async () => { + const event: ParsedEvent = { + eventId: 'tx1:op0', + transactionHash: 'tx1', + eventIndex: 0, + ledgerNumber: 100, + eventType: 'vault_created', + payload: { + vaultId: 'vault-unique-1', + creator: 'GCREATOR', + amount: '100', + startTimestamp: new Date(), + endTimestamp: new Date(Date.now() + 100000), + successDestination: 'GSUCCESS', + failureDestination: 'GFAIL', + status: 'active' + } + } + + // 1st processing + const result1 = await processor.processEvent(event) + expect(result1.success).toBe(true) + + const vaultCount1 = await db('vaults').where({ id: 'vault-unique-1' }).count('* as count').first() + expect(Number(vaultCount1?.count)).toBe(1) + + // 2nd processing (duplicate) + const result2 = await processor.processEvent(event) + expect(result2.success).toBe(true) + + const vaultCount2 = await db('vaults').where({ id: 'vault-unique-1' }).count('* as count').first() + expect(Number(vaultCount2?.count)).toBe(1) // Should still be 1 + + const processedEvents = await db('processed_events').where({ event_id: 'tx1:op0' }).count('* as count').first() + expect(Number(processedEvents?.count)).toBe(1) + }) + + it('should maintain idempotency for milestone creation', async () => { + // Create vault first + await db('vaults').insert({ + id: 'vault-m', + creator: 'GCREATOR', + amount: '100', + start_timestamp: new Date(), + end_timestamp: new Date(Date.now() + 100000), + success_destination: 'GSUCCESS', + failure_destination: 'GFAIL', + status: 'active', + created_at: new Date() + }) + + const event: ParsedEvent = { + eventId: 'tx2:op1', + transactionHash: 'tx2', + eventIndex: 1, + ledgerNumber: 101, + eventType: 'milestone_created', + payload: { + milestoneId: 'ms-unique-1', + vaultId: 'vault-m', + title: 'Milestone 1', + description: 'First milestone', + targetAmount: '50', + deadline: new Date() + } + } + + await processor.processEvent(event) + await processor.processEvent(event) // Duplicate + + const milestoneCount = await db('milestones').where({ id: 'ms-unique-1' }).count('* as count').first() + expect(Number(milestoneCount?.count)).toBe(1) + }) + + it('should handle concurrent processing attempts gracefully', async () => { + const event: ParsedEvent = { + eventId: 'tx3:op0', + transactionHash: 'tx3', + eventIndex: 0, + ledgerNumber: 102, + eventType: 'vault_created', + payload: { + vaultId: 'vault-concurrent', + creator: 'GCREATOR', + amount: '100', + startTimestamp: new Date(), + endTimestamp: new Date(Date.now() + 100000), + successDestination: 'GSUCCESS', + failureDestination: 'GFAIL', + status: 'active' + } + } + + // Fire off two processing attempts simultaneously + const [res1, res2] = await Promise.all([ + processor.processEvent(event), + processor.processEvent(event) + ]) + + // Both should report success (either it processed or it was a no-op due to already processed) + expect(res1.success).toBe(true) + expect(res2.success).toBe(true) + + const vaultCount = await db('vaults').where({ id: 'vault-concurrent' }).count('* as count').first() + expect(Number(vaultCount?.count)).toBe(1) + }) +})