diff --git a/src/modules/journeys/journeys.service.spec.ts b/src/modules/journeys/journeys.service.spec.ts index 479a498..eaee2ab 100644 --- a/src/modules/journeys/journeys.service.spec.ts +++ b/src/modules/journeys/journeys.service.spec.ts @@ -112,3 +112,145 @@ describe('JourneysService.getJourneyVariables — EVO-1836 cache-hit preserves v expect(result).toEqual(vars); }); }); + +describe('JourneysService.findActive — EVO-1927 read-through on empty cache', () => { + const dbJourney = { + id: 'journey-1', + name: 'J1', + description: '', + isActive: true, + flowData: {}, + flowTriggers: [{ type: 'event' }], + variables: [], + createdAt: new Date('2026-01-01T00:00:00Z'), + updatedAt: new Date('2026-01-02T00:00:00Z'), + }; + + const buildService = ( + cached: any[], + dbActive: any[], + ): { + service: JourneysService; + getActiveJourneys: jest.Mock; + set: jest.Mock; + find: jest.Mock; + warn: jest.Mock; + } => { + const getActiveJourneys = jest.fn().mockResolvedValue(cached); + const set = jest.fn().mockResolvedValue(undefined); + const journeyCacheService = { getActiveJourneys, set }; + + const find = jest.fn().mockResolvedValue(dbActive); + const repo = { find }; + const db = { getRepository: () => repo }; + + const service = new JourneysService( + db as any, + journeyCacheService as any, + {} as any, + ); + const warn = jest + .spyOn((service as any).logger, 'warn') + .mockImplementation(() => undefined) as unknown as jest.Mock; + jest.spyOn((service as any).logger, 'log').mockImplementation(() => undefined); + + return { service, getActiveJourneys, set, find, warn }; + }; + + it('returns cached active journeys without hitting the DB when the cache is warm', async () => { + const { service, find } = buildService([dbJourney], []); + + const result = await service.findActive(); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('journey-1'); + expect(find).not.toHaveBeenCalled(); + }); + + it('falls through to the DB and returns active journeys when the cache is empty (post-restart regression)', async () => { + const { service, find, set } = buildService([], [dbJourney]); + + const result = await service.findActive(); + + expect(find).toHaveBeenCalledWith({ + where: { isActive: true }, + order: { createdAt: 'DESC' }, + }); + expect(result).toHaveLength(1); + expect(result[0].id).toBe('journey-1'); + // Cache repopulated so the next read is a hit. + expect(set).toHaveBeenCalledWith(dbJourney); + }); + + it('warns when the cache returns 0 but the DB has active journeys (observability)', async () => { + const { service, warn } = buildService([], [dbJourney]); + + await service.findActive(); + + expect(warn).toHaveBeenCalledWith( + expect.stringContaining('findActive cache miss'), + ); + }); + + it('returns [] without warning when both cache and DB are empty', async () => { + const { service, warn, set } = buildService([], []); + + const result = await service.findActive(); + + expect(result).toEqual([]); + expect(warn).not.toHaveBeenCalled(); + expect(set).not.toHaveBeenCalled(); + }); + + it('still returns DB journeys when repopulating the cache fails (best-effort)', async () => { + const { service, set } = buildService([], [dbJourney]); + set.mockRejectedValue(new Error('redis down')); + + const result = await service.findActive(); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('journey-1'); + }); +}); + +describe('JourneysService.warmActiveJourneysCache — EVO-1927 boot warm-up', () => { + const buildService = ( + dbActive: any[], + ): { service: JourneysService; set: jest.Mock; find: jest.Mock } => { + const set = jest.fn().mockResolvedValue(undefined); + const find = jest.fn().mockResolvedValue(dbActive); + const db = { getRepository: () => ({ find }) }; + const service = new JourneysService( + db as any, + { set } as any, + {} as any, + ); + jest.spyOn((service as any).logger, 'log').mockImplementation(() => undefined); + jest.spyOn((service as any).logger, 'warn').mockImplementation(() => undefined); + return { service, set, find }; + }; + + it('loads active journeys from the DB and populates the cache', async () => { + const j1 = { id: 'j1', isActive: true }; + const j2 = { id: 'j2', isActive: true }; + const { service, set, find } = buildService([j1, j2]); + + const count = await service.warmActiveJourneysCache(); + + expect(find).toHaveBeenCalledWith({ + where: { isActive: true }, + order: { createdAt: 'DESC' }, + }); + expect(set).toHaveBeenCalledTimes(2); + expect(set).toHaveBeenCalledWith(j1); + expect(set).toHaveBeenCalledWith(j2); + expect(count).toBe(2); + }); + + it('does not throw when a per-journey cache write fails', async () => { + const { service, set } = buildService([{ id: 'j1', isActive: true }]); + set.mockRejectedValue(new Error('redis down')); + + await expect(service.warmActiveJourneysCache()).resolves.toBe(1); + }); +}); diff --git a/src/modules/journeys/journeys.service.ts b/src/modules/journeys/journeys.service.ts index a8a9321..c3eeddd 100644 --- a/src/modules/journeys/journeys.service.ts +++ b/src/modules/journeys/journeys.service.ts @@ -179,17 +179,85 @@ export class JourneysService { async findActive(): Promise { const cachedJourneys = await this.journeyCacheService.getActiveJourneys(); - return cachedJourneys.map(cached => ({ - id: cached.id, - name: cached.name, - description: cached.description, - isActive: cached.isActive, - flowData: cached.flowData, - flowTriggers: cached.flowTriggers, - variables: cached.variables, - createdAt: cached.createdAt instanceof Date ? cached.createdAt : new Date(cached.createdAt), - updatedAt: cached.updatedAt instanceof Date ? cached.updatedAt : new Date(cached.updatedAt), - } as Journey)); + if (cachedJourneys.length > 0) { + return cachedJourneys.map(cached => ({ + id: cached.id, + name: cached.name, + description: cached.description, + isActive: cached.isActive, + flowData: cached.flowData, + flowTriggers: cached.flowTriggers, + variables: cached.variables, + createdAt: cached.createdAt instanceof Date ? cached.createdAt : new Date(cached.createdAt), + updatedAt: cached.updatedAt instanceof Date ? cached.updatedAt : new Date(cached.updatedAt), + } as Journey)); + } + + // EVO-1927: read-through fallback. After an evo-flow restart the Redis + // active-journey index can be empty/stale and there is no implicit warm-up, + // so `getActiveJourneys()` returns []. The JourneyTriggerProcessor would + // then match every event against ZERO journeys and silently drop + // event-based triggers (Postgres has active journeys; cache reports none). + // On a cache miss, fall through to the DB as the source of truth and + // repopulate the cache so subsequent reads are served from Redis again. + const dbActiveJourneys = await this.journeyRepository.find({ + where: { isActive: true }, + order: { createdAt: 'DESC' }, + }); + + if (dbActiveJourneys.length > 0) { + // Observability: cache returned 0 while the DB has active journeys — + // this is the regression signature, surface it. + this.logger.warn( + `findActive cache miss: active-journey cache returned 0 but DB has ${dbActiveJourneys.length} active journeys — serving from DB and repopulating cache (EVO-1927)`, + ); + + // Repopulate the cache (index + per-journey keys) so the next read is a + // cache hit. Best-effort: a cache write failure must not stop us returning + // the journeys we already have from the DB. + for (const journey of dbActiveJourneys) { + try { + await this.journeyCacheService.set(journey); + } catch (error) { + this.logger.warn( + `Failed to repopulate journey cache for ${journey.id}: ${error.message}`, + ); + } + } + } + + return dbActiveJourneys; + } + + /** + * EVO-1927: warm the active-journey cache from Postgres. Called at boot by + * the JourneyTriggerProcessor BEFORE it starts consuming `journey-triggers`, + * so the very first event matches against the real set of active journeys + * instead of an empty (post-restart) Redis index. Best-effort and idempotent + * — `set()` upserts into the index — so a partial failure just degrades to + * the read-through fallback in `findActive`. + */ + async warmActiveJourneysCache(): Promise { + const dbActiveJourneys = await this.journeyRepository.find({ + where: { isActive: true }, + order: { createdAt: 'DESC' }, + }); + + for (const journey of dbActiveJourneys) { + try { + await this.journeyCacheService.set(journey); + } catch (error) { + this.logger.warn( + `Failed to warm journey cache for ${journey.id}: ${error.message}`, + ); + } + } + + this.logger.log( + `Warmed active-journey cache with ${dbActiveJourneys.length} journeys from DB (EVO-1927)`, + ); + + return dbActiveJourneys.length; } async validateFlowData(journey: Journey): Promise { diff --git a/src/modules/journeys/services/journey-trigger-processor.service.spec.ts b/src/modules/journeys/services/journey-trigger-processor.service.spec.ts index f611197..114a4a2 100644 --- a/src/modules/journeys/services/journey-trigger-processor.service.spec.ts +++ b/src/modules/journeys/services/journey-trigger-processor.service.spec.ts @@ -186,10 +186,12 @@ describe('JourneyTriggerProcessor consumer gating (EVO-1764 A1)', () => { let processor: JourneyTriggerProcessor; let initializeKafkaConsumer: jest.Mock; let startConsuming: jest.Mock; + let warmActiveJourneysCache: jest.Mock; beforeEach(async () => { + warmActiveJourneysCache = jest.fn().mockResolvedValue(0); processor = new JourneyTriggerProcessor( - {} as any, + { warmActiveJourneysCache } as any, {} as any, {} as any, {} as any, @@ -229,4 +231,46 @@ describe('JourneyTriggerProcessor consumer gating (EVO-1764 A1)', () => { expect(initializeKafkaConsumer).not.toHaveBeenCalled(); expect(startConsuming).not.toHaveBeenCalled(); }); + + // EVO-1927: warm the active-journey cache from the DB before consuming so a + // post-restart event matches against the real journey set, not an empty cache. + it('warms the active-journey cache BEFORE subscribing to journey-triggers (EVO-1927)', async () => { + jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(true); + + const callOrder: string[] = []; + warmActiveJourneysCache.mockImplementation(async () => { + callOrder.push('warm'); + return 3; + }); + initializeKafkaConsumer.mockImplementation(async () => { + callOrder.push('init'); + }); + startConsuming.mockImplementation(async () => { + callOrder.push('consume'); + }); + + await processor.onModuleInit(); + + expect(warmActiveJourneysCache).toHaveBeenCalledTimes(1); + // Warm-up must run before the consumer is wired up and starts. + expect(callOrder).toEqual(['warm', 'init', 'consume']); + }); + + it('still starts consuming when the warm-up fails — read-through fallback covers it (EVO-1927)', async () => { + jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(true); + warmActiveJourneysCache.mockRejectedValue(new Error('db down at boot')); + + await processor.onModuleInit(); + + expect(initializeKafkaConsumer).toHaveBeenCalledTimes(1); + expect(startConsuming).toHaveBeenCalledTimes(1); + }); + + it('does NOT warm the cache in a non-journey-worker mode', async () => { + jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(false); + + await processor.onModuleInit(); + + expect(warmActiveJourneysCache).not.toHaveBeenCalled(); + }); }); diff --git a/src/modules/journeys/services/journey-trigger-processor.service.ts b/src/modules/journeys/services/journey-trigger-processor.service.ts index 5965425..23ef07f 100644 --- a/src/modules/journeys/services/journey-trigger-processor.service.ts +++ b/src/modules/journeys/services/journey-trigger-processor.service.ts @@ -133,6 +133,22 @@ export class JourneyTriggerProcessor implements OnModuleInit, OnModuleDestroy { // reason to consume journey-triggers at all. if (AppFactory.shouldStartJourneyWorker()) { this.logger.log('🚀 Starting Journey Trigger Processor...'); + // EVO-1927: warm the active-journey cache from Postgres BEFORE consuming. + // After a restart the Redis active-journey index is empty/stale and there + // is no implicit warm-up, so the first events would otherwise match + // against ZERO journeys and event-based triggers would silently die. + // Best-effort: a warm-up failure falls back to the read-through in + // JourneysService.findActive, so it must not block the consumer. + try { + const warmed = await this.journeysService.warmActiveJourneysCache(); + this.logger.log( + `🔥 Warmed active-journey cache with ${warmed} journeys before consuming (EVO-1927)`, + ); + } catch (error) { + this.logger.warn( + `⚠️ Failed to warm active-journey cache at boot (read-through fallback will cover this): ${error.message}`, + ); + } await this.initializeKafkaConsumer(); await this.startConsuming(); } else {