Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions src/services/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Knex } from 'knex'
import { ParsedEvent, ProcessorConfig, VaultEventPayload, MilestoneEventPayload, ValidationEventPayload } from '../types/horizonSync.js'
import { retryWithBackoff, DEFAULT_RETRY_CONFIG } from '../utils/retry.js'
import { createAuditLog } from '../lib/audit-logs.js'
import { IdempotencyService } from './idempotency.js'

/**
* Result of processing an event
Expand All @@ -20,10 +21,12 @@ export interface ProcessingResult {
export class EventProcessor {
private db: Knex
private config: ProcessorConfig
private idempotency: IdempotencyService

/**
 * @param db - Shared Knex connection used for all event persistence.
 * @param config - Processor tuning knobs (retries, backoff).
 */
constructor(db: Knex, config: ProcessorConfig) {
  this.config = config
  this.db = db
  // The idempotency service wraps the same connection so its checks can join
  // the processor's transactions.
  this.idempotency = new IdempotencyService(db)
}

/**
Expand Down Expand Up @@ -113,27 +116,18 @@ export class EventProcessor {

try {
// Check idempotency - if event already processed, return success
const existing = await trx('processed_events')
.where({ event_id: event.eventId })
.first()
const alreadyProcessed = await this.idempotency.isEventProcessed(event.eventId, trx)

if (existing) {
if (alreadyProcessed) {
await trx.commit()
return // Already processed
}

// Route to appropriate handler based on event type
await this.routeEvent(event, trx)

// Store event_id in processed_events table
await trx('processed_events').insert({
event_id: event.eventId,
transaction_hash: event.transactionHash,
event_index: event.eventIndex,
ledger_number: event.ledgerNumber,
processed_at: new Date(),
created_at: new Date()
})
// Store event status for idempotency
await this.idempotency.markEventProcessed(event, trx)

// Commit transaction
await trx.commit()
Expand Down
142 changes: 65 additions & 77 deletions src/services/idempotency.ts
Original file line number Diff line number Diff line change
@@ -1,90 +1,78 @@
import { createHash } from 'node:crypto'
import type { PoolClient } from 'pg'
import { getPgPool } from '../db/pool.js'
import { Knex } from 'knex'
import { ParsedEvent } from '../types/horizonSync.js'

interface IdempotencyRecord<T> {
key: string
requestHash: string
response: T
}

const memoryStore = new Map<string, IdempotencyRecord<unknown>>()
/**
* Idempotency Service
* Handles checking and recording of processed operations to ensure exactly-once execution.
*/
export class IdempotencyService {
private db: Knex

export class IdempotencyConflictError extends Error {
constructor() {
super('Idempotency key already exists with a different request payload.')
this.name = 'IdempotencyConflictError'
constructor(db: Knex) {
this.db = db
}
}

/**
 * Fingerprint a request payload for idempotency-key comparison.
 *
 * NOTE(review): JSON.stringify is key-order sensitive, so logically equal
 * objects built with different key insertion order hash differently — confirm
 * callers construct payloads deterministically.
 *
 * @param payload - Arbitrary JSON-serialisable request body
 * @returns hex-encoded sha256 of the serialised payload
 */
export const hashRequestPayload = (payload: unknown): string => {
  const digest = createHash('sha256')
  digest.update(JSON.stringify(payload))
  return digest.digest('hex')
}

export const getIdempotentResponse = async <T>(
key: string,
requestHash: string,
): Promise<T | null> => {
const pool = getPgPool()
if (pool) {
const result = await pool.query<{
request_hash: string
response: T
}>('SELECT request_hash, response FROM idempotency_keys WHERE key = $1', [key])

if (result.rowCount === 0) {
return null
}

const row = result.rows[0]
if (row.request_hash !== requestHash) {
throw new IdempotencyConflictError()
}

return row.response
/**
 * Check whether an event id is already recorded in processed_events.
 *
 * @param eventId - Unique ID of the event
 * @param trx - Optional transaction; falls back to the pooled connection
 * @returns true when a matching row exists
 */
async isEventProcessed(eventId: string, trx?: Knex.Transaction): Promise<boolean> {
  const runner = trx ?? this.db
  const row = await runner('processed_events')
    .where({ event_id: eventId })
    .first()
  // .first() resolves to undefined when no row matched.
  return row != null
}

const record = memoryStore.get(key)
if (!record) {
return null
/**
* Mark an event as processed in the database.
* MUST be called within a transaction that includes the business logic operations.
*
* @param event - The parsed event being processed
* @param trx - Transaction to use for recording
*/
async markEventProcessed(event: ParsedEvent, trx: Knex.Transaction): Promise<void> {
await trx('processed_events').insert({
event_id: event.eventId,
transaction_hash: event.transactionHash,
event_index: event.eventIndex,
ledger_number: event.ledgerNumber,
processed_at: new Date(),
created_at: new Date()
})
}

if (record.requestHash !== requestHash) {
throw new IdempotencyConflictError()
/**
* General-purpose idempotency check for API requests.
* Checks the idempotency_keys table.
*
* @param key - The idempotency key provided by the client
* @returns Promise<any | null> - The stored response if found, null otherwise
*/
async getStoredResponse(key: string): Promise<any | null> {
const record = await this.db('idempotency_keys')
.where({ key })
.first()

return record ? record.response : null
}

return record.response as T
}

export const saveIdempotentResponse = async <T>(
key: string,
requestHash: string,
vaultId: string,
response: T,
client?: PoolClient,
): Promise<void> => {
if (client) {
await client.query(
'INSERT INTO idempotency_keys (key, request_hash, vault_id, response) VALUES ($1, $2, $3, $4::jsonb)',
[key, requestHash, vaultId, JSON.stringify(response)],
)
return
}

const pool = getPgPool()
if (pool) {
await pool.query('INSERT INTO idempotency_keys (key, request_hash, vault_id, response) VALUES ($1, $2, $3, $4::jsonb)', [
/**
* Store a response for a given idempotency key.
*
* @param key - The idempotency key
* @param response - The response payload to store
* @param trx - Optional transaction
*/
async storeResponse(key: string, response: any, trx?: Knex.Transaction): Promise<void> {
await (trx || this.db)('idempotency_keys').insert({
key,
requestHash,
vaultId,
JSON.stringify(response),
])
return
response: typeof response === 'string' ? response : JSON.stringify(response),
created_at: new Date()
})
}

memoryStore.set(key, { key, requestHash, response })
}

/**
 * Test helper for the in-memory fallback store: forget every cached record.
 */
export function resetIdempotencyStore(): void {
  memoryStore.clear()
}
144 changes: 144 additions & 0 deletions src/tests/eventIdempotency.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { describe, it, expect, beforeAll, afterAll, beforeEach } from '@jest/globals'
import knex, { Knex } from 'knex'
import { EventProcessor } from '../services/eventProcessor.js'
import { ParsedEvent } from '../types/horizonSync.js'

describe('Event Processor Idempotency', () => {
  let db: Knex
  let processor: EventProcessor

  /** Count rows in `table` matching `where`. */
  const countRows = async (table: string, where: Record<string, unknown>): Promise<number> => {
    const row = await db(table).where(where).count('* as count').first()
    return Number(row?.count)
  }

  beforeAll(async () => {
    // Integration tests need a live Postgres; CI supplies DATABASE_URL.
    db = knex({
      client: 'pg',
      connection: process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/disciplr_test'
    })

    // Bring the schema up to date before anything touches the tables.
    await db.migrate.latest()

    processor = new EventProcessor(db, {
      maxRetries: 3,
      retryBackoffMs: 100
    })
  })

  afterAll(async () => {
    await db.destroy()
  })

  beforeEach(async () => {
    // Children first so foreign keys do not block the deletes.
    for (const table of ['validations', 'milestones', 'vaults', 'processed_events', 'failed_events']) {
      await db(table).del()
    }
  })

  it('should process a vault_created event and ignore duplicates', async () => {
    const vaultEvent: ParsedEvent = {
      eventId: 'tx1:op0',
      transactionHash: 'tx1',
      eventIndex: 0,
      ledgerNumber: 100,
      eventType: 'vault_created',
      payload: {
        vaultId: 'vault-unique-1',
        creator: 'GCREATOR',
        amount: '100',
        startTimestamp: new Date(),
        endTimestamp: new Date(Date.now() + 100000),
        successDestination: 'GSUCCESS',
        failureDestination: 'GFAIL',
        status: 'active'
      }
    }

    // First delivery creates the vault.
    const firstRun = await processor.processEvent(vaultEvent)
    expect(firstRun.success).toBe(true)
    expect(await countRows('vaults', { id: 'vault-unique-1' })).toBe(1)

    // Redelivery must be a successful no-op.
    const secondRun = await processor.processEvent(vaultEvent)
    expect(secondRun.success).toBe(true)
    expect(await countRows('vaults', { id: 'vault-unique-1' })).toBe(1)

    // Exactly one idempotency marker should exist.
    expect(await countRows('processed_events', { event_id: 'tx1:op0' })).toBe(1)
  })

  it('should maintain idempotency for milestone creation', async () => {
    // The milestone handler needs its parent vault in place.
    await db('vaults').insert({
      id: 'vault-m',
      creator: 'GCREATOR',
      amount: '100',
      start_timestamp: new Date(),
      end_timestamp: new Date(Date.now() + 100000),
      success_destination: 'GSUCCESS',
      failure_destination: 'GFAIL',
      status: 'active',
      created_at: new Date()
    })

    const milestoneEvent: ParsedEvent = {
      eventId: 'tx2:op1',
      transactionHash: 'tx2',
      eventIndex: 1,
      ledgerNumber: 101,
      eventType: 'milestone_created',
      payload: {
        milestoneId: 'ms-unique-1',
        vaultId: 'vault-m',
        title: 'Milestone 1',
        description: 'First milestone',
        targetAmount: '50',
        deadline: new Date()
      }
    }

    await processor.processEvent(milestoneEvent)
    await processor.processEvent(milestoneEvent) // redelivered

    expect(await countRows('milestones', { id: 'ms-unique-1' })).toBe(1)
  })

  it('should handle concurrent processing attempts gracefully', async () => {
    const racedEvent: ParsedEvent = {
      eventId: 'tx3:op0',
      transactionHash: 'tx3',
      eventIndex: 0,
      ledgerNumber: 102,
      eventType: 'vault_created',
      payload: {
        vaultId: 'vault-concurrent',
        creator: 'GCREATOR',
        amount: '100',
        startTimestamp: new Date(),
        endTimestamp: new Date(Date.now() + 100000),
        successDestination: 'GSUCCESS',
        failureDestination: 'GFAIL',
        status: 'active'
      }
    }

    // Two deliveries race each other in separate transactions.
    // NOTE(review): both succeeding requires processEvent to treat a
    // unique-violation on processed_events as "already handled" — confirm it does.
    const [firstOutcome, secondOutcome] = await Promise.all([
      processor.processEvent(racedEvent),
      processor.processEvent(racedEvent)
    ])

    expect(firstOutcome.success).toBe(true)
    expect(secondOutcome.success).toBe(true)

    expect(await countRows('vaults', { id: 'vault-concurrent' })).toBe(1)
  })
})
Loading