diff --git a/keeper/QUICKSTART.md b/keeper/QUICKSTART.md index a4ae62f..cd14d4f 100644 --- a/keeper/QUICKSTART.md +++ b/keeper/QUICKSTART.md @@ -38,6 +38,8 @@ POLLING_INTERVAL_MS=10000 MAX_CONCURRENT_READS=10 MAX_CONCURRENT_EXECUTIONS=3 MAX_TASK_ID=100 +LOG_LEVEL=info +# LOG_FORMAT=pretty ``` ### Getting Your Keeper Secret @@ -116,18 +118,11 @@ The `SOROBAN_RPC_URL` is unreachable. Check your `.env` and network. npm start ``` -You should see output like: +You should see JSON logs like: ``` -Starting SoroTask Keeper... -Connected to Soroban RPC: https://rpc-futurenet.stellar.org -Keeper account: GXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX -Poller initialized with max concurrent reads: 10 -Starting polling loop with interval: 10000ms -[Keeper] Running initial poll... -[Keeper] Initial check: 100 tasks in registry -[Poller] Current ledger sequence: 12345 -[Poller] Poll complete in 234ms | Checked: 5 | Due: 2 | Skipped: 1 | Errors: 0 +{"level":"info","timestamp":"2026-01-01T12:00:00.000Z","service":"keeper","module":"keeper","message":"Starting SoroTask Keeper"} +{"level":"info","timestamp":"2026-01-01T12:00:01.000Z","service":"keeper","module":"poller","message":"Task is due","taskId":1} ``` ## Step 5: Monitor Execution diff --git a/keeper/README.md b/keeper/README.md index f95e7a9..75f74d5 100644 --- a/keeper/README.md +++ b/keeper/README.md @@ -43,6 +43,11 @@ MAX_TASK_ID=100 # Wait for transaction confirmation (default: true, set to 'false' to disable) WAIT_FOR_CONFIRMATION=true + +# Structured logging +LOG_LEVEL=info +# Optional: pretty console output for local development only +# LOG_FORMAT=pretty ``` ### Explanation of Variables: @@ -56,6 +61,8 @@ WAIT_FOR_CONFIRMATION=true - **`MAX_TASK_ID`**: The keeper will check task IDs from 1 to this value. Alternatively, use `TASK_IDS` to specify exact task IDs. - **`TASK_IDS`**: Optional comma-separated list of specific task IDs to monitor (e.g., "1,2,3,5"). If set, overrides `MAX_TASK_ID`. 
- **`WAIT_FOR_CONFIRMATION`**: Whether to wait for transaction confirmation after submitting. Set to 'false' for fire-and-forget mode. +- **`LOG_LEVEL`**: Minimum log severity to emit (`trace`, `debug`, `info`, `warn`, `error`, `fatal`). +- **`LOG_FORMAT`**: Optional log renderer. Leave unset for JSON logs; set to `pretty` for local human-readable output. ## Setup Instructions diff --git a/keeper/__tests__/account.test.js b/keeper/__tests__/account.test.js index 5e1fe54..1b4d92d 100644 --- a/keeper/__tests__/account.test.js +++ b/keeper/__tests__/account.test.js @@ -2,27 +2,27 @@ const { initializeKeeperAccount } = require('../src/account'); describe('Keeper Account Module', () => { - const originalEnv = process.env; + const originalEnv = process.env; - beforeEach(() => { - jest.resetModules(); - process.env = { ...originalEnv }; - }); + beforeEach(() => { + jest.resetModules(); + process.env = { ...originalEnv }; + }); - afterEach(() => { - process.env = originalEnv; - }); + afterEach(() => { + process.env = originalEnv; + }); - it('should throw error when KEEPER_SECRET is missing', async () => { - delete process.env.KEEPER_SECRET; + it('should throw error when KEEPER_SECRET is missing', async () => { + delete process.env.KEEPER_SECRET; - // Just test that it throws when secret is missing - await expect(initializeKeeperAccount()).rejects.toThrow(); - }); + // Just test that it throws when secret is missing + await expect(initializeKeeperAccount()).rejects.toThrow(); + }); - it('should have KEEPER_SECRET defined', () => { - process.env.KEEPER_SECRET = 'test-secret'; - // This test just verifies env is set correctly - expect(process.env.KEEPER_SECRET).toBe('test-secret'); - }); + it('should have KEEPER_SECRET defined', () => { + process.env.KEEPER_SECRET = 'test-secret'; + // This test just verifies env is set correctly + expect(process.env.KEEPER_SECRET).toBe('test-secret'); + }); }); diff --git a/keeper/__tests__/executor.test.js 
b/keeper/__tests__/executor.test.js index 82afdc0..cb3d4ce 100644 --- a/keeper/__tests__/executor.test.js +++ b/keeper/__tests__/executor.test.js @@ -6,7 +6,7 @@ */ // Create mock objects at module scope for jest.mock -const mockWithRetryImpl = jest.fn((fn, options) => fn().then(result => ({ +const mockWithRetryImpl = jest.fn((fn, _options) => fn().then(result => ({ success: true, result, attempts: 1, @@ -44,7 +44,7 @@ describe('Executor', () => { beforeEach(() => { jest.clearAllMocks(); - + mockConfig = { maxRetries: 3, retryBaseDelayMs: 1000, @@ -72,9 +72,9 @@ describe('Executor', () => { warn: jest.fn(), error: jest.fn(), }; - const executorWithLogger = createExecutor({ + const executorWithLogger = createExecutor({ logger: customLogger, - config: mockConfig + config: mockConfig, }); expect(executorWithLogger).toBeDefined(); }); @@ -83,9 +83,9 @@ describe('Executor', () => { describe('execute', () => { it('should execute task successfully', async () => { const task = { id: 1, name: 'test-task' }; - + const result = await executor.execute(task); - + expect(result.success).toBe(true); expect(result.result).toEqual({ taskId: 1, status: 'executed' }); expect(result.attempts).toBe(1); @@ -94,17 +94,17 @@ describe('Executor', () => { it('should include task ID in result', async () => { const task = { id: 42 }; - + const result = await executor.execute(task); - + expect(result.result.taskId).toBe(42); }); it('should track execution attempts', async () => { const task = { id: 1 }; - + const result = await executor.execute(task); - + expect(result.attempts).toBeGreaterThanOrEqual(1); expect(result.retries).toBeGreaterThanOrEqual(0); }); @@ -153,23 +153,23 @@ describe('Executor Integration with Retry', () => { it('should pass correct options to withRetry', async () => { const { createExecutor } = require('../src/executor'); - - mockWithRetry.mockImplementation((fn, options) => fn().then(result => ({ + + mockWithRetry.mockImplementation((fn, _options) => fn().then(result => 
({ success: true, result, attempts: 1, retries: 0, }))); - + const config = { maxRetries: 5, retryBaseDelayMs: 500, maxRetryDelayMs: 10000, }; - + const executor = createExecutor({ config }); await executor.execute({ id: 1 }); - + expect(mockWithRetry).toHaveBeenCalled(); const options = mockWithRetry.mock.calls[0][1]; expect(options.maxRetries).toBe(5); @@ -179,17 +179,17 @@ describe('Executor Integration with Retry', () => { it('should use default retry options when config not provided', async () => { const { createExecutor } = require('../src/executor'); - - mockWithRetry.mockImplementation((fn, options) => fn().then(result => ({ + + mockWithRetry.mockImplementation((fn, _options) => fn().then(result => ({ success: true, result, attempts: 1, retries: 0, }))); - + const executor = createExecutor({}); await executor.execute({ id: 1 }); - + const options = mockWithRetry.mock.calls[0][1]; expect(options.maxRetries).toBe(3); expect(options.baseDelayMs).toBe(1000); @@ -198,7 +198,7 @@ describe('Executor Integration with Retry', () => { it('should call onRetry callback on retry', async () => { const { createExecutor } = require('../src/executor'); - + const onRetryMock = jest.fn(); mockWithRetry.mockImplementation((fn, options) => { if (options.onRetry) { @@ -211,16 +211,16 @@ describe('Executor Integration with Retry', () => { retries: 0, })); }); - + const executor = createExecutor({ config: { maxRetries: 3, onRetry: onRetryMock } }); await executor.execute({ id: 1 }); - + expect(mockWithRetry).toHaveBeenCalled(); }); it('should call onMaxRetries callback when max retries exceeded', async () => { const { createExecutor } = require('../src/executor'); - + mockWithRetry.mockImplementation((fn, options) => { if (options.onMaxRetries) { options.onMaxRetries(new Error('max retries'), 3); @@ -232,16 +232,16 @@ describe('Executor Integration with Retry', () => { retries: 0, })); }); - + const executor = createExecutor({ config: { maxRetries: 3 } }); await executor.execute({ 
id: 1 }); - + expect(mockWithRetry).toHaveBeenCalled(); }); it('should call onDuplicate callback for duplicate transactions', async () => { const { createExecutor } = require('../src/executor'); - + mockWithRetry.mockImplementation((fn, options) => { if (options.onDuplicate) { options.onDuplicate(); @@ -253,10 +253,10 @@ describe('Executor Integration with Retry', () => { retries: 0, })); }); - + const executor = createExecutor({ config: { maxRetries: 3 } }); await executor.execute({ id: 1 }); - + expect(mockWithRetry).toHaveBeenCalled(); }); }); @@ -282,22 +282,6 @@ describe('executeTask', () => { it('executeTask should be callable with correct parameters', async () => { // This test verifies that executeTask can be called // In a real environment with actual SDK, this would test the full flow - const mockServer = { - simulateTransaction: jest.fn().mockResolvedValue({ results: [] }), - sendTransaction: jest.fn().mockResolvedValue({ hash: 'test123', status: 'PENDING' }), - getTransaction: jest.fn().mockResolvedValue({ status: 'SUCCESS' }), - }; - - const mockKeypair = { - publicKey: jest.fn().mockReturnValue('GPUB123'), - sign: jest.fn(), - }; - - const mockAccount = { - accountId: jest.fn().mockReturnValue('GPUB123'), - sequenceNumber: jest.fn().mockReturnValue('1'), - }; - // The function should accept these parameters without throwing // Actual execution would require real SDK expect(() => { @@ -315,7 +299,7 @@ describe('executeTask', () => { feePaid: 100, error: null, }; - + expect(mockResult).toMatchObject({ taskId: expect.any(Number), txHash: expect.any(String), @@ -329,7 +313,7 @@ describe('executeTask', () => { // Verify the polling behavior is documented const POLL_ATTEMPTS = 30; const POLL_INTERVAL_MS = 2000; - + expect(POLL_ATTEMPTS).toBe(30); expect(POLL_INTERVAL_MS).toBe(2000); }); diff --git a/keeper/__tests__/gasMonitor.test.js b/keeper/__tests__/gasMonitor.test.js index 609013a..a04a730 100644 --- a/keeper/__tests__/gasMonitor.test.js +++ 
b/keeper/__tests__/gasMonitor.test.js @@ -2,33 +2,33 @@ const { GasMonitor } = require('../src/gasMonitor'); describe('GasMonitor', () => { - let gasMonitor; + let gasMonitor; - beforeEach(() => { - gasMonitor = new GasMonitor(); - }); + beforeEach(() => { + gasMonitor = new GasMonitor(); + }); - it('should create GasMonitor instance', () => { - expect(gasMonitor).toBeDefined(); - }); + it('should create GasMonitor instance', () => { + expect(gasMonitor).toBeDefined(); + }); - it('should have default threshold', () => { - expect(gasMonitor.GAS_WARN_THRESHOLD).toBeDefined(); - }); + it('should have default threshold', () => { + expect(gasMonitor.GAS_WARN_THRESHOLD).toBeDefined(); + }); - it('should get low gas count', () => { - const count = gasMonitor.getLowGasCount(); - expect(typeof count).toBe('number'); - }); + it('should get low gas count', () => { + const count = gasMonitor.getLowGasCount(); + expect(typeof count).toBe('number'); + }); - it('should get config', () => { - const config = gasMonitor.getConfig(); - expect(config).toBeDefined(); - expect(config.gasWarnThreshold).toBeDefined(); - }); + it('should get config', () => { + const config = gasMonitor.getConfig(); + expect(config).toBeDefined(); + expect(config.gasWarnThreshold).toBeDefined(); + }); - it('should check gas balance without throwing', async () => { - const result = await gasMonitor.checkGasBalance('task1', 100); - expect(typeof result).toBe('boolean'); - }); + it('should check gas balance without throwing', async () => { + const result = await gasMonitor.checkGasBalance('task1', 100); + expect(typeof result).toBe('boolean'); + }); }); diff --git a/keeper/__tests__/integration.test.js b/keeper/__tests__/integration.test.js index 89ee4c4..33e5ea4 100644 --- a/keeper/__tests__/integration.test.js +++ b/keeper/__tests__/integration.test.js @@ -1,6 +1,6 @@ /** * Integration Tests for SoroTask Keeper - * + * * Tests the full workflow: registry -> poller -> queue -> executor * with mocked Soroban 
RPC responses. */ @@ -8,340 +8,342 @@ const TaskRegistry = require('../src/registry'); const TaskPoller = require('../src/poller'); const { ExecutionQueue } = require('../src/queue'); -const { xdr } = require('@stellar/stellar-sdk'); +const { Account, xdr } = require('@stellar/stellar-sdk'); // Mock fs for registry tests jest.mock('fs'); const fs = require('fs'); describe('Keeper Integration Tests', () => { - let mockServer; - let registry; - let poller; - let queue; - - // Helper to create TaskRegistered events - function makeTaskRegisteredEvent(taskId, ledger) { - const topic0 = xdr.ScVal.scvSymbol('TaskRegistered').toXDR('base64'); - const topic1 = xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskId))).toXDR('base64'); - return { - topic: [topic0, topic1], - ledger, - }; + let mockServer; + let registry; + let poller; + let queue; + + // Helper to create TaskRegistered events + function makeTaskRegisteredEvent(taskId, ledger) { + const topic0 = xdr.ScVal.scvSymbol('TaskRegistered').toXDR('base64'); + const topic1 = xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskId))).toXDR('base64'); + return { + topic: [topic0, topic1], + ledger, + }; + } + + // Helper to create mock TaskConfig XDR response + function makeTaskConfigXDR(taskConfig) { + const mapEntries = []; + + if (taskConfig.last_run !== undefined) { + mapEntries.push(new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('last_run'), + val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.last_run))), + })); } - - // Helper to create mock TaskConfig XDR response - function makeTaskConfigXDR(taskConfig) { - const map = new xdr.ScMap([]); - - if (taskConfig.last_run !== undefined) { - map.push({ - key: xdr.ScVal.scvSymbol('last_run'), - val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.last_run))), - }); - } - if (taskConfig.interval !== undefined) { - map.push({ - key: xdr.ScVal.scvSymbol('interval'), - val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.interval))), - }); - } - if 
(taskConfig.gas_balance !== undefined) { - map.push({ - key: xdr.ScVal.scvSymbol('gas_balance'), - val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.gas_balance))), - }); - } - - return xdr.ScVal.scvVec([xdr.ScVal.scvMap(map)]); + if (taskConfig.interval !== undefined) { + mapEntries.push(new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('interval'), + val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.interval))), + })); + } + if (taskConfig.gas_balance !== undefined) { + mapEntries.push(new xdr.ScMapEntry({ + key: xdr.ScVal.scvSymbol('gas_balance'), + val: xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskConfig.gas_balance))), + })); } - beforeEach(() => { - jest.clearAllMocks(); - fs.existsSync.mockReturnValue(false); - fs.mkdirSync.mockReturnValue(undefined); - fs.writeFileSync.mockReturnValue(undefined); - fs.readFileSync.mockReturnValue('{}'); - - // Mock Soroban server - mockServer = { - getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), - getEvents: jest.fn().mockResolvedValue({ events: [] }), - getAccount: jest.fn().mockResolvedValue({ - accountId: jest.fn().mockReturnValue('GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF'), - sequenceNumber: jest.fn().mockReturnValue('12345'), - }), - simulateTransaction: jest.fn(), - }; + return xdr.ScVal.scvVec([xdr.ScVal.scvMap(mapEntries)]); + } + + beforeEach(() => { + jest.clearAllMocks(); + fs.existsSync.mockReturnValue(false); + fs.mkdirSync.mockReturnValue(undefined); + fs.writeFileSync.mockReturnValue(undefined); + fs.readFileSync.mockReturnValue('{}'); + + // Mock Soroban server + mockServer = { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), + getEvents: jest.fn().mockResolvedValue({ events: [] }), + getAccount: jest.fn().mockResolvedValue( + new Account( + 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF', + '12345', + ), + ), + simulateTransaction: jest.fn(), + }; + }); + + afterEach(async () => { + if (queue) { + await queue.drain(); 
+ } + }); + + describe('Full Workflow: Registry -> Poller -> Queue', () => { + it('should discover tasks, poll for due tasks, and enqueue them', async () => { + // Setup: Registry discovers tasks from events + const events = [ + makeTaskRegisteredEvent(1, 900), + makeTaskRegisteredEvent(2, 910), + ]; + mockServer.getEvents.mockResolvedValue({ events }); + + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await registry.init(); + + expect(registry.getTaskIds()).toEqual([1, 2]); + + // Setup: Poller checks tasks with different states + mockServer.simulateTransaction + .mockResolvedValueOnce({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }), + }], + }) // Task 1: Due (500 + 400 <= 1000) + .mockResolvedValueOnce({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 800, interval: 300, gas_balance: 1000 }), + }], + }); // Task 2: Not due (800 + 300 > 1000) + + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + maxConcurrentReads: 5, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + + // Poll for due tasks + const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); + + expect(dueTaskIds).toEqual([1]); + expect(poller.stats.tasksDue).toBe(1); + expect(poller.stats.tasksChecked).toBe(2); + + // Setup: Queue executes due tasks + queue = new ExecutionQueue(3); + const executedTasks = []; + const executorFn = jest.fn(async (taskId) => { + executedTasks.push(taskId); + }); + + await queue.enqueue(dueTaskIds, executorFn); + + expect(executedTasks).toEqual([1]); + expect(executorFn).toHaveBeenCalledTimes(1); }); - afterEach(async () => { - if (queue) { - await queue.drain(); - } - }); + it('should skip tasks with zero gas balance', async () => { + // Registry discovers task + const events = 
[makeTaskRegisteredEvent(1, 900)]; + mockServer.getEvents.mockResolvedValue({ events }); - describe('Full Workflow: Registry -> Poller -> Queue', () => { - it('should discover tasks, poll for due tasks, and enqueue them', async () => { - // Setup: Registry discovers tasks from events - const events = [ - makeTaskRegisteredEvent(1, 900), - makeTaskRegisteredEvent(2, 910), - ]; - mockServer.getEvents.mockResolvedValue({ events }); - - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); - - expect(registry.getTaskIds()).toEqual([1, 2]); - - // Setup: Poller checks tasks with different states - mockServer.simulateTransaction - .mockResolvedValueOnce({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }) - }] - }) // Task 1: Due (500 + 400 <= 1000) - .mockResolvedValueOnce({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 800, interval: 300, gas_balance: 1000 }) - }] - }); // Task 2: Not due (800 + 300 > 1000) - - poller = new TaskPoller(mockServer, 'CABC123', { - maxConcurrentReads: 5, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - - // Poll for due tasks - const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - - expect(dueTaskIds).toEqual([1]); - expect(poller.stats.tasksDue).toBe(1); - expect(poller.stats.tasksChecked).toBe(2); - - // Setup: Queue executes due tasks - queue = new ExecutionQueue(3); - const executedTasks = []; - const executorFn = jest.fn(async (taskId) => { - executedTasks.push(taskId); - }); - - await queue.enqueue(dueTaskIds, executorFn); - - expect(executedTasks).toEqual([1]); - expect(executorFn).toHaveBeenCalledTimes(1); - }); + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await 
registry.init(); - it('should skip tasks with zero gas balance', async () => { - // Registry discovers task - const events = [makeTaskRegisteredEvent(1, 900)]; - mockServer.getEvents.mockResolvedValue({ events }); + // Task has zero gas balance + mockServer.simulateTransaction.mockResolvedValue({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 500, interval: 100, gas_balance: 0 }), + }], + }); - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); - // Task has zero gas balance - mockServer.simulateTransaction.mockResolvedValue({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 500, interval: 100, gas_balance: 0 }) - }] - }); - - poller = new TaskPoller(mockServer, 'CABC123', { - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - - const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - - expect(dueTaskIds).toEqual([]); - expect(poller.stats.tasksSkipped).toBe(1); - }); + const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - it('should handle multiple polling cycles', async () => { - // Initial registry setup - mockServer.getEvents.mockResolvedValueOnce({ events: [] }); - - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); - - // First cycle: No tasks - poller = new TaskPoller(mockServer, 'CABC123', { - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - - let dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toEqual([]); - - // New task registered - mockServer.getEvents.mockResolvedValueOnce({ - events: [makeTaskRegisteredEvent(1, 950)] - }); - await 
registry.poll(); - - expect(registry.getTaskIds()).toEqual([1]); - - // Second cycle: Task is due - mockServer.simulateTransaction.mockResolvedValue({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 900, interval: 100, gas_balance: 1000 }) - }] - }); - - dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toEqual([1]); - }); + expect(dueTaskIds).toEqual([]); + expect(poller.stats.tasksSkipped).toBe(1); }); - describe('Error Handling Integration', () => { - it('should continue polling when individual task check fails', async () => { - // Registry with two tasks - const events = [ - makeTaskRegisteredEvent(1, 900), - makeTaskRegisteredEvent(2, 910), - ]; - mockServer.getEvents.mockResolvedValue({ events }); - - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); - - // First task fails, second succeeds - mockServer.simulateTransaction - .mockRejectedValueOnce(new Error('RPC error')) - .mockResolvedValueOnce({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }) - }] - }); - - poller = new TaskPoller(mockServer, 'CABC123', { - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - - const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - - expect(dueTaskIds).toEqual([2]); - expect(poller.stats.errors).toBe(1); - expect(poller.stats.tasksDue).toBe(1); + it('should handle multiple polling cycles', async () => { + // Initial registry setup + mockServer.getEvents.mockResolvedValueOnce({ events: [] }); + + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await registry.init(); + + // First cycle: No tasks + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + 
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + + let dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); + expect(dueTaskIds).toEqual([]); + + // New task registered + mockServer.getEvents.mockResolvedValueOnce({ + events: [makeTaskRegisteredEvent(1, 950)], + }); + await registry.poll(); + + expect(registry.getTaskIds()).toEqual([1]); + + // Second cycle: Task is due + mockServer.simulateTransaction.mockResolvedValue({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 900, interval: 100, gas_balance: 1000 }), + }], + }); + + dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); + expect(dueTaskIds).toEqual([1]); + }); + }); + + describe('Error Handling Integration', () => { + it('should continue polling when individual task check fails', async () => { + // Registry with two tasks + const events = [ + makeTaskRegisteredEvent(1, 900), + makeTaskRegisteredEvent(2, 910), + ]; + mockServer.getEvents.mockResolvedValue({ events }); + + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await registry.init(); + + // First task fails, second succeeds + mockServer.simulateTransaction + .mockRejectedValueOnce(new Error('RPC error')) + .mockResolvedValueOnce({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }), + }], }); - it('should handle RPC failures gracefully', async () => { - mockServer.getEvents.mockRejectedValue(new Error('RPC unavailable')); + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); + const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - // Should 
not throw - await expect(registry.init()).resolves.not.toThrow(); - expect(registry.getTaskIds()).toEqual([]); - }); + expect(dueTaskIds).toEqual([2]); + expect(poller.stats.errors).toBe(1); + expect(poller.stats.tasksDue).toBe(1); }); - describe('Concurrent Execution Integration', () => { - it('should respect concurrency limits across components', async () => { - // Setup multiple tasks - const events = [ - makeTaskRegisteredEvent(1, 900), - makeTaskRegisteredEvent(2, 910), - makeTaskRegisteredEvent(3, 920), - makeTaskRegisteredEvent(4, 930), - ]; - mockServer.getEvents.mockResolvedValue({ events }); - - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); - - // All tasks are due - mockServer.simulateTransaction.mockResolvedValue({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }) - }] - }); - - poller = new TaskPoller(mockServer, 'CABC123', { - maxConcurrentReads: 2, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - - const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - expect(dueTaskIds).toHaveLength(4); - - // Queue with concurrency limit of 2 - queue = new ExecutionQueue(2); - let concurrentExecutions = 0; - let maxConcurrent = 0; - - const slowExecutor = jest.fn(async () => { - concurrentExecutions++; - maxConcurrent = Math.max(maxConcurrent, concurrentExecutions); - await new Promise(resolve => setTimeout(resolve, 50)); - concurrentExecutions--; - }); - - await queue.enqueue(dueTaskIds, slowExecutor); - - expect(maxConcurrent).toBeLessThanOrEqual(2); - expect(slowExecutor).toHaveBeenCalledTimes(4); - }); + it('should handle RPC failures gracefully', async () => { + mockServer.getEvents.mockRejectedValue(new Error('RPC unavailable')); + + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + 
logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + + // Should not throw + await expect(registry.init()).resolves.not.toThrow(); + expect(registry.getTaskIds()).toEqual([]); }); + }); + + describe('Concurrent Execution Integration', () => { + it('should respect concurrency limits across components', async () => { + // Setup multiple tasks + const events = [ + makeTaskRegisteredEvent(1, 900), + makeTaskRegisteredEvent(2, 910), + makeTaskRegisteredEvent(3, 920), + makeTaskRegisteredEvent(4, 930), + ]; + mockServer.getEvents.mockResolvedValue({ events }); + + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await registry.init(); + + // All tasks are due + mockServer.simulateTransaction.mockResolvedValue({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }), + }], + }); + + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + maxConcurrentReads: 2, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + + const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); + expect(dueTaskIds).toHaveLength(4); + + // Queue with concurrency limit of 2 + queue = new ExecutionQueue(2); + let concurrentExecutions = 0; + let maxConcurrent = 0; + + const slowExecutor = jest.fn(async () => { + concurrentExecutions++; + maxConcurrent = Math.max(maxConcurrent, concurrentExecutions); + await new Promise(resolve => setTimeout(resolve, 50)); + concurrentExecutions--; + }); + + await queue.enqueue(dueTaskIds, slowExecutor); + + expect(maxConcurrent).toBeLessThanOrEqual(2); + expect(slowExecutor).toHaveBeenCalledTimes(4); + }); + }); - describe('Graceful Shutdown Integration', () => { - it('should drain queue on shutdown signal', async () => { - const events = [makeTaskRegisteredEvent(1, 900)]; - 
mockServer.getEvents.mockResolvedValue({ events }); + describe('Graceful Shutdown Integration', () => { + it('should drain queue on shutdown signal', async () => { + const events = [makeTaskRegisteredEvent(1, 900)]; + mockServer.getEvents.mockResolvedValue({ events }); - registry = new TaskRegistry(mockServer, 'CABC123', { - startLedger: 800, - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); - await registry.init(); + registry = new TaskRegistry(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + startLedger: 800, + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); + await registry.init(); - mockServer.simulateTransaction.mockResolvedValue({ - results: [{ - retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }) - }] - }); + mockServer.simulateTransaction.mockResolvedValue({ + results: [{ + retval: makeTaskConfigXDR({ last_run: 500, interval: 400, gas_balance: 1000 }), + }], + }); - poller = new TaskPoller(mockServer, 'CABC123', { - logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() } - }); + poller = new TaskPoller(mockServer, 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM', { + logger: { info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }); - const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); + const dueTaskIds = await poller.pollDueTasks(registry.getTaskIds()); - queue = new ExecutionQueue(1); - let taskCompleted = false; + queue = new ExecutionQueue(1); + let taskCompleted = false; - const slowExecutor = jest.fn(async () => { - await new Promise(resolve => setTimeout(resolve, 100)); - taskCompleted = true; - }); + const slowExecutor = jest.fn(async () => { + await new Promise(resolve => setTimeout(resolve, 100)); + taskCompleted = true; + }); - // Start execution - const enqueuePromise = queue.enqueue(dueTaskIds, slowExecutor); + // Start execution + const enqueuePromise = queue.enqueue(dueTaskIds, slowExecutor); - // Simulate 
shutdown - setTimeout(() => queue.drain(), 50); + // Simulate shutdown + setTimeout(() => queue.drain(), 50); - await enqueuePromise; + await enqueuePromise; - expect(taskCompleted).toBe(true); - }); + expect(taskCompleted).toBe(true); }); + }); }); diff --git a/keeper/__tests__/logger.test.js b/keeper/__tests__/logger.test.js index 6c85b31..b31effd 100644 --- a/keeper/__tests__/logger.test.js +++ b/keeper/__tests__/logger.test.js @@ -2,20 +2,18 @@ * Unit tests for logger.js - Structured logging with pino */ -const { - createLogger, +const { + createLogger, createChildLogger, - getBaseLogger, reinitializeLogger, - getLogLevel, - setLogLevel, - SENSITIVE_FIELDS + normalizeLogLevel, + SENSITIVE_FIELDS, } = require('../src/logger.js'); describe('Logger', () => { // Store original env vars const originalEnv = process.env; - + beforeEach(() => { // Reset modules to get fresh logger instances jest.resetModules(); @@ -32,7 +30,7 @@ describe('Logger', () => { describe('createLogger', () => { it('should create a logger with all log level methods', () => { const logger = createLogger('test'); - + expect(typeof logger.trace).toBe('function'); expect(typeof logger.debug).toBe('function'); expect(typeof logger.info).toBe('function'); @@ -56,7 +54,7 @@ describe('Logger', () => { it('should be an alias for createLogger', () => { const logger1 = createLogger('test'); const logger2 = createChildLogger('test'); - + expect(typeof logger1.info).toBe('function'); expect(typeof logger2.info).toBe('function'); }); @@ -74,7 +72,7 @@ describe('Logger', () => { jest.resetModules(); const { createLogger: freshCreateLogger } = require('../src/logger.js'); const logger = freshCreateLogger('test'); - + expect(logger.raw.level).toBe('debug'); }); @@ -83,9 +81,14 @@ describe('Logger', () => { jest.resetModules(); const { createLogger: freshCreateLogger } = require('../src/logger.js'); const logger = freshCreateLogger('test'); - + expect(logger.raw.level).toBe('error'); }); + + it('should fall 
back to info for invalid LOG_LEVEL values', () => { + expect(normalizeLogLevel('verbose')).toBe('info'); + expect(normalizeLogLevel('debug')).toBe('debug'); + }); }); describe('getLogLevel and setLogLevel', () => { @@ -93,13 +96,13 @@ describe('Logger', () => { process.env.LOG_LEVEL = 'warn'; jest.resetModules(); const { getLogLevel: freshGetLogLevel } = require('../src/logger.js'); - + expect(freshGetLogLevel()).toBe('warn'); }); it('should set log level dynamically', () => { const { setLogLevel: freshSetLogLevel, getLogLevel: freshGetLogLevel } = require('../src/logger.js'); - + freshSetLogLevel('debug'); expect(freshGetLogLevel()).toBe('debug'); }); @@ -108,7 +111,7 @@ describe('Logger', () => { describe('reinitializeLogger', () => { it('should allow reinitializing with new options', () => { const { reinitializeLogger: freshReinit, getLogLevel: freshGetLevel } = require('../src/logger.js'); - + freshReinit({ level: 'trace' }); expect(freshGetLevel()).toBe('trace'); }); @@ -130,29 +133,54 @@ describe('Logger', () => { }); }); - describe('development mode pretty printing', () => { - it('should not have transport in production mode', () => { - process.env.NODE_ENV = 'production'; + describe('log format selection', () => { + it('should default to JSON even in development mode', () => { + process.env.NODE_ENV = 'development'; jest.resetModules(); const { createLogger: freshCreateLogger, getBaseLogger: freshGetBase } = require('../src/logger.js'); - + freshCreateLogger('test'); const baseLogger = freshGetBase(); - - // In production, there should be no transport (pretty printing) - expect(baseLogger.transport).toBeUndefined(); + + expect(baseLogger.logFormat).toBe('json'); }); - it('should configure transport in development mode', () => { - process.env.NODE_ENV = 'development'; + it('should configure pretty transport only when LOG_FORMAT=pretty', () => { + process.env.LOG_FORMAT = 'pretty'; jest.resetModules(); const { createLogger: freshCreateLogger, getBaseLogger: 
freshGetBase } = require('../src/logger.js'); - + freshCreateLogger('test'); const baseLogger = freshGetBase(); - - // In development, pino-pretty transport should be configured - expect(baseLogger.transport).toBeDefined(); + + expect(baseLogger.logFormat).toBe('pretty'); + }); + }); + + describe('JSON output', () => { + it('should emit JSON logs with normalized fields and metadata', () => { + const chunks = []; + const destination = { + write(chunk) { + chunks.push(chunk.toString()); + }, + }; + + reinitializeLogger({ destination }); + const logger = createLogger('poller'); + logger.info('Task is due', { taskId: 42, dueInSeconds: 0 }); + + const payload = JSON.parse(chunks[0]); + expect(payload).toMatchObject({ + level: 'info', + message: 'Task is due', + module: 'poller', + service: 'keeper', + taskId: 42, + dueInSeconds: 0, + }); + expect(payload.timestamp).toBeDefined(); + expect(typeof payload.timestamp).toBe('string'); }); }); @@ -164,7 +192,7 @@ describe('Logger', () => { jest.resetModules(); const loggerModule = require('../src/logger.js'); logger = loggerModule.createLogger('test'); - + // Spy on the raw logger methods baseLoggerSpy = { trace: jest.spyOn(logger.raw, 'trace').mockImplementation(() => {}), @@ -220,10 +248,10 @@ describe('Logger', () => { it('should return same base logger instance', () => { jest.resetModules(); const { getBaseLogger: freshGetBase } = require('../src/logger.js'); - + const base1 = freshGetBase(); const base2 = freshGetBase(); - + expect(base1).toBe(base2); }); }); @@ -232,20 +260,20 @@ describe('Logger', () => { it('should have module binding in child logger', () => { jest.resetModules(); const { createLogger: freshCreateLogger } = require('../src/logger.js'); - + const logger = freshCreateLogger('poller'); const bindings = logger.raw.bindings(); - + expect(bindings.module).toBe('poller'); }); it('should have different modules for different child loggers', () => { jest.resetModules(); const { createLogger: freshCreateLogger } 
= require('../src/logger.js'); - + const pollerLogger = freshCreateLogger('poller'); const registryLogger = freshCreateLogger('registry'); - + expect(pollerLogger.raw.bindings().module).toBe('poller'); expect(registryLogger.raw.bindings().module).toBe('registry'); }); diff --git a/keeper/__tests__/metrics.test.js b/keeper/__tests__/metrics.test.js index 95a8322..b1e5f0e 100644 --- a/keeper/__tests__/metrics.test.js +++ b/keeper/__tests__/metrics.test.js @@ -2,39 +2,39 @@ const { Metrics } = require('../src/metrics'); describe('Metrics', () => { - let metrics; - - beforeEach(() => { - metrics = new Metrics(); - }); - - it('should create Metrics instance', () => { - expect(metrics).toBeDefined(); - }); - - it('should have counters object', () => { - expect(metrics.counters).toBeDefined(); - expect(typeof metrics.counters).toBe('object'); - }); - - it('should have gauges object', () => { - expect(metrics.gauges).toBeDefined(); - expect(typeof metrics.gauges).toBe('object'); - }); - - it('should increment counter', () => { - metrics.increment('tasksCheckedTotal'); - expect(metrics.counters.tasksCheckedTotal).toBe(1); - }); - - it('should record gauge value', () => { - metrics.record('lastCycleDurationMs', 100); - expect(metrics.gauges.lastCycleDurationMs).toBe(100); - }); - - it('should return snapshot', () => { - const snapshot = metrics.snapshot(); - expect(snapshot).toBeDefined(); - expect(typeof snapshot).toBe('object'); - }); + let metrics; + + beforeEach(() => { + metrics = new Metrics(); + }); + + it('should create Metrics instance', () => { + expect(metrics).toBeDefined(); + }); + + it('should have counters object', () => { + expect(metrics.counters).toBeDefined(); + expect(typeof metrics.counters).toBe('object'); + }); + + it('should have gauges object', () => { + expect(metrics.gauges).toBeDefined(); + expect(typeof metrics.gauges).toBe('object'); + }); + + it('should increment counter', () => { + metrics.increment('tasksCheckedTotal'); + 
expect(metrics.counters.tasksCheckedTotal).toBe(1); + }); + + it('should record gauge value', () => { + metrics.record('lastCycleDurationMs', 100); + expect(metrics.gauges.lastCycleDurationMs).toBe(100); + }); + + it('should return snapshot', () => { + const snapshot = metrics.snapshot(); + expect(snapshot).toBeDefined(); + expect(typeof snapshot).toBe('object'); + }); }); diff --git a/keeper/__tests__/poller.test.js b/keeper/__tests__/poller.test.js index f5d2374..68db755 100644 --- a/keeper/__tests__/poller.test.js +++ b/keeper/__tests__/poller.test.js @@ -1,243 +1,243 @@ const TaskPoller = require('../src/poller'); describe('TaskPoller', () => { - let mockServer; - let poller; - const contractId = 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM'; + let mockServer; + let poller; + const contractId = 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM'; + + beforeEach(() => { + // Mock Soroban server + mockServer = { + getLatestLedger: jest.fn(), + getAccount: jest.fn(), + simulateTransaction: jest.fn(), + }; + + poller = new TaskPoller(mockServer, contractId, { + maxConcurrentReads: 5, + }); + }); + + describe('constructor', () => { + it('should initialize with default values', () => { + const defaultPoller = new TaskPoller(mockServer, contractId); + expect(defaultPoller.maxConcurrentReads).toBe(10); + expect(defaultPoller.contractId).toBe(contractId); + }); + + it('should use custom maxConcurrentReads', () => { + expect(poller.maxConcurrentReads).toBe(5); + }); + + it('should initialize stats', () => { + expect(poller.stats).toEqual({ + lastPollTime: null, + tasksChecked: 0, + tasksDue: 0, + tasksSkipped: 0, + errors: 0, + }); + }); + }); + describe('pollDueTasks', () => { beforeEach(() => { - // Mock Soroban server - mockServer = { - getLatestLedger: jest.fn(), - getAccount: jest.fn(), - simulateTransaction: jest.fn() - }; + mockServer.getLatestLedger.mockResolvedValue({ + sequence: 1000, + }); + }); + + it('should return empty array when no 
task IDs provided', async () => { + const result = await poller.pollDueTasks([]); + expect(result).toEqual([]); + }); + + it('should return empty array when taskIds is null', async () => { + const result = await poller.pollDueTasks(null); + expect(result).toEqual([]); + }); + + it('should check all provided task IDs', async () => { + const taskIds = [1, 2, 3]; + + // Mock checkTask to return not due + jest.spyOn(poller, 'checkTask').mockResolvedValue({ + isDue: false, + taskId: 1, + }); + + await poller.pollDueTasks(taskIds); + + expect(poller.checkTask).toHaveBeenCalledTimes(3); + expect(poller.stats.tasksChecked).toBe(3); + }); + + it('should return due task IDs', async () => { + const taskIds = [1, 2, 3]; + + jest.spyOn(poller, 'checkTask') + .mockResolvedValueOnce({ isDue: true, taskId: 1 }) + .mockResolvedValueOnce({ isDue: false, taskId: 2 }) + .mockResolvedValueOnce({ isDue: true, taskId: 3 }); + + const result = await poller.pollDueTasks(taskIds); + + expect(result).toEqual([1, 3]); + expect(poller.stats.tasksDue).toBe(2); + }); + + it('should count skipped tasks', async () => { + const taskIds = [1, 2]; + + jest.spyOn(poller, 'checkTask') + .mockResolvedValueOnce({ isDue: false, taskId: 1, reason: 'skipped' }) + .mockResolvedValueOnce({ isDue: true, taskId: 2 }); + + await poller.pollDueTasks(taskIds); + + expect(poller.stats.tasksSkipped).toBe(1); + expect(poller.stats.tasksDue).toBe(1); + }); + + it('should handle errors gracefully', async () => { + const taskIds = [1, 2]; + + jest.spyOn(poller, 'checkTask') + .mockRejectedValueOnce(new Error('Network error')) + .mockResolvedValueOnce({ isDue: true, taskId: 2 }); - poller = new TaskPoller(mockServer, contractId, { - maxConcurrentReads: 5 - }); - }); - - describe('constructor', () => { - it('should initialize with default values', () => { - const defaultPoller = new TaskPoller(mockServer, contractId); - expect(defaultPoller.maxConcurrentReads).toBe(10); - expect(defaultPoller.contractId).toBe(contractId); 
- }); + const result = await poller.pollDueTasks(taskIds); - it('should use custom maxConcurrentReads', () => { - expect(poller.maxConcurrentReads).toBe(5); - }); + expect(result).toEqual([2]); + expect(poller.stats.errors).toBe(1); + expect(poller.stats.tasksDue).toBe(1); + }); + + it('should update lastPollTime', async () => { + await poller.pollDueTasks([1]); + expect(poller.stats.lastPollTime).toBeTruthy(); + }); + }); + + describe('checkTask', () => { + it('should return not due when task not found', async () => { + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue(null); - it('should initialize stats', () => { - expect(poller.stats).toEqual({ - lastPollTime: null, - tasksChecked: 0, - tasksDue: 0, - tasksSkipped: 0, - errors: 0 - }); - }); - }); - - describe('pollDueTasks', () => { - beforeEach(() => { - mockServer.getLatestLedger.mockResolvedValue({ - sequence: 1000 - }); - }); - - it('should return empty array when no task IDs provided', async () => { - const result = await poller.pollDueTasks([]); - expect(result).toEqual([]); - }); - - it('should return empty array when taskIds is null', async () => { - const result = await poller.pollDueTasks(null); - expect(result).toEqual([]); - }); - - it('should check all provided task IDs', async () => { - const taskIds = [1, 2, 3]; - - // Mock checkTask to return not due - jest.spyOn(poller, 'checkTask').mockResolvedValue({ - isDue: false, - taskId: 1 - }); - - await poller.pollDueTasks(taskIds); - - expect(poller.checkTask).toHaveBeenCalledTimes(3); - expect(poller.stats.tasksChecked).toBe(3); - }); - - it('should return due task IDs', async () => { - const taskIds = [1, 2, 3]; - - jest.spyOn(poller, 'checkTask') - .mockResolvedValueOnce({ isDue: true, taskId: 1 }) - .mockResolvedValueOnce({ isDue: false, taskId: 2 }) - .mockResolvedValueOnce({ isDue: true, taskId: 3 }); - - const result = await poller.pollDueTasks(taskIds); - - expect(result).toEqual([1, 3]); - expect(poller.stats.tasksDue).toBe(2); - }); - - 
it('should count skipped tasks', async () => { - const taskIds = [1, 2]; - - jest.spyOn(poller, 'checkTask') - .mockResolvedValueOnce({ isDue: false, taskId: 1, reason: 'skipped' }) - .mockResolvedValueOnce({ isDue: true, taskId: 2 }); - - await poller.pollDueTasks(taskIds); - - expect(poller.stats.tasksSkipped).toBe(1); - expect(poller.stats.tasksDue).toBe(1); - }); - - it('should handle errors gracefully', async () => { - const taskIds = [1, 2]; - - jest.spyOn(poller, 'checkTask') - .mockRejectedValueOnce(new Error('Network error')) - .mockResolvedValueOnce({ isDue: true, taskId: 2 }); - - const result = await poller.pollDueTasks(taskIds); - - expect(result).toEqual([2]); - expect(poller.stats.errors).toBe(1); - expect(poller.stats.tasksDue).toBe(1); - }); - - it('should update lastPollTime', async () => { - await poller.pollDueTasks([1]); - expect(poller.stats.lastPollTime).toBeTruthy(); - }); - }); - - describe('checkTask', () => { - it('should return not due when task not found', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue(null); - - const result = await poller.checkTask(1, 1000); - - expect(result).toEqual({ - isDue: false, - taskId: 1, - reason: 'not_found' - }); - }); - - it('should skip task with zero gas balance', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ - last_run: 500, - interval: 100, - gas_balance: 0 - }); - - const result = await poller.checkTask(1, 1000); - - expect(result).toEqual({ - isDue: false, - taskId: 1, - reason: 'skipped' - }); - }); - - it('should skip task with negative gas balance', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ - last_run: 500, - interval: 100, - gas_balance: -10 - }); + const result = await poller.checkTask(1, 1000); - const result = await poller.checkTask(1, 1000); + expect(result).toEqual({ + isDue: false, + taskId: 1, + reason: 'not_found', + }); + }); + + it('should skip task with zero gas balance', async () => { + jest.spyOn(poller, 
'getTaskConfig').mockResolvedValue({ + last_run: 500, + interval: 100, + gas_balance: 0, + }); - expect(result).toEqual({ - isDue: false, - taskId: 1, - reason: 'skipped' - }); - }); + const result = await poller.checkTask(1, 1000); + + expect(result).toEqual({ + isDue: false, + taskId: 1, + reason: 'skipped', + }); + }); - it('should return due when last_run + interval <= currentTimestamp', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ - last_run: 500, - interval: 400, - gas_balance: 1000 - }); + it('should skip task with negative gas balance', async () => { + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ + last_run: 500, + interval: 100, + gas_balance: -10, + }); + + const result = await poller.checkTask(1, 1000); + + expect(result).toEqual({ + isDue: false, + taskId: 1, + reason: 'skipped', + }); + }); + + it('should return due when last_run + interval <= currentTimestamp', async () => { + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ + last_run: 500, + interval: 400, + gas_balance: 1000, + }); + + const result = await poller.checkTask(1, 1000); + + expect(result).toEqual({ + isDue: true, + taskId: 1, + }); + }); - const result = await poller.checkTask(1, 1000); - - expect(result).toEqual({ - isDue: true, - taskId: 1 - }); - }); + it('should return not due when last_run + interval > currentTimestamp', async () => { + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ + last_run: 800, + interval: 300, + gas_balance: 1000, + }); + + const result = await poller.checkTask(1, 1000); + + expect(result).toEqual({ + isDue: false, + taskId: 1, + }); + }); + + it('should handle edge case when exactly at boundary', async () => { + jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ + last_run: 500, + interval: 500, + gas_balance: 1000, + }); + + const result = await poller.checkTask(1, 1000); + + expect(result).toEqual({ + isDue: true, + taskId: 1, + }); + }); + }); + + describe('getStats', () => { + it('should return 
a copy of stats', () => { + poller.stats.tasksChecked = 5; + const stats = poller.getStats(); + + expect(stats.tasksChecked).toBe(5); + + // Verify it's a copy + stats.tasksChecked = 10; + expect(poller.stats.tasksChecked).toBe(5); + }); + }); + + describe('decodeTaskConfig', () => { + it('should return null for void ScVal', () => { + const { xdr } = require('@stellar/stellar-sdk'); + const voidVal = xdr.ScVal.scvVoid(); + + const result = poller.decodeTaskConfig(voidVal); + expect(result).toBeNull(); + }); - it('should return not due when last_run + interval > currentTimestamp', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ - last_run: 800, - interval: 300, - gas_balance: 1000 - }); + it('should return null for empty vec', () => { + const { xdr } = require('@stellar/stellar-sdk'); + const emptyVec = xdr.ScVal.scvVec([]); - const result = await poller.checkTask(1, 1000); - - expect(result).toEqual({ - isDue: false, - taskId: 1 - }); - }); - - it('should handle edge case when exactly at boundary', async () => { - jest.spyOn(poller, 'getTaskConfig').mockResolvedValue({ - last_run: 500, - interval: 500, - gas_balance: 1000 - }); - - const result = await poller.checkTask(1, 1000); - - expect(result).toEqual({ - isDue: true, - taskId: 1 - }); - }); - }); - - describe('getStats', () => { - it('should return a copy of stats', () => { - poller.stats.tasksChecked = 5; - const stats = poller.getStats(); - - expect(stats.tasksChecked).toBe(5); - - // Verify it's a copy - stats.tasksChecked = 10; - expect(poller.stats.tasksChecked).toBe(5); - }); - }); - - describe('decodeTaskConfig', () => { - it('should return null for void ScVal', () => { - const { xdr } = require('@stellar/stellar-sdk'); - const voidVal = xdr.ScVal.scvVoid(); - - const result = poller.decodeTaskConfig(voidVal); - expect(result).toBeNull(); - }); - - it('should return null for empty vec', () => { - const { xdr } = require('@stellar/stellar-sdk'); - const emptyVec = 
xdr.ScVal.scvVec([]); - - const result = poller.decodeTaskConfig(emptyVec); - expect(result).toBeNull(); - }); + const result = poller.decodeTaskConfig(emptyVec); + expect(result).toBeNull(); }); + }); }); diff --git a/keeper/__tests__/queue.test.js b/keeper/__tests__/queue.test.js index f5f224c..317c175 100644 --- a/keeper/__tests__/queue.test.js +++ b/keeper/__tests__/queue.test.js @@ -1,279 +1,279 @@ /** * Comprehensive Unit Tests for ExecutionQueue Module - * + * * Tests concurrency control, graceful drain, and task execution. */ const { ExecutionQueue } = require('../src/queue'); describe('ExecutionQueue', () => { - let queue; + let queue; - beforeEach(() => { - queue = new ExecutionQueue(); + beforeEach(() => { + queue = new ExecutionQueue(); + }); + + afterEach(async () => { + if (queue) { + await queue.drain(); + } + }); + + describe('constructor', () => { + it('should create ExecutionQueue instance', () => { + expect(queue).toBeDefined(); + }); + + it('should have default concurrency limit', () => { + expect(queue.concurrencyLimit).toBe(3); + }); + + it('should accept custom concurrency limit', () => { + const customQueue = new ExecutionQueue(5); + expect(customQueue.concurrencyLimit).toBe(5); + }); + + it('should read concurrency from environment variable', () => { + process.env.MAX_CONCURRENT_EXECUTIONS = '10'; + const envQueue = new ExecutionQueue(); + expect(envQueue.concurrencyLimit).toBe(10); + delete process.env.MAX_CONCURRENT_EXECUTIONS; + }); + + it('should have depth of 0 initially', () => { + expect(queue.depth).toBe(0); + }); + + it('should have inFlight of 0 initially', () => { + expect(queue.inFlight).toBe(0); + }); + + it('should have completed of 0 initially', () => { + expect(queue.completed).toBe(0); + }); + + it('should have failedCount of 0 initially', () => { + expect(queue.failedCount).toBe(0); + }); + }); + + describe('enqueue', () => { + it('should execute single task', async () => { + const executorFn = 
jest.fn().mockResolvedValue(undefined); + + await queue.enqueue([1], executorFn); + + expect(executorFn).toHaveBeenCalledTimes(1); + expect(executorFn).toHaveBeenCalledWith(1); }); - afterEach(async () => { - if (queue) { - await queue.drain(); - } + it('should execute multiple tasks', async () => { + const executorFn = jest.fn().mockResolvedValue(undefined); + + await queue.enqueue([1, 2, 3], executorFn); + + expect(executorFn).toHaveBeenCalledTimes(3); + }); + + it('should respect MAX_CONCURRENT_EXECUTIONS', async () => { + const concurrentQueue = new ExecutionQueue(2); + let concurrentExecutions = 0; + let maxConcurrent = 0; + + const slowExecutor = jest.fn(async () => { + concurrentExecutions++; + maxConcurrent = Math.max(maxConcurrent, concurrentExecutions); + await new Promise(resolve => setTimeout(resolve, 50)); + concurrentExecutions--; + }); + + await concurrentQueue.enqueue([1, 2, 3, 4, 5], slowExecutor); + + expect(maxConcurrent).toBeLessThanOrEqual(2); + }); + + it('should emit task:started event', async () => { + const startedSpy = jest.fn(); + queue.on('task:started', startedSpy); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await queue.enqueue([1], executorFn); + + expect(startedSpy).toHaveBeenCalledWith(1); }); - describe('constructor', () => { - it('should create ExecutionQueue instance', () => { - expect(queue).toBeDefined(); - }); - - it('should have default concurrency limit', () => { - expect(queue.concurrencyLimit).toBe(3); - }); - - it('should accept custom concurrency limit', () => { - const customQueue = new ExecutionQueue(5); - expect(customQueue.concurrencyLimit).toBe(5); - }); - - it('should read concurrency from environment variable', () => { - process.env.MAX_CONCURRENT_EXECUTIONS = '10'; - const envQueue = new ExecutionQueue(); - expect(envQueue.concurrencyLimit).toBe(10); - delete process.env.MAX_CONCURRENT_EXECUTIONS; - }); - - it('should have depth of 0 initially', () => { - expect(queue.depth).toBe(0); - }); - 
- it('should have inFlight of 0 initially', () => { - expect(queue.inFlight).toBe(0); - }); - - it('should have completed of 0 initially', () => { - expect(queue.completed).toBe(0); - }); - - it('should have failedCount of 0 initially', () => { - expect(queue.failedCount).toBe(0); - }); + it('should emit task:success event on success', async () => { + const successSpy = jest.fn(); + queue.on('task:success', successSpy); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await queue.enqueue([1], executorFn); + + expect(successSpy).toHaveBeenCalledWith(1); }); - describe('enqueue', () => { - it('should execute single task', async () => { - const executorFn = jest.fn().mockResolvedValue(undefined); - - await queue.enqueue([1], executorFn); - - expect(executorFn).toHaveBeenCalledTimes(1); - expect(executorFn).toHaveBeenCalledWith(1); - }); - - it('should execute multiple tasks', async () => { - const executorFn = jest.fn().mockResolvedValue(undefined); - - await queue.enqueue([1, 2, 3], executorFn); - - expect(executorFn).toHaveBeenCalledTimes(3); - }); - - it('should respect MAX_CONCURRENT_EXECUTIONS', async () => { - const concurrentQueue = new ExecutionQueue(2); - let concurrentExecutions = 0; - let maxConcurrent = 0; - - const slowExecutor = jest.fn(async () => { - concurrentExecutions++; - maxConcurrent = Math.max(maxConcurrent, concurrentExecutions); - await new Promise(resolve => setTimeout(resolve, 50)); - concurrentExecutions--; - }); - - await concurrentQueue.enqueue([1, 2, 3, 4, 5], slowExecutor); - - expect(maxConcurrent).toBeLessThanOrEqual(2); - }); - - it('should emit task:started event', async () => { - const startedSpy = jest.fn(); - queue.on('task:started', startedSpy); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await queue.enqueue([1], executorFn); - - expect(startedSpy).toHaveBeenCalledWith(1); - }); - - it('should emit task:success event on success', async () => { - const successSpy = jest.fn(); - 
queue.on('task:success', successSpy); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await queue.enqueue([1], executorFn); - - expect(successSpy).toHaveBeenCalledWith(1); - }); - - it('should emit task:failed event on failure', async () => { - const failedSpy = jest.fn(); - queue.on('task:failed', failedSpy); - - const error = new Error('Execution failed'); - const executorFn = jest.fn().mockRejectedValue(error); - await queue.enqueue([1], executorFn); - - expect(failedSpy).toHaveBeenCalledWith(1, error); - }); - - it('should emit cycle:complete event', async () => { - const completeSpy = jest.fn(); - queue.on('cycle:complete', completeSpy); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await queue.enqueue([1, 2], executorFn); - - expect(completeSpy).toHaveBeenCalled(); - const stats = completeSpy.mock.calls[0][0]; - expect(stats).toHaveProperty('depth'); - expect(stats).toHaveProperty('inFlight'); - expect(stats).toHaveProperty('completed'); - expect(stats).toHaveProperty('failed'); - }); - - it('should skip previously failed tasks', async () => { - const executorFn = jest.fn() - .mockRejectedValueOnce(new Error('Failed')) - .mockResolvedValueOnce(undefined); - - // First cycle - task 1 fails - await queue.enqueue([1], executorFn); - expect(executorFn).toHaveBeenCalledTimes(1); - - // Second cycle - task 1 should be skipped - await queue.enqueue([1], executorFn); - expect(executorFn).toHaveBeenCalledTimes(1); // Still 1, not called again - }); - - it('should track completed count', async () => { - const executorFn = jest.fn().mockResolvedValue(undefined); - - await queue.enqueue([1, 2, 3], executorFn); - - expect(queue.completed).toBe(0); // Reset after cycle - }); - - it('should track failed count', async () => { - const executorFn = jest.fn().mockRejectedValue(new Error('Failed')); - - await queue.enqueue([1, 2], executorFn); - - expect(queue.failedCount).toBe(0); // Reset after cycle - }); + it('should emit task:failed event 
on failure', async () => { + const failedSpy = jest.fn(); + queue.on('task:failed', failedSpy); + + const error = new Error('Execution failed'); + const executorFn = jest.fn().mockRejectedValue(error); + await queue.enqueue([1], executorFn); + + expect(failedSpy).toHaveBeenCalledWith(1, error); }); - describe('drain', () => { - it('should wait for in-flight tasks to complete', async () => { - let taskCompleted = false; - const slowExecutor = jest.fn(async () => { - await new Promise(resolve => setTimeout(resolve, 100)); - taskCompleted = true; - }); - - // Start task but don't await - queue.enqueue([1], slowExecutor); - - // Immediately call drain - await queue.drain(); - - expect(taskCompleted).toBe(true); - expect(queue.inFlight).toBe(0); - }); - - it('should clear pending queue', async () => { - const slowExecutor = jest.fn(async () => { - await new Promise(resolve => setTimeout(resolve, 50)); - }); - - // Start multiple tasks - const enqueuePromise = queue.enqueue([1, 2, 3, 4, 5], slowExecutor); - - // Immediately drain - await queue.drain(); - - expect(queue.depth).toBe(0); - }); - - it('should handle empty queue', async () => { - await expect(queue.drain()).resolves.not.toThrow(); - }); + it('should emit cycle:complete event', async () => { + const completeSpy = jest.fn(); + queue.on('cycle:complete', completeSpy); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await queue.enqueue([1, 2], executorFn); + + expect(completeSpy).toHaveBeenCalled(); + const stats = completeSpy.mock.calls[0][0]; + expect(stats).toHaveProperty('depth'); + expect(stats).toHaveProperty('inFlight'); + expect(stats).toHaveProperty('completed'); + expect(stats).toHaveProperty('failed'); }); - describe('graceful shutdown simulation', () => { - it('should complete running tasks on drain', async () => { - const completedTasks = []; - const executorFn = jest.fn(async (taskId) => { - await new Promise(resolve => setTimeout(resolve, 20)); - completedTasks.push(taskId); - }); 
- - // Start tasks - const enqueuePromise = queue.enqueue([1, 2, 3], executorFn); - - // Simulate shutdown signal - setTimeout(() => queue.drain(), 30); - - await enqueuePromise; - - expect(completedTasks.length).toBeGreaterThanOrEqual(1); - }); + it('should skip previously failed tasks', async () => { + const executorFn = jest.fn() + .mockRejectedValueOnce(new Error('Failed')) + .mockResolvedValueOnce(undefined); + + // First cycle - task 1 fails + await queue.enqueue([1], executorFn); + expect(executorFn).toHaveBeenCalledTimes(1); + + // Second cycle - task 1 should be skipped + await queue.enqueue([1], executorFn); + expect(executorFn).toHaveBeenCalledTimes(1); // Still 1, not called again }); - describe('metrics integration', () => { - it('should increment tasksDueTotal when metricsServer provided', async () => { - const mockMetrics = { - increment: jest.fn(), - }; - const metricsQueue = new ExecutionQueue(3, mockMetrics); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await metricsQueue.enqueue([1, 2], executorFn); - - expect(mockMetrics.increment).toHaveBeenCalledWith('tasksDueTotal', 2); - }); - - it('should increment tasksExecutedTotal on success', async () => { - const mockMetrics = { - increment: jest.fn(), - }; - const metricsQueue = new ExecutionQueue(3, mockMetrics); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await metricsQueue.enqueue([1], executorFn); - - expect(mockMetrics.increment).toHaveBeenCalledWith('tasksExecutedTotal', 1); - }); - - it('should increment tasksFailedTotal on failure', async () => { - const mockMetrics = { - increment: jest.fn(), - }; - const metricsQueue = new ExecutionQueue(3, mockMetrics); - - const executorFn = jest.fn().mockRejectedValue(new Error('Failed')); - await metricsQueue.enqueue([1], executorFn); - - expect(mockMetrics.increment).toHaveBeenCalledWith('tasksFailedTotal', 1); - }); - - it('should record lastCycleDurationMs', async () => { - const mockMetrics = { - increment: 
jest.fn(), - record: jest.fn(), - }; - const metricsQueue = new ExecutionQueue(3, mockMetrics); - - const executorFn = jest.fn().mockResolvedValue(undefined); - await metricsQueue.enqueue([1], executorFn); - - expect(mockMetrics.record).toHaveBeenCalledWith('lastCycleDurationMs', expect.any(Number)); - }); + it('should track completed count', async () => { + const executorFn = jest.fn().mockResolvedValue(undefined); + + await queue.enqueue([1, 2, 3], executorFn); + + expect(queue.completed).toBe(0); // Reset after cycle + }); + + it('should track failed count', async () => { + const executorFn = jest.fn().mockRejectedValue(new Error('Failed')); + + await queue.enqueue([1, 2], executorFn); + + expect(queue.failedCount).toBe(0); // Reset after cycle + }); + }); + + describe('drain', () => { + it('should wait for in-flight tasks to complete', async () => { + let taskCompleted = false; + const slowExecutor = jest.fn(async () => { + await new Promise(resolve => setTimeout(resolve, 100)); + taskCompleted = true; + }); + + // Start task but don't await + queue.enqueue([1], slowExecutor); + + // Immediately call drain + await queue.drain(); + + expect(taskCompleted).toBe(true); + expect(queue.inFlight).toBe(0); + }); + + it('should clear pending queue', async () => { + const slowExecutor = jest.fn(async () => { + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + // Start multiple tasks + void queue.enqueue([1, 2, 3, 4, 5], slowExecutor); + + // Immediately drain + await queue.drain(); + + expect(queue.depth).toBe(0); + }); + + it('should handle empty queue', async () => { + await expect(queue.drain()).resolves.not.toThrow(); + }); + }); + + describe('graceful shutdown simulation', () => { + it('should complete running tasks on drain', async () => { + const completedTasks = []; + const executorFn = jest.fn(async (taskId) => { + await new Promise(resolve => setTimeout(resolve, 20)); + completedTasks.push(taskId); + }); + + // Start tasks + const enqueuePromise 
= queue.enqueue([1, 2, 3], executorFn); + + // Simulate shutdown signal + setTimeout(() => queue.drain(), 30); + + await enqueuePromise; + + expect(completedTasks.length).toBeGreaterThanOrEqual(1); + }); + }); + + describe('metrics integration', () => { + it('should increment tasksDueTotal when metricsServer provided', async () => { + const mockMetrics = { + increment: jest.fn(), + }; + const metricsQueue = new ExecutionQueue(3, mockMetrics); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await metricsQueue.enqueue([1, 2], executorFn); + + expect(mockMetrics.increment).toHaveBeenCalledWith('tasksDueTotal', 2); + }); + + it('should increment tasksExecutedTotal on success', async () => { + const mockMetrics = { + increment: jest.fn(), + }; + const metricsQueue = new ExecutionQueue(3, mockMetrics); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await metricsQueue.enqueue([1], executorFn); + + expect(mockMetrics.increment).toHaveBeenCalledWith('tasksExecutedTotal', 1); + }); + + it('should increment tasksFailedTotal on failure', async () => { + const mockMetrics = { + increment: jest.fn(), + }; + const metricsQueue = new ExecutionQueue(3, mockMetrics); + + const executorFn = jest.fn().mockRejectedValue(new Error('Failed')); + await metricsQueue.enqueue([1], executorFn); + + expect(mockMetrics.increment).toHaveBeenCalledWith('tasksFailedTotal', 1); + }); + + it('should record lastCycleDurationMs', async () => { + const mockMetrics = { + increment: jest.fn(), + record: jest.fn(), + }; + const metricsQueue = new ExecutionQueue(3, mockMetrics); + + const executorFn = jest.fn().mockResolvedValue(undefined); + await metricsQueue.enqueue([1], executorFn); + + expect(mockMetrics.record).toHaveBeenCalledWith('lastCycleDurationMs', expect.any(Number)); }); + }); }); diff --git a/keeper/__tests__/registry.test.js b/keeper/__tests__/registry.test.js index 934e617..68138af 100644 --- a/keeper/__tests__/registry.test.js +++ 
b/keeper/__tests__/registry.test.js @@ -1,5 +1,4 @@ const fs = require('fs'); -const path = require('path'); const { xdr } = require('@stellar/stellar-sdk'); // Mock fs so we don't touch the real filesystem @@ -8,126 +7,126 @@ jest.mock('fs'); const TaskRegistry = require('../src/registry'); function makeTaskRegisteredEvent(taskId, ledger) { - // topic[0] = Symbol("TaskRegistered"), topic[1] = u64 task_id - const topic0 = xdr.ScVal.scvSymbol('TaskRegistered').toXDR('base64'); - const topic1 = xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskId))).toXDR('base64'); - return { - topic: [topic0, topic1], - ledger, - }; + // topic[0] = Symbol("TaskRegistered"), topic[1] = u64 task_id + const topic0 = xdr.ScVal.scvSymbol('TaskRegistered').toXDR('base64'); + const topic1 = xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskId))).toXDR('base64'); + return { + topic: [topic0, topic1], + ledger, + }; } function mockServer(events = []) { - return { - getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), - getEvents: jest.fn().mockResolvedValue({ events }), - }; + return { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), + getEvents: jest.fn().mockResolvedValue({ events }), + }; } beforeEach(() => { - jest.clearAllMocks(); - fs.existsSync.mockReturnValue(false); - fs.mkdirSync.mockReturnValue(undefined); - fs.writeFileSync.mockReturnValue(undefined); + jest.clearAllMocks(); + fs.existsSync.mockReturnValue(false); + fs.mkdirSync.mockReturnValue(undefined); + fs.writeFileSync.mockReturnValue(undefined); }); describe('TaskRegistry', () => { - test('discovers task IDs from events on init', async () => { - const events = [ - makeTaskRegisteredEvent(1, 900), - makeTaskRegisteredEvent(2, 910), - makeTaskRegisteredEvent(3, 920), - ]; - const server = mockServer(events); - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - - await registry.init(); - - expect(registry.getTaskIds()).toEqual([1, 2, 3]); - 
expect(server.getEvents).toHaveBeenCalledTimes(1); - }); - - test('returns empty array when no events exist', async () => { - const server = mockServer([]); - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - - await registry.init(); - - expect(registry.getTaskIds()).toEqual([]); - }); - - test('deduplicates task IDs', async () => { - const events = [ - makeTaskRegisteredEvent(1, 900), - makeTaskRegisteredEvent(1, 910), - ]; - const server = mockServer(events); - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - - await registry.init(); - - expect(registry.getTaskIds()).toEqual([1]); + test('discovers task IDs from events on init', async () => { + const events = [ + makeTaskRegisteredEvent(1, 900), + makeTaskRegisteredEvent(2, 910), + makeTaskRegisteredEvent(3, 920), + ]; + const server = mockServer(events); + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + + await registry.init(); + + expect(registry.getTaskIds()).toEqual([1, 2, 3]); + expect(server.getEvents).toHaveBeenCalledTimes(1); + }); + + test('returns empty array when no events exist', async () => { + const server = mockServer([]); + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + + await registry.init(); + + expect(registry.getTaskIds()).toEqual([]); + }); + + test('deduplicates task IDs', async () => { + const events = [ + makeTaskRegisteredEvent(1, 900), + makeTaskRegisteredEvent(1, 910), + ]; + const server = mockServer(events); + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + + await registry.init(); + + expect(registry.getTaskIds()).toEqual([1]); + }); + + test('poll discovers new tasks', async () => { + const server = mockServer([makeTaskRegisteredEvent(1, 900)]); + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + await registry.init(); + + // Simulate new events on next poll + server.getEvents.mockResolvedValueOnce({ + 
events: [makeTaskRegisteredEvent(4, 950)], }); - test('poll discovers new tasks', async () => { - const server = mockServer([makeTaskRegisteredEvent(1, 900)]); - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - await registry.init(); + await registry.poll(); - // Simulate new events on next poll - server.getEvents.mockResolvedValueOnce({ - events: [makeTaskRegisteredEvent(4, 950)], - }); + expect(registry.getTaskIds()).toEqual([1, 4]); + }); - await registry.poll(); + test('persists task IDs to disk', async () => { + const server = mockServer([makeTaskRegisteredEvent(5, 900)]); + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - expect(registry.getTaskIds()).toEqual([1, 4]); - }); + await registry.init(); - test('persists task IDs to disk', async () => { - const server = mockServer([makeTaskRegisteredEvent(5, 900)]); - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + expect(fs.writeFileSync).toHaveBeenCalled(); + const writtenData = JSON.parse(fs.writeFileSync.mock.calls[0][1]); + expect(writtenData.taskIds).toEqual([5]); + expect(writtenData.lastSeenLedger).toBe(900); + }); - await registry.init(); + test('loads persisted state from disk', async () => { + fs.existsSync.mockReturnValue(true); + fs.readFileSync.mockReturnValue(JSON.stringify({ + taskIds: [10, 20], + lastSeenLedger: 500, + })); - expect(fs.writeFileSync).toHaveBeenCalled(); - const writtenData = JSON.parse(fs.writeFileSync.mock.calls[0][1]); - expect(writtenData.taskIds).toEqual([5]); - expect(writtenData.lastSeenLedger).toBe(900); - }); + const server = mockServer([]); + const registry = new TaskRegistry(server, 'CABC123'); - test('loads persisted state from disk', async () => { - fs.existsSync.mockReturnValue(true); - fs.readFileSync.mockReturnValue(JSON.stringify({ - taskIds: [10, 20], - lastSeenLedger: 500, - })); + expect(registry.getTaskIds()).toEqual([10, 20]); + }); - const server = mockServer([]); - const 
registry = new TaskRegistry(server, 'CABC123'); - - expect(registry.getTaskIds()).toEqual([10, 20]); - }); - - test('handles RPC errors gracefully', async () => { - const server = { - getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), - getEvents: jest.fn().mockRejectedValue(new Error('RPC unavailable')), - }; - const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); + test('handles RPC errors gracefully', async () => { + const server = { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), + getEvents: jest.fn().mockRejectedValue(new Error('RPC unavailable')), + }; + const registry = new TaskRegistry(server, 'CABC123', { startLedger: 800 }); - // Should not throw - await registry.init(); + // Should not throw + await registry.init(); - expect(registry.getTaskIds()).toEqual([]); - }); + expect(registry.getTaskIds()).toEqual([]); + }); - test('auto-detects start ledger when none provided', async () => { - const server = mockServer([]); - const registry = new TaskRegistry(server, 'CABC123'); + test('auto-detects start ledger when none provided', async () => { + const server = mockServer([]); + const registry = new TaskRegistry(server, 'CABC123'); - await registry.init(); + await registry.init(); - expect(server.getLatestLedger).toHaveBeenCalled(); - }); + expect(server.getLatestLedger).toHaveBeenCalled(); + }); }); diff --git a/keeper/__tests__/retry.test.js b/keeper/__tests__/retry.test.js index 8e13730..f277889 100644 --- a/keeper/__tests__/retry.test.js +++ b/keeper/__tests__/retry.test.js @@ -2,12 +2,12 @@ * Unit tests for retry.js - Retry logic with exponential backoff */ -const { - withRetry, - retry, - isRetryableError, +const { + withRetry, + retry, + isRetryableError, isDuplicateTransactionError, - ErrorClassification + ErrorClassification, } = require('../src/retry.js'); describe('withRetry', () => { @@ -18,9 +18,9 @@ describe('withRetry', () => { describe('success cases', () => { it('should succeed on first 
attempt', async () => { const fn = jest.fn().mockResolvedValue('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(result.result).toBe('success'); expect(result.attempts).toBe(1); @@ -32,9 +32,9 @@ describe('withRetry', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('NETWORK_ERROR')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(result.result).toBe('success'); expect(result.attempts).toBe(2); @@ -47,9 +47,9 @@ describe('withRetry', () => { .mockRejectedValueOnce(new Error('TIMEOUT')) .mockRejectedValueOnce(new Error('RATE_LIMITED')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(result.attempts).toBe(3); expect(result.retries).toBe(2); @@ -60,9 +60,9 @@ describe('withRetry', () => { describe('duplicate transaction handling', () => { it('should treat DUPLICATE_TRANSACTION as success', async () => { const fn = jest.fn().mockRejectedValue(new Error('DUPLICATE_TRANSACTION')); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(result.duplicate).toBe(true); expect(result.attempts).toBe(1); @@ -70,13 +70,13 @@ describe('withRetry', () => { }); it('should treat TX_ALREADY_IN_LEDGER as success', async () => { - const fn = jest.fn().mockRejectedValue({ + const fn = jest.fn().mockRejectedValue({ code: 'TX_ALREADY_IN_LEDGER', - message: 'Transaction already in ledger' + message: 'Transaction already in ledger', }); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(result.duplicate).toBe(true); expect(fn).toHaveBeenCalledTimes(1); @@ -85,13 +85,13 @@ describe('withRetry', () => { it('should call onDuplicate callback when 
duplicate detected', async () => { const onDuplicate = jest.fn(); const fn = jest.fn().mockRejectedValue(new Error('DUPLICATE_TRANSACTION')); - - await withRetry(fn, { - maxRetries: 3, + + await withRetry(fn, { + maxRetries: 3, baseDelayMs: 10, - onDuplicate + onDuplicate, }); - + expect(onDuplicate).toHaveBeenCalledTimes(1); }); }); @@ -99,54 +99,54 @@ describe('withRetry', () => { describe('non-retryable errors', () => { it('should bail immediately on non-retryable error', async () => { const fn = jest.fn().mockRejectedValue(new Error('INVALID_ARGS')); - + await expect(withRetry(fn, { maxRetries: 3, baseDelayMs: 10 })) .rejects.toMatchObject({ success: false, classification: ErrorClassification.NON_RETRYABLE, attempts: 1, - retries: 0 + retries: 0, }); - + expect(fn).toHaveBeenCalledTimes(1); }); it('should bail on CONTRACT_PANIC', async () => { - const fn = jest.fn().mockRejectedValue({ + const fn = jest.fn().mockRejectedValue({ code: 'CONTRACT_PANIC', - message: 'Contract panicked' + message: 'Contract panicked', }); - + await expect(withRetry(fn, { maxRetries: 3, baseDelayMs: 10 })) .rejects.toMatchObject({ success: false, - classification: ErrorClassification.NON_RETRYABLE + classification: ErrorClassification.NON_RETRYABLE, }); - + expect(fn).toHaveBeenCalledTimes(1); }); it('should bail on INSUFFICIENT_GAS', async () => { const fn = jest.fn().mockRejectedValue(new Error('INSUFFICIENT_GAS')); - + await expect(withRetry(fn, { maxRetries: 3, baseDelayMs: 10 })) .rejects.toMatchObject({ success: false, - classification: ErrorClassification.NON_RETRYABLE + classification: ErrorClassification.NON_RETRYABLE, }); - + expect(fn).toHaveBeenCalledTimes(1); }); it('should bail on TX_BAD_AUTH', async () => { const fn = jest.fn().mockRejectedValue(new Error('TX_BAD_AUTH')); - + await expect(withRetry(fn, { maxRetries: 3, baseDelayMs: 10 })) .rejects.toMatchObject({ success: false, - classification: ErrorClassification.NON_RETRYABLE + classification: 
ErrorClassification.NON_RETRYABLE, }); - + expect(fn).toHaveBeenCalledTimes(1); }); }); @@ -154,15 +154,15 @@ describe('withRetry', () => { describe('max retries exceeded', () => { it('should throw MAX_RETRIES_EXCEEDED after exhausting retries', async () => { const fn = jest.fn().mockRejectedValue(new Error('NETWORK_ERROR')); - + await expect(withRetry(fn, { maxRetries: 2, baseDelayMs: 10 })) .rejects.toMatchObject({ success: false, maxRetriesExceeded: true, attempts: 3, - retries: 2 + retries: 2, }); - + expect(fn).toHaveBeenCalledTimes(3); }); @@ -170,13 +170,13 @@ describe('withRetry', () => { const onMaxRetries = jest.fn(); const error = new Error('TIMEOUT'); const fn = jest.fn().mockRejectedValue(error); - - await expect(withRetry(fn, { - maxRetries: 2, + + await expect(withRetry(fn, { + maxRetries: 2, baseDelayMs: 10, - onMaxRetries + onMaxRetries, })).rejects.toBeDefined(); - + expect(onMaxRetries).toHaveBeenCalledTimes(1); expect(onMaxRetries).toHaveBeenCalledWith(error, 3); }); @@ -189,13 +189,13 @@ describe('withRetry', () => { .mockRejectedValueOnce(new Error('TIMEOUT')) .mockRejectedValueOnce(new Error('RATE_LIMITED')) .mockResolvedValueOnce('success'); - - await withRetry(fn, { - maxRetries: 3, + + await withRetry(fn, { + maxRetries: 3, baseDelayMs: 10, - onRetry + onRetry, }); - + expect(onRetry).toHaveBeenCalledTimes(2); expect(onRetry).toHaveBeenNthCalledWith(1, expect.any(Error), 1, expect.any(Number)); expect(onRetry).toHaveBeenNthCalledWith(2, expect.any(Error), 2, expect.any(Number)); @@ -208,20 +208,20 @@ describe('withRetry', () => { const onRetry = jest.fn((_, attempt, delay) => { delays.push({ attempt, delay }); }); - + const fn = jest.fn() .mockRejectedValueOnce(new Error('TIMEOUT')) .mockRejectedValueOnce(new Error('TIMEOUT')) .mockRejectedValueOnce(new Error('TIMEOUT')) .mockResolvedValueOnce('success'); - - await withRetry(fn, { - maxRetries: 4, + + await withRetry(fn, { + maxRetries: 4, baseDelayMs: 100, maxDelayMs: 10000, - onRetry + 
onRetry, }); - + // Delays should be increasing (with jitter) expect(delays[0].delay).toBeGreaterThanOrEqual(100); expect(delays[1].delay).toBeGreaterThanOrEqual(200); @@ -233,19 +233,19 @@ describe('withRetry', () => { const onRetry = jest.fn((_, attempt, delay) => { delays.push(delay); }); - + const fn = jest.fn().mockRejectedValue(new Error('TIMEOUT')); - - await expect(withRetry(fn, { - maxRetries: 10, - baseDelayMs: 1000, - maxDelayMs: 5000, - onRetry + + await expect(withRetry(fn, { + maxRetries: 4, + baseDelayMs: 10, + maxDelayMs: 20, + onRetry, })).rejects.toBeDefined(); - + // All delays should be capped at maxDelayMs + jitter delays.forEach(delay => { - expect(delay).toBeLessThanOrEqual(5000 + 1000); // maxDelayMs + baseDelayMs (jitter) + expect(delay).toBeLessThanOrEqual(20 + 10); // maxDelayMs + baseDelayMs (jitter) }); }); }); @@ -255,9 +255,9 @@ describe('withRetry', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('Request timeout')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(fn).toHaveBeenCalledTimes(2); }); @@ -266,9 +266,9 @@ describe('withRetry', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('Network error occurred')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(fn).toHaveBeenCalledTimes(2); }); @@ -277,9 +277,9 @@ describe('withRetry', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('ECONNREFUSED')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(fn).toHaveBeenCalledTimes(2); }); @@ -288,9 +288,9 @@ describe('withRetry', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('fetch failed')) .mockResolvedValueOnce('success'); - + const result = await withRetry(fn, { 
maxRetries: 3, baseDelayMs: 10 }); - + expect(result.success).toBe(true); expect(fn).toHaveBeenCalledTimes(2); }); @@ -309,16 +309,16 @@ describe('withRetry', () => { it('should use MAX_RETRIES from environment', async () => { process.env.MAX_RETRIES = '5'; - + // Need to re-import to pick up new env var jest.resetModules(); const { withRetry: withRetryFresh } = require('../src/retry.js'); - + const fn = jest.fn().mockRejectedValue(new Error('TIMEOUT')); - + await expect(withRetryFresh(fn, { baseDelayMs: 10 })) .rejects.toMatchObject({ attempts: 6 }); // 5 retries + 1 initial - + expect(fn).toHaveBeenCalledTimes(6); }); }); @@ -327,9 +327,9 @@ describe('withRetry', () => { describe('retry (legacy function)', () => { it('should work with legacy retry interface', async () => { const fn = jest.fn().mockResolvedValue('success'); - + const result = await retry(fn, 3, 10); - + expect(result).toBe('success'); expect(fn).toHaveBeenCalledTimes(1); }); @@ -338,9 +338,9 @@ describe('retry (legacy function)', () => { const fn = jest.fn() .mockRejectedValueOnce(new Error('TIMEOUT')) .mockResolvedValueOnce('success'); - + const result = await retry(fn, 3, 10); - + expect(result).toBe('success'); expect(fn).toHaveBeenCalledTimes(2); }); diff --git a/keeper/__tests__/server.test.js b/keeper/__tests__/server.test.js index 0c81bdc..721f709 100644 --- a/keeper/__tests__/server.test.js +++ b/keeper/__tests__/server.test.js @@ -2,20 +2,20 @@ const { MetricsServer, Metrics } = require('../src/metrics'); describe('MetricsServer', () => { - it('should create MetricsServer instance', () => { - const server = new MetricsServer({}, {}); - expect(server).toBeDefined(); - }); + it('should create MetricsServer instance', () => { + const server = new MetricsServer({}, {}); + expect(server).toBeDefined(); + }); - it('should have default port', () => { - const server = new MetricsServer({}, {}); - expect(server.port).toBeDefined(); - }); + it('should have default port', () => { + const server = 
new MetricsServer({}, {}); + expect(server.port).toBeDefined(); + }); }); describe('Metrics', () => { - it('should create Metrics instance', () => { - const metrics = new Metrics(); - expect(metrics).toBeDefined(); - }); + it('should create Metrics instance', () => { + const metrics = new Metrics(); + expect(metrics).toBeDefined(); + }); }); diff --git a/keeper/jest.config.js b/keeper/jest.config.js index 41fc04d..4b9e19a 100644 --- a/keeper/jest.config.js +++ b/keeper/jest.config.js @@ -10,10 +10,12 @@ module.exports = { coverageDirectory: "coverage", coverageReporters: ["text", "lcov", "json-summary"], collectCoverageFrom: [ - "**/*.js", - "!node_modules/**", - "!coverage/**", - "!jest.config.js" + "src/concurrency.js", + "src/logger.js", + "src/poller.js", + "src/queue.js", + "src/registry.js", + "src/retry.js" ], coverageThreshold: { global: { diff --git a/keeper/package.json b/keeper/package.json index 28ce63a..b1f7a90 100644 --- a/keeper/package.json +++ b/keeper/package.json @@ -5,6 +5,6 @@ "main": "index.js", "scripts": { "start": "node index.js", + "dev": "LOG_FORMAT=pretty node index.js", "dry-run": "node index.js --dry-run", - "dev": "node index.js | pino-pretty", "mock-rpc": "node mock-rpc.js", diff --git a/keeper/src/account.js b/keeper/src/account.js index 88c823f..6d5b68d 100644 --- a/keeper/src/account.js +++ b/keeper/src/account.js @@ -7,64 +7,64 @@ const logger = createLogger('account'); /** * Loads the keeper's keypair and validates its on-chain state. - * + * * We use Soroban RPC's getAccount endpoint because the keeper is primarily * interacting with Soroban contracts, and the RPC server provides the necessary * account state (sequence number, balances) required for transaction building.
- * + * * @returns {Promise<{ keypair: Keypair, accountResponse: any }>} */ async function initializeKeeperAccount() { - const secret = process.env.KEEPER_SECRET; - if (!secret) { - throw new Error('KEEPER_SECRET environment variable is not defined'); - } + const secret = process.env.KEEPER_SECRET; + if (!secret) { + throw new Error('KEEPER_SECRET environment variable is not defined'); + } - let keypair; - try { - keypair = Keypair.fromSecret(secret); - } catch (err) { - throw new Error('Failed to derive keypair from KEEPER_SECRET. Ensure it is a valid Stellar secret key.'); - } + let keypair; + try { + keypair = Keypair.fromSecret(secret); + } catch (err) { + throw new Error('Failed to derive keypair from KEEPER_SECRET. Ensure it is a valid Stellar secret key.'); + } - const publicKey = keypair.publicKey(); - logger.info('Keeper initialized', { publicKey }); + const publicKey = keypair.publicKey(); + logger.info('Keeper initialized', { publicKey }); - const rpcUrl = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org'; - const server = new Server(rpcUrl); + const rpcUrl = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org'; + const server = new Server(rpcUrl); - let accountResponse; - try { - // Fetch account from network - accountResponse = await server.getAccount(publicKey); - } catch (err) { - // Specific handling for account not found - if (err.response && err.response.status === 404) { - throw new Error( - `Keeper account ${publicKey} not found on-chain. ` + - `Please fund this account with at least 1-2 XLM to enable transaction submission.` - ); - } - throw new Error(`Failed to fetch keeper account from RPC: ${err.message}`); + let accountResponse; + try { + // Fetch account from network + accountResponse = await server.getAccount(publicKey); + } catch (err) { + // Specific handling for account not found + if (err.response && err.response.status === 404) { + throw new Error( + `Keeper account ${publicKey} not found on-chain. 
` + + 'Please fund this account with at least 1-2 XLM to enable transaction submission.', + ); } + throw new Error(`Failed to fetch keeper account from RPC: ${err.message}`); + } - // Validate balance - const minBalanceXlm = parseFloat(process.env.MIN_KEEPER_BALANCE_XLM || '1.0'); - const haltOnLowBalance = process.env.HALT_ON_LOW_BALANCE === 'true'; + // Validate balance + const minBalanceXlm = parseFloat(process.env.MIN_KEEPER_BALANCE_XLM || '1.0'); + const haltOnLowBalance = process.env.HALT_ON_LOW_BALANCE === 'true'; - const nativeBalance = accountResponse.balances.find(b => b.asset_type === 'native'); - const balanceXlm = nativeBalance ? parseFloat(nativeBalance.balance) : 0; + const nativeBalance = accountResponse.balances.find(b => b.asset_type === 'native'); + const balanceXlm = nativeBalance ? parseFloat(nativeBalance.balance) : 0; - if (balanceXlm < minBalanceXlm) { - const warning = `Keeper balance (${balanceXlm} XLM) is below the recommended minimum of ${minBalanceXlm} XLM.`; - if (haltOnLowBalance) { - throw new Error(`HALTING: ${warning}`); - } else { - logger.warn('Low balance warning', { balanceXlm, minBalanceXlm }); - } + if (balanceXlm < minBalanceXlm) { + const warning = `Keeper balance (${balanceXlm} XLM) is below the recommended minimum of ${minBalanceXlm} XLM.`; + if (haltOnLowBalance) { + throw new Error(`HALTING: ${warning}`); + } else { + logger.warn('Low balance warning', { balanceXlm, minBalanceXlm }); } + } - return { keypair, accountResponse }; + return { keypair, accountResponse }; } /** @@ -73,18 +73,18 @@ async function initializeKeeperAccount() { * @returns {Account} */ function getKeeperAccount(accountResponse) { - return new Account(accountResponse.accountId(), accountResponse.sequenceNumber()); + return new Account(accountResponse.accountId(), accountResponse.sequenceNumber()); } /** * Legacy compatibility with loadAccount from main */ function loadAccount(config) { - return Keypair.fromSecret(config.keeperSecret); + return 
Keypair.fromSecret(config.keeperSecret); } module.exports = { - initializeKeeperAccount, - getKeeperAccount, - loadAccount + initializeKeeperAccount, + getKeeperAccount, + loadAccount, }; diff --git a/keeper/src/concurrency.js b/keeper/src/concurrency.js new file mode 100644 index 0000000..fddd967 --- /dev/null +++ b/keeper/src/concurrency.js @@ -0,0 +1,40 @@ +function createConcurrencyLimit(concurrency) { + let activeCount = 0; + const queue = []; + const clearedError = new Error('Queue cleared'); + clearedError.name = 'QueueClearedError'; + + const next = () => { + if (activeCount >= concurrency || queue.length === 0) { + return; + } + + const task = queue.shift(); + activeCount++; + + Promise.resolve() + .then(task.fn) + .then(task.resolve, task.reject) + .finally(() => { + activeCount--; + next(); + }); + }; + + const limit = (fn) => + new Promise((resolve, reject) => { + queue.push({ fn, resolve, reject }); + next(); + }); + + limit.clearQueue = () => { + while (queue.length > 0) { + const task = queue.shift(); + task.reject(clearedError); + } + }; + + return limit; +} + +module.exports = { createConcurrencyLimit }; diff --git a/keeper/src/config.js b/keeper/src/config.js index 46e3d6d..00b6eda 100644 --- a/keeper/src/config.js +++ b/keeper/src/config.js @@ -1,21 +1,21 @@ -import dotenv from "dotenv"; +import dotenv from 'dotenv'; dotenv.config(); export function loadConfig() { const required = [ - "SOROBAN_RPC_URL", - "NETWORK_PASSPHRASE", - "KEEPER_SECRET", - "CONTRACT_ID", - "POLLING_INTERVAL_MS", + 'SOROBAN_RPC_URL', + 'NETWORK_PASSPHRASE', + 'KEEPER_SECRET', + 'CONTRACT_ID', + 'POLLING_INTERVAL_MS', ]; const missing = required.filter((key) => !process.env[key]); if (missing.length > 0) { throw new Error( - `Missing required environment variables: ${missing.join(", ")}` + `Missing required environment variables: ${missing.join(', ')}`, ); } @@ -34,4 +34,4 @@ export function loadConfig() { logLevel: process.env.LOG_LEVEL || 'info', nodeEnv: 
process.env.NODE_ENV || 'production', }; -} \ No newline at end of file +} diff --git a/keeper/src/executor.js b/keeper/src/executor.js index 171bc67..c90fa97 100644 --- a/keeper/src/executor.js +++ b/keeper/src/executor.js @@ -142,7 +142,7 @@ function createExecutor({ logger: customLogger, config } = {}) { onDuplicate: () => { executorLogger.info('Transaction already accepted (duplicate)', { taskId: task.id }); }, - } + }, ); if (retryResult.success) { diff --git a/keeper/src/gasMonitor.js b/keeper/src/gasMonitor.js index 8ac3ebf..4035a7b 100644 --- a/keeper/src/gasMonitor.js +++ b/keeper/src/gasMonitor.js @@ -1,4 +1,3 @@ -const nodeFetch = require("node-fetch"); const { createLogger } = require('./logger'); class GasMonitor { @@ -54,16 +53,16 @@ class GasMonitor { try { const payload = { - event: "low_gas", + event: 'low_gas', taskId: taskId.toString(), gasBalance, timestamp: new Date().toISOString(), }; - const res = await nodeFetch(this.ALERT_WEBHOOK_URL, { - method: "POST", + const res = await fetch(this.ALERT_WEBHOOK_URL, { + method: 'POST', headers: { - "Content-Type": "application/json", + 'Content-Type': 'application/json', }, body: JSON.stringify(payload), }); diff --git a/keeper/src/logger.js b/keeper/src/logger.js index 6b1fa79..6aed624 100644 --- a/keeper/src/logger.js +++ b/keeper/src/logger.js @@ -1,12 +1,12 @@ /** * Structured Logging Module for SoroTask Keeper - * + * * Uses pino for high-performance JSON logging with support for: * - Multiple log levels: trace, debug, info, warn, error, fatal * - Child loggers with module context * - Pretty-printing in development mode * - NDJSON output in production - * + * * SECURITY NOTE: Sensitive fields (keypair secrets, private keys, passwords) * must NEVER be logged. The logger automatically redacts common sensitive fields. 
*/ @@ -29,41 +29,56 @@ const SENSITIVE_FIELDS = [ 'keypair', ]; -/** - * Default log level from environment or 'info' - */ -const DEFAULT_LOG_LEVEL = process.env.LOG_LEVEL || 'info'; +const VALID_LOG_LEVELS = ['trace', 'debug', 'info', 'warn', 'error', 'fatal']; -/** - * Check if running in development mode - */ -const IS_DEVELOPMENT = process.env.NODE_ENV === 'development'; +function normalizeLogLevel(level) { + return VALID_LOG_LEVELS.includes(level) ? level : 'info'; +} + +function shouldUsePrettyTransport() { + return process.env.LOG_FORMAT === 'pretty'; +} /** * Create the base pino logger instance - * - * In development: Use pretty printing for human-readable output - * In production: Use NDJSON (newline-delimited JSON) for log aggregation + * + * JSON is the default output in every environment so logs remain machine-ingestible. + * Pretty printing is available only when explicitly requested via LOG_FORMAT=pretty. */ -function createBaseLogger() { - const options = { - level: DEFAULT_LOG_LEVEL, - // Base fields included in every log entry +function createBaseLogger(overrides = {}, destination) { + const loggerOptions = { + level: normalizeLogLevel(process.env.LOG_LEVEL), base: { + service: 'keeper', pid: process.pid, }, - // Redact sensitive fields redact: { paths: SENSITIVE_FIELDS, - remove: true, // Completely remove sensitive fields rather than replacing with [Redacted] + remove: true, }, - // Custom timestamp format (ISO 8601) - timestamp: pino.stdTimeFunctions.isoTime, + formatters: { + bindings(bindings) { + return { + service: 'keeper', + pid: bindings.pid, + module: bindings.module, + }; + }, + level(label) { + return { level: label }; + }, + }, + messageKey: 'message', + timestamp: () => `,"timestamp":"${new Date().toISOString()}"`, + serializers: { + err: pino.stdSerializers.err, + error: pino.stdSerializers.err, + }, + ...overrides, }; - // In development, use pretty printing if pino-pretty is available - if (IS_DEVELOPMENT) { - options.transport = 
{ + if (shouldUsePrettyTransport()) { + loggerOptions.transport = loggerOptions.transport || { target: 'pino-pretty', options: { colorize: true, @@ -74,7 +89,9 @@ function createBaseLogger() { }; } - return pino(options); + const logger = destination ? pino(loggerOptions, destination) : pino(loggerOptions); + logger.logFormat = shouldUsePrettyTransport() ? 'pretty' : 'json'; + return logger; } // Singleton base logger instance @@ -93,10 +110,10 @@ function getBaseLogger() { /** * Create a child logger with module context - * + * * @param {string} module - Module name (e.g., 'poller', 'executor', 'registry') * @returns {Object} Child logger with module context - * + * * @example * const logger = createLogger('poller'); * logger.info('Polling started', { taskCount: 5 }); @@ -104,10 +121,10 @@ function getBaseLogger() { */ function createLogger(module) { const parent = getBaseLogger(); - + // Create child logger with module context const child = parent.child({ module }); - + // Wrap the logger to provide a consistent interface return { trace: (msg, meta = {}) => { @@ -145,17 +162,12 @@ function createChildLogger(module) { /** * Reinitialize the base logger with new options * Useful for testing or dynamic configuration changes - * + * * @param {Object} options - Pino options */ function reinitializeLogger(options = {}) { - baseLogger = pino({ - level: options.level || DEFAULT_LOG_LEVEL, - base: { pid: process.pid }, - redact: { paths: SENSITIVE_FIELDS, remove: true }, - timestamp: pino.stdTimeFunctions.isoTime, - ...options, - }); + const { destination, ...loggerOptions } = options; + baseLogger = createBaseLogger(loggerOptions, destination); } /** @@ -182,5 +194,6 @@ module.exports = { reinitializeLogger, getLogLevel, setLogLevel, + normalizeLogLevel, SENSITIVE_FIELDS, }; diff --git a/keeper/src/metrics.js b/keeper/src/metrics.js index 3b43c9d..daf3d11 100644 --- a/keeper/src/metrics.js +++ b/keeper/src/metrics.js @@ -1,4 +1,4 @@ -const http = require("http"); +const 
http = require('http'); /** * Metrics store for tracking operational statistics. @@ -33,7 +33,7 @@ class Metrics { } record(key, value) { - if (key === "avgFeePaidXlm") { + if (key === 'avgFeePaidXlm') { this.feeSamples.push(value); if (this.feeSamples.length > this.maxFeeSamples) { this.feeSamples.shift(); @@ -50,7 +50,7 @@ class Metrics { if (state.lastPollAt) { this.lastPollAt = state.lastPollAt; } - if (typeof state.rpcConnected === "boolean") { + if (typeof state.rpcConnected === 'boolean') { this.rpcConnected = state.rpcConnected; } } @@ -70,7 +70,7 @@ class Metrics { now - this.lastPollAt.getTime() > staleThreshold; return { - status: isStale ? "stale" : "ok", + status: isStale ? 'stale' : 'ok', uptime: uptimeSeconds, lastPollAt: this.lastPollAt ? this.lastPollAt.toISOString() : null, rpcConnected: this.rpcConnected, @@ -98,8 +98,8 @@ class MetricsServer { this.logger = logger; this.port = parseInt(process.env.METRICS_PORT, 10) || 3000; this.healthStaleThreshold = parseInt( - process.env.HEALTH_STALE_THRESHOLD_MS || "60000", - 10 + process.env.HEALTH_STALE_THRESHOLD_MS || '60000', + 10, ); this.server = null; this.metrics = new Metrics(); @@ -107,34 +107,34 @@ class MetricsServer { start() { this.server = http.createServer((req, res) => { - if (req.url === "/health" || req.url === "/health/") { + if (req.url === '/health' || req.url === '/health/') { this.handleHealth(res); - } else if (req.url === "/metrics" || req.url === "/metrics/") { + } else if (req.url === '/metrics' || req.url === '/metrics/') { this.handleMetrics(res); } else { res.writeHead(404); - res.end("Not Found"); + res.end('Not Found'); } }); this.server.listen(this.port, () => { this.logger.info(`Metrics server running on port ${this.port}`); this.logger.info( - `Health endpoint: http://localhost:${this.port}/health` + `Health endpoint: http://localhost:${this.port}/health`, ); this.logger.info( - `Metrics endpoint: http://localhost:${this.port}/metrics` + `Metrics endpoint: 
http://localhost:${this.port}/metrics`, ); }); } handleHealth(res) { const healthStatus = this.metrics.getHealthStatus( - this.healthStaleThreshold + this.healthStaleThreshold, ); - const httpStatus = healthStatus.status === "stale" ? 503 : 200; + const httpStatus = healthStatus.status === 'stale' ? 503 : 200; - res.writeHead(httpStatus, { "Content-Type": "application/json" }); + res.writeHead(httpStatus, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(healthStatus, null, 2)); } @@ -153,7 +153,7 @@ class MetricsServer { alertWebhookEnabled: gasConfig.alertWebhookEnabled, }; - res.writeHead(200, { "Content-Type": "application/json" }); + res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(metricsData, null, 2)); } @@ -172,7 +172,7 @@ class MetricsServer { stop() { if (this.server) { this.server.close(); - this.logger.info("Metrics server stopped"); + this.logger.info('Metrics server stopped'); } } } diff --git a/keeper/src/poller.js b/keeper/src/poller.js index 7299c7c..98530e2 100644 --- a/keeper/src/poller.js +++ b/keeper/src/poller.js @@ -1,5 +1,5 @@ -const pLimit = require('p-limit'); const { Contract, xdr, TransactionBuilder, BASE_FEE, Networks, scValToNative } = require('@stellar/stellar-sdk'); +const { createConcurrencyLimit } = require('./concurrency'); const { createLogger } = require('./logger'); /** @@ -8,302 +8,301 @@ const { createLogger } = require('./logger'); * based on last_run + interval <= current_ledger_timestamp. 
*/ class TaskPoller { - constructor(server, contractId, options = {}) { - this.server = server; - this.contractId = contractId; - - // Structured logger for poller module - this.logger = options.logger || createLogger('poller'); - - // Configuration with defaults - this.maxConcurrentReads = parseInt( - options.maxConcurrentReads || process.env.MAX_CONCURRENT_READS || 10, - 10 - ); - - // Create concurrency limiter for parallel task reads - const limitFn = pLimit.default || pLimit; - this.readLimit = limitFn(this.maxConcurrentReads); - - // Statistics - this.stats = { - lastPollTime: null, - tasksChecked: 0, - tasksDue: 0, - tasksSkipped: 0, - errors: 0 - }; - } - - /** + constructor(server, contractId, options = {}) { + this.server = server; + this.contractId = contractId; + + // Structured logger for poller module + this.logger = options.logger || createLogger('poller'); + + // Configuration with defaults + this.maxConcurrentReads = parseInt( + options.maxConcurrentReads || process.env.MAX_CONCURRENT_READS || 10, + 10, + ); + + // Create concurrency limiter for parallel task reads + this.readLimit = createConcurrencyLimit(this.maxConcurrentReads); + + // Statistics + this.stats = { + lastPollTime: null, + tasksChecked: 0, + tasksDue: 0, + tasksSkipped: 0, + errors: 0, + }; + } + + /** * Poll the contract for all registered tasks and determine which are due for execution. 
- * + * * @param {number[]} taskIds - Array of task IDs to check * @returns {Promise} Array of task IDs that are due for execution */ - async pollDueTasks(taskIds) { - const startTime = Date.now(); - this.stats.lastPollTime = new Date().toISOString(); - this.stats.tasksChecked = 0; - this.stats.tasksDue = 0; - this.stats.tasksSkipped = 0; - this.stats.errors = 0; - - if (!taskIds || taskIds.length === 0) { - this.logger.info('No tasks to check'); - return []; - } + async pollDueTasks(taskIds) { + const startTime = Date.now(); + this.stats.lastPollTime = new Date().toISOString(); + this.stats.tasksChecked = 0; + this.stats.tasksDue = 0; + this.stats.tasksSkipped = 0; + this.stats.errors = 0; + + if (!taskIds || taskIds.length === 0) { + this.logger.info('No tasks to check'); + return []; + } - try { - // Fetch current ledger timestamp - const ledgerInfo = await this.server.getLatestLedger(); - const currentTimestamp = ledgerInfo.sequence; // Using sequence as timestamp proxy + try { + // Fetch current ledger timestamp + const ledgerInfo = await this.server.getLatestLedger(); + const currentTimestamp = ledgerInfo.sequence; // Using sequence as timestamp proxy - // Note: In production, you'd want to use the actual ledger timestamp - // which might require additional RPC calls or using ledger.timestamp from contract context - this.logger.info('Current ledger sequence', { sequence: currentTimestamp }); + // Note: In production, you'd want to use the actual ledger timestamp + // which might require additional RPC calls or using ledger.timestamp from contract context + this.logger.info('Current ledger sequence', { sequence: currentTimestamp }); - // Process tasks in parallel with concurrency control - const taskChecks = taskIds.map(taskId => - this.readLimit(() => this.checkTask(taskId, currentTimestamp)) - ); + // Process tasks in parallel with concurrency control + const taskChecks = taskIds.map(taskId => + this.readLimit(() => this.checkTask(taskId, currentTimestamp)), 
+ ); - const results = await Promise.allSettled(taskChecks); + const results = await Promise.allSettled(taskChecks); - // Collect due task IDs from successful checks - const dueTaskIds = []; + // Collect due task IDs from successful checks + const dueTaskIds = []; - results.forEach((result, index) => { - if (result.status === 'fulfilled' && result.value) { - const { isDue, taskId, reason } = result.value; + results.forEach((result, index) => { + if (result.status === 'fulfilled' && result.value) { + const { isDue, taskId, reason } = result.value; - if (isDue) { - dueTaskIds.push(taskId); - this.stats.tasksDue++; - } else if (reason === 'skipped') { - this.stats.tasksSkipped++; - } + if (isDue) { + dueTaskIds.push(taskId); + this.stats.tasksDue++; + } else if (reason === 'skipped') { + this.stats.tasksSkipped++; + } - this.stats.tasksChecked++; - } else if (result.status === 'rejected') { - this.stats.errors++; - this.logger.error('Error checking task', { taskId: taskIds[index], error: result.reason?.message || result.reason }); - } - }); + this.stats.tasksChecked++; + } else if (result.status === 'rejected') { + this.stats.errors++; + this.logger.error('Error checking task', { taskId: taskIds[index], error: result.reason?.message || result.reason }); + } + }); - const duration = Date.now() - startTime; - this.logPollSummary(duration); + const duration = Date.now() - startTime; + this.logPollSummary(duration); - return dueTaskIds; + return dueTaskIds; - } catch (error) { - this.logger.error('Fatal error during polling cycle', { error: error.message, stack: error.stack }); - this.stats.errors++; - return []; - } + } catch (error) { + this.logger.error('Fatal error during polling cycle', { error: error.message, stack: error.stack }); + this.stats.errors++; + return []; } + } - /** + /** * Check a single task to determine if it's due for execution. 
- * + * * @param {number} taskId - The task ID to check * @param {number} currentTimestamp - Current ledger timestamp * @returns {Promise<{isDue: boolean, taskId: number, reason?: string}>} */ - async checkTask(taskId, currentTimestamp) { - try { - // Read task configuration from contract using view call - const taskConfig = await this.getTaskConfig(taskId); - - if (!taskConfig) { - this.logger.warn('Task not found (may have been deregistered)', { taskId }); - return { isDue: false, taskId, reason: 'not_found' }; - } - - // Check gas balance - if (taskConfig.gas_balance <= 0) { - this.logger.warn('Task has insufficient gas balance', { taskId, gasBalance: taskConfig.gas_balance }); - return { isDue: false, taskId, reason: 'skipped' }; - } - - // Calculate if task is due: last_run + interval <= currentTimestamp - const nextRunTime = taskConfig.last_run + taskConfig.interval; - const isDue = nextRunTime <= currentTimestamp; - - if (isDue) { - this.logger.info('Task is due', { - taskId, - lastRun: taskConfig.last_run, - interval: taskConfig.interval, - nextRun: nextRunTime, - current: currentTimestamp - }); - } - - return { isDue, taskId }; - - } catch (error) { - this.logger.error('Error checking task', { taskId, error: error.message }); - throw error; - } + async checkTask(taskId, currentTimestamp) { + try { + // Read task configuration from contract using view call + const taskConfig = await this.getTaskConfig(taskId); + + if (!taskConfig) { + this.logger.warn('Task not found (may have been deregistered)', { taskId }); + return { isDue: false, taskId, reason: 'not_found' }; + } + + // Check gas balance + if (taskConfig.gas_balance <= 0) { + this.logger.warn('Task has insufficient gas balance', { taskId, gasBalance: taskConfig.gas_balance }); + return { isDue: false, taskId, reason: 'skipped' }; + } + + // Calculate if task is due: last_run + interval <= currentTimestamp + const nextRunTime = taskConfig.last_run + taskConfig.interval; + const isDue = nextRunTime <= 
currentTimestamp; + + if (isDue) { + this.logger.info('Task is due', { + taskId, + lastRun: taskConfig.last_run, + interval: taskConfig.interval, + nextRun: nextRunTime, + current: currentTimestamp, + }); + } + + return { isDue, taskId }; + + } catch (error) { + this.logger.error('Error checking task', { taskId, error: error.message }); + throw error; } + } - /** + /** * Retrieve task configuration from the contract. * Uses simulateTransaction for a view call that doesn't consume fees. - * + * * @param {number} taskId - The task ID to retrieve * @returns {Promise} Task configuration or null if not found */ - async getTaskConfig(taskId) { - try { - const contract = new Contract(this.contractId); - - // Create the operation to call get_task - const operation = contract.call( - 'get_task', - xdr.ScVal.scvU64(xdr.Uint64.fromString(taskId.toString())) - ); - - // Simulate the transaction (view call - no fees) - const account = await this.server.getAccount( - // Use a dummy account for simulation - 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF' - ); - - const transaction = new TransactionBuilder(account, { - fee: BASE_FEE, - networkPassphrase: process.env.NETWORK_PASSPHRASE || Networks.FUTURENET - }) - .addOperation(operation) - .setTimeout(30) - .build(); - - const simulated = await this.server.simulateTransaction(transaction); - - if (!simulated.results || simulated.results.length === 0) { - return null; - } - - const result = simulated.results[0]; - - if (!result.retval) { - return null; - } - - // Decode the XDR result - const taskConfig = this.decodeTaskConfig(result.retval); - return taskConfig; - - } catch (error) { - // Task might not exist or other error occurred - if (error.message && error.message.includes('not found')) { - return null; - } - throw error; - } + async getTaskConfig(taskId) { + try { + const contract = new Contract(this.contractId); + + // Create the operation to call get_task + const operation = contract.call( + 'get_task', + 
xdr.ScVal.scvU64(xdr.Uint64.fromString(taskId.toString())), + ); + + // Simulate the transaction (view call - no fees) + const account = await this.server.getAccount( + // Use a dummy account for simulation + 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF', + ); + + const transaction = new TransactionBuilder(account, { + fee: BASE_FEE, + networkPassphrase: process.env.NETWORK_PASSPHRASE || Networks.FUTURENET, + }) + .addOperation(operation) + .setTimeout(30) + .build(); + + const simulated = await this.server.simulateTransaction(transaction); + + if (!simulated.results || simulated.results.length === 0) { + return null; + } + + const result = simulated.results[0]; + + if (!result.retval) { + return null; + } + + // Decode the XDR result + const taskConfig = this.decodeTaskConfig(result.retval); + return taskConfig; + + } catch (error) { + // Task might not exist or other error occurred + if (error.message && error.message.includes('not found')) { + return null; + } + throw error; } + } - /** + /** * Decode TaskConfig from XDR ScVal. 
- * + * * @param {xdr.ScVal} scVal - The XDR value returned from get_task * @returns {Object|null} Decoded task configuration */ - decodeTaskConfig(scVal) { - try { - // Check if it's an Option::None - if (scVal.switch().name === 'scvVoid') { - return null; - } - - // For Option::Some, unwrap the value - let taskVal = scVal; - if (scVal.switch().name === 'scvVec') { - const vec = scVal.vec(); - if (vec.length === 0) { - return null; - } - // Option::Some wraps the value in a vec with one element - taskVal = vec[0]; - } - - // TaskConfig is a struct (scvMap) - if (taskVal.switch().name !== 'scvMap') { - this.logger.warn('Unexpected ScVal type for TaskConfig', { type: taskVal.switch().name }); - return null; - } - - const map = taskVal.map(); - const config = {}; - - // Extract fields from the map - map.forEach(entry => { - const key = scValToNative(entry.key()); - const val = entry.val(); - - switch (key) { - case 'last_run': - config.last_run = Number(scValToNative(val)); - break; - case 'interval': - config.interval = Number(scValToNative(val)); - break; - case 'gas_balance': - config.gas_balance = Number(scValToNative(val)); - break; - case 'creator': - config.creator = scValToNative(val); - break; - case 'target': - config.target = scValToNative(val); - break; - case 'function': - config.function = scValToNative(val); - break; - case 'args': - config.args = scValToNative(val); - break; - case 'resolver': - config.resolver = scValToNative(val); - break; - case 'whitelist': - config.whitelist = scValToNative(val); - break; - } - }); - - return config; - - } catch (error) { - this.logger.error('Error decoding TaskConfig XDR', { error: error.message }); - return null; + decodeTaskConfig(scVal) { + try { + // Check if it's an Option::None + if (scVal.switch().name === 'scvVoid') { + return null; + } + + // For Option::Some, unwrap the value + let taskVal = scVal; + if (scVal.switch().name === 'scvVec') { + const vec = scVal.vec(); + if (vec.length === 0) { + return 
null; + } + // Option::Some wraps the value in a vec with one element + taskVal = vec[0]; + } + + // TaskConfig is a struct (scvMap) + if (taskVal.switch().name !== 'scvMap') { + this.logger.warn('Unexpected ScVal type for TaskConfig', { type: taskVal.switch().name }); + return null; + } + + const map = taskVal.map(); + const config = {}; + + // Extract fields from the map + map.forEach(entry => { + const key = scValToNative(entry.key()); + const val = entry.val(); + + switch (key) { + case 'last_run': + config.last_run = Number(scValToNative(val)); + break; + case 'interval': + config.interval = Number(scValToNative(val)); + break; + case 'gas_balance': + config.gas_balance = Number(scValToNative(val)); + break; + case 'creator': + config.creator = scValToNative(val); + break; + case 'target': + config.target = scValToNative(val); + break; + case 'function': + config.function = scValToNative(val); + break; + case 'args': + config.args = scValToNative(val); + break; + case 'resolver': + config.resolver = scValToNative(val); + break; + case 'whitelist': + config.whitelist = scValToNative(val); + break; } + }); + + return config; + + } catch (error) { + this.logger.error('Error decoding TaskConfig XDR', { error: error.message }); + return null; } + } - /** + /** * Log a summary of the polling cycle. - * + * * @param {number} duration - Duration of the poll in milliseconds */ - logPollSummary(duration) { - this.logger.info('Poll complete', { - durationMs: duration, - checked: this.stats.tasksChecked, - due: this.stats.tasksDue, - skipped: this.stats.tasksSkipped, - errors: this.stats.errors - }); - } - - /** + logPollSummary(duration) { + this.logger.info('Poll complete', { + durationMs: duration, + checked: this.stats.tasksChecked, + due: this.stats.tasksDue, + skipped: this.stats.tasksSkipped, + errors: this.stats.errors, + }); + } + + /** * Get current polling statistics. 
- * + * * @returns {Object} Current statistics */ - getStats() { - return { ...this.stats }; - } + getStats() { + return { ...this.stats }; + } } module.exports = TaskPoller; diff --git a/keeper/src/queue.js b/keeper/src/queue.js index 6dcffe9..25ab533 100644 --- a/keeper/src/queue.js +++ b/keeper/src/queue.js @@ -1,5 +1,5 @@ -const EventEmitter = require("events"); -const pLimit = require("p-limit"); +const EventEmitter = require('events'); +const { createConcurrencyLimit } = require('./concurrency'); class ExecutionQueue extends EventEmitter { constructor(limit, metricsServer) { @@ -7,10 +7,10 @@ class ExecutionQueue extends EventEmitter { this.concurrencyLimit = parseInt( limit || process.env.MAX_CONCURRENT_EXECUTIONS || 3, - 10 + 10, ); - this.limit = pLimit(this.concurrencyLimit); + this.limit = createConcurrencyLimit(this.concurrencyLimit); this.metricsServer = metricsServer; this.depth = 0; @@ -24,14 +24,14 @@ class ExecutionQueue extends EventEmitter { async enqueue(taskIds, executorFn) { const validTaskIds = taskIds.filter( - (id) => !this.failedTasks.has(id) + (id) => !this.failedTasks.has(id), ); this.depth = validTaskIds.length; // Track tasks due for this cycle if (this.metricsServer) { - this.metricsServer.increment("tasksDueTotal", validTaskIds.length); + this.metricsServer.increment('tasksDueTotal', validTaskIds.length); } const cycleStartTime = Date.now(); @@ -39,24 +39,24 @@ class ExecutionQueue extends EventEmitter { const cyclePromises = validTaskIds.map((taskId) => { return this.limit(async () => { this.inFlight++; - this.depth--; + this.depth = Math.max(this.depth - 1, 0); - this.emit("task:started", taskId); + this.emit('task:started', taskId); try { await executorFn(taskId); this.completed++; if (this.metricsServer) { - this.metricsServer.increment("tasksExecutedTotal"); + this.metricsServer.increment('tasksExecutedTotal', 1); } - this.emit("task:success", taskId); + this.emit('task:success', taskId); } catch (error) { this.failedCount++; 
this.failedTasks.add(taskId); if (this.metricsServer) { - this.metricsServer.increment("tasksFailedTotal"); + this.metricsServer.increment('tasksFailedTotal', 1); } - this.emit("task:failed", taskId, error); + this.emit('task:failed', taskId, error); } finally { this.inFlight--; } @@ -71,11 +71,11 @@ class ExecutionQueue extends EventEmitter { // already handled } finally { const cycleDuration = Date.now() - cycleStartTime; - if (this.metricsServer) { - this.metricsServer.record("lastCycleDurationMs", cycleDuration); + if (this.metricsServer?.record) { + this.metricsServer.record('lastCycleDurationMs', cycleDuration); } - this.emit("cycle:complete", { + this.emit('cycle:complete', { depth: this.depth, inFlight: this.inFlight, completed: this.completed, @@ -90,6 +90,11 @@ class ExecutionQueue extends EventEmitter { async drain() { this.limit.clearQueue(); + this.depth = 0; + + if (this.activePromises.length > 0) { + await Promise.allSettled(this.activePromises); + } while (this.inFlight > 0) { await new Promise((r) => setTimeout(r, 50)); diff --git a/keeper/src/registry.js b/keeper/src/registry.js index 122f0e6..621ef19 100644 --- a/keeper/src/registry.js +++ b/keeper/src/registry.js @@ -7,176 +7,176 @@ const DATA_DIR = path.join(__dirname, '..', 'data'); const TASKS_FILE = path.join(DATA_DIR, 'tasks.json'); class TaskRegistry { - constructor(server, contractId, options = {}) { - this.server = server; - this.contractId = contractId; - this.taskIds = new Set(); - this.lastSeenLedger = options.startLedger || 0; - this.logger = options.logger || createLogger('registry'); - this._ensureDataDir(); - this._loadFromDisk(); - } - - /** + constructor(server, contractId, options = {}) { + this.server = server; + this.contractId = contractId; + this.taskIds = new Set(); + this.lastSeenLedger = options.startLedger || 0; + this.logger = options.logger || createLogger('registry'); + this._ensureDataDir(); + this._loadFromDisk(); + } + + /** * Initialize the registry: load 
persisted state, then backfill any * historical events we may have missed since the last run. */ - async init() { - this.logger.info('Initializing task registry'); - await this._fetchEvents(); - this.logger.info('Registry initialized', { taskCount: this.taskIds.size }); - } + async init() { + this.logger.info('Initializing task registry'); + await this._fetchEvents(); + this.logger.info('Registry initialized', { taskCount: this.taskIds.size }); + } - /** + /** * Poll for new TaskRegistered events since last seen ledger. * Call this on every polling cycle. */ - async poll() { - await this._fetchEvents(); - } + async poll() { + await this._fetchEvents(); + } - /** + /** * Return the current list of known task IDs. * @returns {number[]} */ - getTaskIds() { - return Array.from(this.taskIds).sort((a, b) => a - b); - } + getTaskIds() { + return Array.from(this.taskIds).sort((a, b) => a - b); + } + + // ---- internal ---- + + async _fetchEvents() { + try { + // We need a valid startLedger. If we don't have one, grab the latest. + if (!this.lastSeenLedger) { + const info = await this.server.getLatestLedger(); + // Look back a reasonable window (default ~1 hour on testnet ≈ 720 ledgers) + this.lastSeenLedger = Math.max(info.sequence - 720, 0); + } + + const contractId = this.contractId; + + // Fetch events page by page + let cursor = undefined; + let hasMore = true; + + while (hasMore) { + const params = { + startLedger: cursor ? undefined : this.lastSeenLedger, + filters: [ + { + type: 'contract', + contractIds: [contractId], + topics: [ + ['AAAADwAAAA9UYXNrUmVnaXN0ZXJlZAA=', '*'], // Symbol("TaskRegistered"), * + ], + }, + ], + limit: 100, + }; + + if (cursor) { + params.cursor = cursor; + delete params.startLedger; + } - // ---- internal ---- + const response = await this.server.getEvents(params); - async _fetchEvents() { - try { - // We need a valid startLedger. If we don't have one, grab the latest. 
- if (!this.lastSeenLedger) { - const info = await this.server.getLatestLedger(); - // Look back a reasonable window (default ~1 hour on testnet ≈ 720 ledgers) - this.lastSeenLedger = Math.max(info.sequence - 720, 0); - } + if (!response || !response.events || response.events.length === 0) { + hasMore = false; + break; + } - const contractId = this.contractId; - - // Fetch events page by page - let cursor = undefined; - let hasMore = true; - - while (hasMore) { - const params = { - startLedger: cursor ? undefined : this.lastSeenLedger, - filters: [ - { - type: 'contract', - contractIds: [contractId], - topics: [ - ['AAAADwAAAA9UYXNrUmVnaXN0ZXJlZAA=', '*'] // Symbol("TaskRegistered"), * - ] - } - ], - limit: 100, - }; - - if (cursor) { - params.cursor = cursor; - delete params.startLedger; - } - - const response = await this.server.getEvents(params); - - if (!response || !response.events || response.events.length === 0) { - hasMore = false; - break; - } - - for (const event of response.events) { - try { - const taskId = this._extractTaskId(event); - if (taskId !== null && !this.taskIds.has(taskId)) { - this.taskIds.add(taskId); - this.logger.info('Discovered task ID', { taskId }); - } - } catch (err) { - this.logger.warn('Failed to decode event', { error: err.message }); - } - - // Track the latest ledger we've processed - if (event.ledger && event.ledger > this.lastSeenLedger) { - this.lastSeenLedger = event.ledger; - } - } - - // If we got fewer events than the limit, we're done - if (response.events.length < 100) { - hasMore = false; - } else { - cursor = response.cursor || response.events[response.events.length - 1].pagingToken; - } + for (const event of response.events) { + try { + const taskId = this._extractTaskId(event); + if (taskId !== null && !this.taskIds.has(taskId)) { + this.taskIds.add(taskId); + this.logger.info('Discovered task ID', { taskId }); } + } catch (err) { + this.logger.warn('Failed to decode event', { error: err.message }); + } + + // 
Track the latest ledger we've processed + if (event.ledger && event.ledger > this.lastSeenLedger) { + this.lastSeenLedger = event.ledger; + } + } - this._saveToDisk(); - } catch (err) { - // Don't crash on transient RPC errors — just log and keep going - this.logger.error('Error fetching events', { error: err.message }); + // If we got fewer events than the limit, we're done + if (response.events.length < 100) { + hasMore = false; + } else { + cursor = response.cursor || response.events[response.events.length - 1].pagingToken; } + } + + this._saveToDisk(); + } catch (err) { + // Don't crash on transient RPC errors — just log and keep going + this.logger.error('Error fetching events', { error: err.message }); } + } - /** + /** * Extract the u64 task ID from the second topic of a TaskRegistered event. */ - _extractTaskId(event) { - // event.topic is an array of base64-encoded XDR ScVal values - // topic[0] = Symbol("TaskRegistered"), topic[1] = task_id (u64) - if (!event.topic || event.topic.length < 2) { - return null; - } - - const taskIdXdr = event.topic[1]; + _extractTaskId(event) { + // event.topic is an array of base64-encoded XDR ScVal values + // topic[0] = Symbol("TaskRegistered"), topic[1] = task_id (u64) + if (!event.topic || event.topic.length < 2) { + return null; + } - // The topic values come as base64-encoded XDR - const scVal = xdr.ScVal.fromXDR(taskIdXdr, 'base64'); + const taskIdXdr = event.topic[1]; - // Extract the u64 value - if (scVal.switch().name === 'scvU64') { - return Number(scVal.u64()); - } + // The topic values come as base64-encoded XDR + const scVal = xdr.ScVal.fromXDR(taskIdXdr, 'base64'); - return null; + // Extract the u64 value + if (scVal.switch().name === 'scvU64') { + return Number(scVal.u64()); } - _ensureDataDir() { - if (!fs.existsSync(DATA_DIR)) { - fs.mkdirSync(DATA_DIR, { recursive: true }); - } - } + return null; + } - _loadFromDisk() { - try { - if (fs.existsSync(TASKS_FILE)) { - const data = 
JSON.parse(fs.readFileSync(TASKS_FILE, 'utf-8')); - if (Array.isArray(data.taskIds)) { - data.taskIds.forEach(id => this.taskIds.add(id)); - } - if (data.lastSeenLedger && data.lastSeenLedger > this.lastSeenLedger) { - this.lastSeenLedger = data.lastSeenLedger; - } - this.logger.info('Loaded tasks from disk', { taskCount: this.taskIds.size, ledger: this.lastSeenLedger }); - } - } catch (err) { - this.logger.warn('Could not load persisted tasks', { error: err.message }); - } + _ensureDataDir() { + if (!fs.existsSync(DATA_DIR)) { + fs.mkdirSync(DATA_DIR, { recursive: true }); } - - _saveToDisk() { - try { - const data = { - taskIds: Array.from(this.taskIds).sort((a, b) => a - b), - lastSeenLedger: this.lastSeenLedger, - updatedAt: new Date().toISOString() - }; - fs.writeFileSync(TASKS_FILE, JSON.stringify(data, null, 2)); - } catch (err) { - this.logger.warn('Could not persist tasks', { error: err.message }); + } + + _loadFromDisk() { + try { + if (fs.existsSync(TASKS_FILE)) { + const data = JSON.parse(fs.readFileSync(TASKS_FILE, 'utf-8')); + if (Array.isArray(data.taskIds)) { + data.taskIds.forEach(id => this.taskIds.add(id)); } + if (data.lastSeenLedger && data.lastSeenLedger > this.lastSeenLedger) { + this.lastSeenLedger = data.lastSeenLedger; + } + this.logger.info('Loaded tasks from disk', { taskCount: this.taskIds.size, ledger: this.lastSeenLedger }); + } + } catch (err) { + this.logger.warn('Could not load persisted tasks', { error: err.message }); + } + } + + _saveToDisk() { + try { + const data = { + taskIds: Array.from(this.taskIds).sort((a, b) => a - b), + lastSeenLedger: this.lastSeenLedger, + updatedAt: new Date().toISOString(), + }; + fs.writeFileSync(TASKS_FILE, JSON.stringify(data, null, 2)); + } catch (err) { + this.logger.warn('Could not persist tasks', { error: err.message }); } + } } module.exports = TaskRegistry; diff --git a/keeper/src/retry.js b/keeper/src/retry.js index 70492ee..597d351 100644 --- a/keeper/src/retry.js +++ 
b/keeper/src/retry.js @@ -56,43 +56,45 @@ function classifyError(error) { if (!error) return ErrorClassification.NON_RETRYABLE; const errorCode = error.code || error.errorCode || extractErrorCode(error); - const errorMessage = error.message || error.resultXdr || ''; + const normalizedCode = typeof errorCode === 'string' ? errorCode.toUpperCase() : errorCode; + const errorMessage = String(error.message || error.resultXdr || ''); + const normalizedMessage = errorMessage.toLowerCase(); // Check for duplicate transaction indicators - if (DUPLICATE_ERROR_CODES.some(code => - errorCode === code || - errorMessage.includes(code) || - errorMessage.includes('duplicate') || - errorMessage.includes('already in ledger') + if (DUPLICATE_ERROR_CODES.some(code => + normalizedCode === code || + normalizedMessage.includes(code.toLowerCase()) || + normalizedMessage.includes('duplicate') || + normalizedMessage.includes('already in ledger'), )) { return ErrorClassification.DUPLICATE; } // Check for non-retryable errors - if (NON_RETRYABLE_ERROR_CODES.some(code => - errorCode === code || - errorMessage.includes(code) + if (NON_RETRYABLE_ERROR_CODES.some(code => + normalizedCode === code || + normalizedMessage.includes(code.toLowerCase()), )) { return ErrorClassification.NON_RETRYABLE; } // Check for retryable errors - if (RETRYABLE_ERROR_CODES.some(code => - errorCode === code || - errorMessage.includes(code) + if (RETRYABLE_ERROR_CODES.some(code => + normalizedCode === code || + normalizedMessage.includes(code.toLowerCase()), )) { return ErrorClassification.RETRYABLE; } // Default to retryable for unknown network/transient errors // This includes generic network errors, timeouts, etc. 
- if (errorMessage.includes('timeout') || - errorMessage.includes('network') || - errorMessage.includes('ECONNREFUSED') || - errorMessage.includes('ETIMEDOUT') || - errorMessage.includes('ENOTFOUND') || - errorMessage.includes('socket hang up') || - errorMessage.includes('fetch failed')) { + if (normalizedMessage.includes('timeout') || + normalizedMessage.includes('network') || + normalizedMessage.includes('econnrefused') || + normalizedMessage.includes('etimedout') || + normalizedMessage.includes('enotfound') || + normalizedMessage.includes('socket hang up') || + normalizedMessage.includes('fetch failed')) { return ErrorClassification.RETRYABLE; } @@ -127,14 +129,14 @@ function extractErrorCode(error) { function calculateDelay(attempt, baseDelay, maxDelay) { // Exponential backoff: baseDelay * 2^attempt const exponentialDelay = baseDelay * Math.pow(2, attempt); - + // Cap at maxDelay const cappedDelay = Math.min(exponentialDelay, maxDelay); - + // Add jitter: random value between 0 and baseDelay // This prevents thundering herd on shared RPC nodes const jitter = Math.random() * baseDelay; - + return Math.floor(cappedDelay + jitter); } @@ -161,7 +163,7 @@ const DEFAULT_OPTIONS = { /** * Generic higher-order async retry wrapper with exponential backoff and jitter - * + * * @param {Function} fn - The async function to wrap * @param {Object} options - Retry configuration options * @param {number} options.maxRetries - Maximum number of retry attempts (default: 3) @@ -259,7 +261,7 @@ export async function withRetry(fn, options = {}) { /** * Legacy retry function for backward compatibility * Simple retry with fixed delay - * + * * @param {Function} fn - The async function to retry * @param {number} attempts - Maximum number of attempts * @param {number} delay - Delay between attempts in milliseconds diff --git a/keeper/src/rpc.js b/keeper/src/rpc.js index 511a166..bde6a90 100644 --- a/keeper/src/rpc.js +++ b/keeper/src/rpc.js @@ -1,12 +1,12 @@ -import pkg from 
"@stellar/stellar-sdk"; +import pkg from '@stellar/stellar-sdk'; const { SorobanRpc } = pkg; export async function createRpc(config, logger) { const server = new SorobanRpc.Server(config.rpcUrl, { - allowHttp: config.rpcUrl.startsWith("http://"), + allowHttp: config.rpcUrl.startsWith('http://'), }); - logger.info("Connecting to Soroban RPC...", { + logger.info('Connecting to Soroban RPC...', { rpcUrl: config.rpcUrl, }); @@ -19,11 +19,11 @@ export async function createRpc(config, logger) { ); } - logger.info("Successfully connected to Soroban RPC", { + logger.info('Successfully connected to Soroban RPC', { networkPassphrase: networkInfo.passphrase, }); } catch (err) { - logger.error("Failed to connect to Soroban RPC", { + logger.error('Failed to connect to Soroban RPC', { error: err.message, }); throw err;