
Commit 4eb36e6
Merge PR #260: fix/137-event-listener-health-retry
2 parents 1a36353 + a18ffc6

3 files changed: 186 additions & 3 deletions


backend/src/index.ts

Lines changed: 71 additions & 1 deletion
@@ -129,6 +129,7 @@ app.get('/api/admin/health', createAdminLimiter(), adminAuth, async (req, res) =
   try {
     const includeHistory = req.query.history !== 'false';
-    const health = await healthService.getAdminHealth(includeHistory);
+    const health = await healthService.getAdminHealth(includeHistory, eventListener.getHealth());
     const statusCode = health.status === 'unhealthy' ? 503 : 200;
     res.status(statusCode).json(health);
   } catch (error) {
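
Note: the listener snapshot is passed in as an argument rather than imported inside health-service, which keeps the dependency one-directional (see the import change in health-service.ts below). A quick way to exercise the endpoint is sketched here; the host, port, and bearer-token header are assumptions, since the diff does not show how adminAuth authenticates:

    // Sketch: hit GET /api/admin/health and inspect the new listener state.
    // Host/port and the ADMIN_TOKEN auth scheme are assumed, not shown in this diff.
    const res = await fetch('http://localhost:3000/api/admin/health?history=false', {
      headers: { Authorization: `Bearer ${process.env.ADMIN_TOKEN}` },
    });
    const body = await res.json();
    // 200 unless overall status is 'unhealthy' (then 503); eventListener carries the snapshot
    console.log(res.status, body.status, body.eventListener);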
@@ -779,6 +780,38 @@ function startHealthSnapshotInterval() {
   setTimeout(() => healthService.recordSnapshot().catch(() => {}), 5000);
 }
 
+app.post('/api/admin/expiry/process', createAdminLimiter(), adminAuth, async (req, res) => {
+  try {
+    const result = await expiryService.processExpiries();
+    res.json({ success: true, data: result });
+  } catch (error) {
+    logger.error('Error processing expiries:', error);
+    res.status(500).json({
+      success: false,
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+});
+
 /**
  * @openapi
  * /api/admin/expiry/process:
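
The new endpoint is a manual trigger for expiry processing. A minimal invocation sketch, with the same caveats as above (host, port, and auth header are assumptions):

    // Sketch: manually kick off expiry processing via the new admin endpoint
    const res = await fetch('http://localhost:3000/api/admin/expiry/process', {
      method: 'POST',
      headers: { Authorization: `Bearer ${process.env.ADMIN_TOKEN}` },
    });
    console.log(await res.json()); // { success: true, data: ... }, or { success: false, error: '...' } with HTTP 500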
@@ -818,5 +851,42 @@ export function validateMnemonic(mnemonic: string): boolean {
     return false;
   }
 
   return bip39.validateMnemonic(words.join(' '));
 }
+
+// Start server
+const server = app.listen(PORT, async () => {
+  logger.info(`Server running on port ${PORT}`);
+  logger.info(`Environment: ${process.env.NODE_ENV || 'development'}`);
+
+  // Validate critical env vars at startup — warn clearly so operators know what's missing
+  const criticalEnvVars = ['SOROBAN_CONTRACT_ADDRESS', 'STELLAR_NETWORK_URL'];
+  for (const envVar of criticalEnvVars) {
+    if (!process.env[envVar]) {
+      logger.warn(`${envVar} not configured — EventListener will be disabled`);
+    }
+  }
+
+  // Initialize rate limiting Redis store
+  try {
+    await RateLimiterFactory.initializeRedisStore();
+    logger.info('Rate limiting initialized successfully');
+  } catch (error) {
+    logger.warn('Rate limiting initialization failed, using memory store:', error);
+  }
+
+  // Start health metrics snapshot loop
+  startHealthSnapshotInterval();
+
+  // Start event listener (no-op if disabled due to missing config)
+  await eventListener.start();
+  const elHealth = eventListener.getHealth();
+  if (elHealth.status === 'disabled') {
+    logger.warn('EventListener is disabled', { reason: elHealth.reason });
+  } else {
+    logger.info('EventListener started', { status: elHealth.status });
+  }
+
+  scheduleAutoResume();
+});
+
+// Graceful shutdown
+process.on('SIGTERM', () => {
+  logger.info('SIGTERM received, shutting down gracefully');
+  schedulerService.stop();
+  eventListener.stop();
+  server.close(() => {
+    logger.info('Server closed');
+    process.exit(0);
+  });
+});
+
+process.on('SIGINT', () => {
+  logger.info('SIGINT received, shutting down gracefully');
+  schedulerService.stop();
+  eventListener.stop();
+  server.close(() => {
+    logger.info('Server closed');
+    process.exit(0);
+  });
+});
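
The shutdown handlers stop the scheduler and listener before closing the HTTP server, so no new polls start while in-flight requests drain. One caveat: server.close() waits indefinitely for open connections, so a supervisor may still hard-kill the process. A common companion pattern, purely illustrative and not part of this commit, is a bounded force-exit:

    // Illustrative only (not in this commit): force exit if graceful close hangs
    process.on('SIGTERM', () => {
      const killTimer = setTimeout(() => process.exit(1), 10_000);
      killTimer.unref(); // don't let the timer itself keep the event loop alive
      server.close(() => process.exit(0));
    });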

backend/src/services/event-listener.ts

Lines changed: 86 additions & 1 deletion
@@ -3,6 +3,7 @@ import { supabase } from '../config/database';
 import { reorgHandler } from './reorg-handler';
 import { generateCycleId } from '../utils/cycle-id';
 import { renewalCooldownService } from './renewal-cooldown-service';
+import { calculateBackoffDelay, NonRetryableError } from '../utils/retry';
 
 interface ContractEvent {
   type: string;
@@ -31,6 +32,15 @@
+export type EventListenerStatus = 'running' | 'stopped' | 'disabled' | 'retrying' | 'failed';
+
 export interface EventListenerHealth {
+  status: EventListenerStatus;
+  reason?: string;
+  lastProcessedLedger: number | null;
+  retryCount?: number;
+  nextRetryAt?: string | null;
+}
 
 const ALERT_THRESHOLD = 10;
 const MAX_BACKOFF_MS = 300_000; // 5 minutes
+const MAX_RETRY_ATTEMPTS = 10;
+const RETRY_INITIAL_DELAY_MS = 5000;
+const RETRY_MAX_DELAY_MS = 5 * 60 * 1000; // 5 minutes
 
 export class EventListener {
   private contractId: string;
@@ -49,20 +59,49 @@ export class EventListener {
   private consecutiveErrors: number = 0;
   private lastSuccessfulPoll: Date | null = null;
 
+  // Health tracking
+  private _status: EventListenerStatus = 'stopped';
+  private _disabledReason?: string;
+  private _retryCount: number = 0;
+  private _nextRetryAt: Date | null = null;
+
   constructor() {
     this.contractId = process.env.SOROBAN_CONTRACT_ADDRESS || '';
     this.rpcUrl =
       process.env.STELLAR_NETWORK_URL || 'https://soroban-testnet.stellar.org';
 
     if (!this.contractId) {
-      throw new Error('SOROBAN_CONTRACT_ADDRESS not configured');
+      // Don't throw — mark as disabled so the process can still start
+      this._status = 'disabled';
+      this._disabledReason = 'SOROBAN_CONTRACT_ADDRESS not configured';
+      logger.warn('EventListener disabled: SOROBAN_CONTRACT_ADDRESS not configured');
     }
   }
 
+  getHealth(): EventListenerHealth {
+    return {
+      status: this._status,
+      reason: this._disabledReason,
+      lastProcessedLedger: this.lastProcessedLedger || null,
+      retryCount: this._retryCount,
+      nextRetryAt: this._nextRetryAt?.toISOString() ?? null,
+    };
+  }
+
   async start() {
+    if (this._status === 'disabled') {
+      logger.warn('EventListener.start() called but listener is disabled', {
+        reason: this._disabledReason,
+      });
+      return;
+    }
+
     if (this.isRunning) return;
 
     this.isRunning = true;
+    this._status = 'running';
+    this._retryCount = 0;
+    this._nextRetryAt = null;
     this.lastProcessedLedger = await this.getLastProcessedLedger();
     logger.info('Event listener started', { lastLedger: this.lastProcessedLedger });
 
@@ -71,6 +110,9 @@ export class EventListener {
 
   stop() {
     this.isRunning = false;
+    if (this._status !== 'disabled') {
+      this._status = 'stopped';
+    }
     logger.info('Event listener stopped');
   }
 
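The behavioral contract here — construct without throwing, report 'disabled', and make start() a no-op — is easy to pin down in a test. A sketch, assuming a jest/vitest-style runner and that the module is imported with the env var unset:

    // Sketch: the fail-open path when SOROBAN_CONTRACT_ADDRESS is missing
    delete process.env.SOROBAN_CONTRACT_ADDRESS;
    const listener = new EventListener(); // previously threw; now marks itself disabled
    expect(listener.getHealth().status).toBe('disabled');
    await listener.start(); // must return without polling or throwing
    expect(listener.getHealth().status).toBe('disabled');
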
@@ -141,6 +183,15 @@
+        // Reset retry count on success
+        if (this._retryCount > 0) {
+          logger.info('EventListener recovered after retries', { retryCount: this._retryCount });
+          this._retryCount = 0;
+          this._nextRetryAt = null;
+          this._status = 'running';
+        }
+      } catch (error) {
+        logger.error('Event polling error:', error);
+        await this.handlePollError(error);
+        if (!this.isRunning) break;
       } finally {
         // Always release the mutex, even if fetchAndProcessEvents throws
         this.isProcessing = false;
       }
 
       await this.sleep(backoffMs);
@@ -150,6 +201,40 @@
+  private async handlePollError(error: unknown) {
+    this._retryCount++;
+
+    if (this._retryCount >= MAX_RETRY_ATTEMPTS) {
+      this._status = 'failed';
+      this._disabledReason = `Exceeded max retry attempts (${MAX_RETRY_ATTEMPTS}). Last error: ${error instanceof Error ? error.message : String(error)}`;
+      logger.error('EventListener permanently failed after max retries', {
+        retryCount: this._retryCount,
+        error: this._disabledReason,
+      });
+      this.isRunning = false;
+      return;
+    }
+
+    const delay = calculateBackoffDelay(this._retryCount, {
+      initialDelay: RETRY_INITIAL_DELAY_MS,
+      maxDelay: RETRY_MAX_DELAY_MS,
+      multiplier: 2,
+      jitter: true,
+    });
+
+    this._status = 'retrying';
+    this._nextRetryAt = new Date(Date.now() + delay);
+    logger.warn('EventListener will retry', {
+      attempt: this._retryCount,
+      delayMs: delay,
+      nextRetryAt: this._nextRetryAt.toISOString(),
+    });
+
+    await this.sleep(delay);
+  }
+
   private async fetchAndProcessEvents() {
-    logger.info('Polling for events...');
     const currentLedger = await this.getCurrentLedger();
 
     // Check for reorg
     if (currentLedger < this.lastProcessedLedger) {
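
calculateBackoffDelay lives in ../utils/retry, which this commit does not touch. The sketch below is an assumed implementation, consistent with the options object used above (exponential growth with full jitter). With RETRY_INITIAL_DELAY_MS = 5000 and multiplier 2, the uncapped delays run 5s, 10s, 20s, ... and hit the 5-minute cap from attempt 7 onward, so the listener spends at most roughly 20 minutes retrying before marking itself 'failed' (jitter shortens each wait).

    // Assumed sketch of calculateBackoffDelay (../utils/retry is not part of this diff)
    interface BackoffOptions {
      initialDelay: number;
      maxDelay: number;
      multiplier: number;
      jitter: boolean;
    }

    function calculateBackoffDelay(attempt: number, opts: BackoffOptions): number {
      // attempt is 1-based: handlePollError increments _retryCount before calling
      const exponential = opts.initialDelay * Math.pow(opts.multiplier, attempt - 1);
      const capped = Math.min(exponential, opts.maxDelay);
      // full jitter: uniform in [0, capped] de-synchronizes concurrent retriers
      return opts.jitter ? Math.random() * capped : capped;
    }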

backend/src/services/health-service.ts

Lines changed: 29 additions & 1 deletion
@@ -2,6 +2,7 @@ import { supabase } from '../config/database';
 import logger from '../config/logger';
 import { monitoringService } from './monitoring-service';
-import { eventListener, EventListenerHealth } from './event-listener';
+import type { EventListenerHealth } from './event-listener';
 
 export interface HealthThresholds {
   failedRenewalsPerHour: number;
@@ -250,8 +251,9 @@
  * listener unhealthy + base healthy → degraded
  * listener unhealthy + base degraded → unhealthy
  * base unhealthy (any listener) → unhealthy
+ * Full admin health: current metrics, alerts, status, event listener state, optional history.
  */
-  async getAdminHealth(includeHistory: boolean = true): Promise<AdminHealthResponse> {
+  async getAdminHealth(includeHistory: boolean = true, eventListenerHealth?: EventListenerHealth): Promise<AdminHealthResponse> {
     const metrics = await this.getCurrentMetrics();
     const alerts = this.evaluateAlerts(metrics);
-    const listenerHealth = eventListener.getHealth();
@@ -264,6 +266,31 @@
       ? 'unhealthy'
       : baseStatus;
 
+    // Degrade overall status if event listener is not running
+    const elHealth: EventListenerHealth = eventListenerHealth ?? {
+      status: 'stopped',
+      lastProcessedLedger: null,
+    };
+    if (elHealth.status === 'disabled' || elHealth.status === 'failed') {
+      alerts.push({
+        id: 'event_listener',
+        message: `EventListener is ${elHealth.status}: ${elHealth.reason ?? 'unknown reason'}`,
+        severity: elHealth.status === 'failed' ? 'critical' : 'warning',
+        value: 0,
+        threshold: 0,
+        triggeredAt: new Date().toISOString(),
+      });
+    } else if (elHealth.status === 'retrying') {
+      alerts.push({
+        id: 'event_listener',
+        message: `EventListener is retrying (attempt ${elHealth.retryCount ?? '?'})`,
+        severity: 'warning',
+        value: elHealth.retryCount ?? 0,
+        threshold: 0,
+        triggeredAt: new Date().toISOString(),
+      });
+    }
+
     const status = this.getStatus(alerts);
     const history = includeHistory ? await this.getHistory(24) : undefined;
 
     return {
@@ -273,6 +300,7 @@
       alerts,
       thresholds: this.getThresholds(),
-      eventListener: listenerHealth,
+      eventListener: elHealth,
       history,
     };
   }
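
Taken together: when the injected snapshot reports 'failed', a critical event_listener alert is pushed before getStatus(alerts) computes the overall status, so the failure propagates into the top-level response. A usage sketch (the alert-to-status mapping lives in getStatus, which this diff does not show):

    // Sketch: a permanently failed listener surfaces as a critical alert
    const health = await healthService.getAdminHealth(false, {
      status: 'failed',
      reason: 'Exceeded max retry attempts (10). Last error: connect ECONNREFUSED',
      lastProcessedLedger: 12345,
    });
    const alert = health.alerts.find((a) => a.id === 'event_listener');
    console.log(alert?.severity); // 'critical'
    console.log(health.eventListener.status); // 'failed'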
