Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions src/modules/journeys/journeys.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,145 @@ describe('JourneysService.getJourneyVariables — EVO-1836 cache-hit preserves v
expect(result).toEqual(vars);
});
});

describe('JourneysService.findActive — EVO-1927 read-through on empty cache', () => {
const dbJourney = {
id: 'journey-1',
name: 'J1',
description: '',
isActive: true,
flowData: {},
flowTriggers: [{ type: 'event' }],
variables: [],
createdAt: new Date('2026-01-01T00:00:00Z'),
updatedAt: new Date('2026-01-02T00:00:00Z'),
};

const buildService = (
cached: any[],
dbActive: any[],
): {
service: JourneysService;
getActiveJourneys: jest.Mock;
set: jest.Mock;
find: jest.Mock;
warn: jest.Mock;
} => {
const getActiveJourneys = jest.fn().mockResolvedValue(cached);
const set = jest.fn().mockResolvedValue(undefined);
const journeyCacheService = { getActiveJourneys, set };

const find = jest.fn().mockResolvedValue(dbActive);
const repo = { find };
const db = { getRepository: () => repo };

const service = new JourneysService(
db as any,
journeyCacheService as any,
{} as any,
);
const warn = jest
.spyOn((service as any).logger, 'warn')
.mockImplementation(() => undefined) as unknown as jest.Mock;
jest.spyOn((service as any).logger, 'log').mockImplementation(() => undefined);

return { service, getActiveJourneys, set, find, warn };
};

it('returns cached active journeys without hitting the DB when the cache is warm', async () => {
const { service, find } = buildService([dbJourney], []);

const result = await service.findActive();

expect(result).toHaveLength(1);
expect(result[0].id).toBe('journey-1');
expect(find).not.toHaveBeenCalled();
});

it('falls through to the DB and returns active journeys when the cache is empty (post-restart regression)', async () => {
const { service, find, set } = buildService([], [dbJourney]);

const result = await service.findActive();

expect(find).toHaveBeenCalledWith({
where: { isActive: true },
order: { createdAt: 'DESC' },
});
expect(result).toHaveLength(1);
expect(result[0].id).toBe('journey-1');
// Cache repopulated so the next read is a hit.
expect(set).toHaveBeenCalledWith(dbJourney);
});

it('warns when the cache returns 0 but the DB has active journeys (observability)', async () => {
const { service, warn } = buildService([], [dbJourney]);

await service.findActive();

expect(warn).toHaveBeenCalledWith(
expect.stringContaining('findActive cache miss'),
);
});

it('returns [] without warning when both cache and DB are empty', async () => {
const { service, warn, set } = buildService([], []);

const result = await service.findActive();

expect(result).toEqual([]);
expect(warn).not.toHaveBeenCalled();
expect(set).not.toHaveBeenCalled();
});

it('still returns DB journeys when repopulating the cache fails (best-effort)', async () => {
const { service, set } = buildService([], [dbJourney]);
set.mockRejectedValue(new Error('redis down'));

const result = await service.findActive();

expect(result).toHaveLength(1);
expect(result[0].id).toBe('journey-1');
});
});

describe('JourneysService.warmActiveJourneysCache — EVO-1927 boot warm-up', () => {
const buildService = (
dbActive: any[],
): { service: JourneysService; set: jest.Mock; find: jest.Mock } => {
const set = jest.fn().mockResolvedValue(undefined);
const find = jest.fn().mockResolvedValue(dbActive);
const db = { getRepository: () => ({ find }) };
const service = new JourneysService(
db as any,
{ set } as any,
{} as any,
);
jest.spyOn((service as any).logger, 'log').mockImplementation(() => undefined);
jest.spyOn((service as any).logger, 'warn').mockImplementation(() => undefined);
return { service, set, find };
};

it('loads active journeys from the DB and populates the cache', async () => {
const j1 = { id: 'j1', isActive: true };
const j2 = { id: 'j2', isActive: true };
const { service, set, find } = buildService([j1, j2]);

const count = await service.warmActiveJourneysCache();

expect(find).toHaveBeenCalledWith({
where: { isActive: true },
order: { createdAt: 'DESC' },
});
expect(set).toHaveBeenCalledTimes(2);
expect(set).toHaveBeenCalledWith(j1);
expect(set).toHaveBeenCalledWith(j2);
expect(count).toBe(2);
});

it('does not throw when a per-journey cache write fails', async () => {
const { service, set } = buildService([{ id: 'j1', isActive: true }]);
set.mockRejectedValue(new Error('redis down'));

await expect(service.warmActiveJourneysCache()).resolves.toBe(1);
});
});
90 changes: 79 additions & 11 deletions src/modules/journeys/journeys.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,85 @@ export class JourneysService {
async findActive(): Promise<Journey[]> {
const cachedJourneys = await this.journeyCacheService.getActiveJourneys();

return cachedJourneys.map(cached => ({
id: cached.id,
name: cached.name,
description: cached.description,
isActive: cached.isActive,
flowData: cached.flowData,
flowTriggers: cached.flowTriggers,
variables: cached.variables,
createdAt: cached.createdAt instanceof Date ? cached.createdAt : new Date(cached.createdAt),
updatedAt: cached.updatedAt instanceof Date ? cached.updatedAt : new Date(cached.updatedAt),
} as Journey));
if (cachedJourneys.length > 0) {
return cachedJourneys.map(cached => ({
id: cached.id,
name: cached.name,
description: cached.description,
isActive: cached.isActive,
flowData: cached.flowData,
flowTriggers: cached.flowTriggers,
variables: cached.variables,
createdAt: cached.createdAt instanceof Date ? cached.createdAt : new Date(cached.createdAt),
updatedAt: cached.updatedAt instanceof Date ? cached.updatedAt : new Date(cached.updatedAt),
} as Journey));
}

// EVO-1927: read-through fallback. After an evo-flow restart the Redis
// active-journey index can be empty/stale and there is no implicit warm-up,
// so `getActiveJourneys()` returns []. The JourneyTriggerProcessor would
// then match every event against ZERO journeys and silently drop
// event-based triggers (Postgres has active journeys; cache reports none).
// On a cache miss, fall through to the DB as the source of truth and
// repopulate the cache so subsequent reads are served from Redis again.
const dbActiveJourneys = await this.journeyRepository.find({
where: { isActive: true },
order: { createdAt: 'DESC' },
});

if (dbActiveJourneys.length > 0) {
// Observability: cache returned 0 while the DB has active journeys —
// this is the regression signature, surface it.
this.logger.warn(
`findActive cache miss: active-journey cache returned 0 but DB has ${dbActiveJourneys.length} active journeys — serving from DB and repopulating cache (EVO-1927)`,
);

// Repopulate the cache (index + per-journey keys) so the next read is a
// cache hit. Best-effort: a cache write failure must not stop us returning
// the journeys we already have from the DB.
for (const journey of dbActiveJourneys) {
try {
await this.journeyCacheService.set(journey);
} catch (error) {
this.logger.warn(
`Failed to repopulate journey cache for ${journey.id}: ${error.message}`,
);
}
}
}

return dbActiveJourneys;
}

/**
* EVO-1927: warm the active-journey cache from Postgres. Called at boot by
* the JourneyTriggerProcessor BEFORE it starts consuming `journey-triggers`,
* so the very first event matches against the real set of active journeys
* instead of an empty (post-restart) Redis index. Best-effort and idempotent
* — `set()` upserts into the index — so a partial failure just degrades to
* the read-through fallback in `findActive`.
*/
async warmActiveJourneysCache(): Promise<number> {
const dbActiveJourneys = await this.journeyRepository.find({
where: { isActive: true },
order: { createdAt: 'DESC' },
});

for (const journey of dbActiveJourneys) {
try {
await this.journeyCacheService.set(journey);
} catch (error) {
this.logger.warn(
`Failed to warm journey cache for ${journey.id}: ${error.message}`,
);
}
}

this.logger.log(
`Warmed active-journey cache with ${dbActiveJourneys.length} journeys from DB (EVO-1927)`,
);

return dbActiveJourneys.length;
}

async validateFlowData(journey: Journey): Promise<boolean> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,12 @@ describe('JourneyTriggerProcessor consumer gating (EVO-1764 A1)', () => {
let processor: JourneyTriggerProcessor;
let initializeKafkaConsumer: jest.Mock;
let startConsuming: jest.Mock;
let warmActiveJourneysCache: jest.Mock;

beforeEach(async () => {
warmActiveJourneysCache = jest.fn().mockResolvedValue(0);
processor = new JourneyTriggerProcessor(
{} as any,
{ warmActiveJourneysCache } as any,
{} as any,
{} as any,
{} as any,
Expand Down Expand Up @@ -229,4 +231,46 @@ describe('JourneyTriggerProcessor consumer gating (EVO-1764 A1)', () => {
expect(initializeKafkaConsumer).not.toHaveBeenCalled();
expect(startConsuming).not.toHaveBeenCalled();
});

// EVO-1927: warm the active-journey cache from the DB before consuming so a
// post-restart event matches against the real journey set, not an empty cache.
it('warms the active-journey cache BEFORE subscribing to journey-triggers (EVO-1927)', async () => {
jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(true);

const callOrder: string[] = [];
warmActiveJourneysCache.mockImplementation(async () => {
callOrder.push('warm');
return 3;
});
initializeKafkaConsumer.mockImplementation(async () => {
callOrder.push('init');
});
startConsuming.mockImplementation(async () => {
callOrder.push('consume');
});

await processor.onModuleInit();

expect(warmActiveJourneysCache).toHaveBeenCalledTimes(1);
// Warm-up must run before the consumer is wired up and starts.
expect(callOrder).toEqual(['warm', 'init', 'consume']);
});

it('still starts consuming when the warm-up fails — read-through fallback covers it (EVO-1927)', async () => {
jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(true);
warmActiveJourneysCache.mockRejectedValue(new Error('db down at boot'));

await processor.onModuleInit();

expect(initializeKafkaConsumer).toHaveBeenCalledTimes(1);
expect(startConsuming).toHaveBeenCalledTimes(1);
});

it('does NOT warm the cache in a non-journey-worker mode', async () => {
jest.spyOn(AppFactory, 'shouldStartJourneyWorker').mockReturnValue(false);

await processor.onModuleInit();

expect(warmActiveJourneysCache).not.toHaveBeenCalled();
});
});
16 changes: 16 additions & 0 deletions src/modules/journeys/services/journey-trigger-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ export class JourneyTriggerProcessor implements OnModuleInit, OnModuleDestroy {
// reason to consume journey-triggers at all.
if (AppFactory.shouldStartJourneyWorker()) {
this.logger.log('🚀 Starting Journey Trigger Processor...');
// EVO-1927: warm the active-journey cache from Postgres BEFORE consuming.
// After a restart the Redis active-journey index is empty/stale and there
// is no implicit warm-up, so the first events would otherwise match
// against ZERO journeys and event-based triggers would silently die.
// Best-effort: a warm-up failure falls back to the read-through in
// JourneysService.findActive, so it must not block the consumer.
try {
const warmed = await this.journeysService.warmActiveJourneysCache();
this.logger.log(
`🔥 Warmed active-journey cache with ${warmed} journeys before consuming (EVO-1927)`,
);
} catch (error) {
this.logger.warn(
`⚠️ Failed to warm active-journey cache at boot (read-through fallback will cover this): ${error.message}`,
);
}
await this.initializeKafkaConsumer();
await this.startConsuming();
} else {
Expand Down
Loading