Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions src/modules/cache/services/journey-session-cache.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ jest.mock('./redis-singleton.service', () => ({

import { JourneySessionCacheService } from './journey-session-cache.service';

function makeRepository(): jest.Mocked<
type MockRepository = jest.Mocked<
Pick<Repository<JourneySession>, 'findOne' | 'find' | 'save'>
> {
> & { manager: { query: jest.Mock } };

function makeRepository(): MockRepository {
return {
findOne: jest.fn().mockResolvedValue(null),
find: jest.fn().mockResolvedValue([]),
save: jest.fn().mockImplementation((e) => Promise.resolve(e)),
} as unknown as jest.Mocked<
Pick<Repository<JourneySession>, 'findOne' | 'find' | 'save'>
>;
// EVO-1929: the lazy contact upsert runs through the entity manager.
manager: { query: jest.fn().mockResolvedValue([]) },
} as unknown as MockRepository;
}

function makeSession(id: string): JourneySession {
Expand Down Expand Up @@ -224,3 +226,64 @@ describe('JourneySessionCacheService — Postgres write-through durability (EVO-
await expect(service.set(makeSession('sess-3'))).rejects.toThrow('db down');
});
});

describe('JourneySessionCacheService — lazy contact upsert satisfies FK (EVO-1929)', () => {
beforeEach(() => {
mockKv.clear();
mockSets.clear();
jest.clearAllMocks();
});

it('upserts a minimal contacts row before saving a session for a CRM-only contact', async () => {
const repo = makeRepository();
const service = makeService(repo);

const session = makeSession('sess-fk');
(session as unknown as { contactId: string }).contactId = 'crm-only-contact';

await service.set(session);

// The contact row is ensured BEFORE the session is persisted, so the
// FK_journey_sessions_contact_id constraint is always satisfied.
expect(repo.manager.query).toHaveBeenCalledWith(
'INSERT INTO contacts (id) VALUES ($1) ON CONFLICT (id) DO NOTHING',
['crm-only-contact'],
);
const upsertOrder = repo.manager.query.mock.invocationCallOrder[0];
const saveOrder = repo.save.mock.invocationCallOrder[0];
expect(upsertOrder).toBeLessThan(saveOrder);
expect(repo.save).toHaveBeenCalledTimes(1);
});

it('runs the idempotent upsert on per-node runtime writes (not just start)', async () => {
const repo = makeRepository();
const service = makeService(repo);

// First write creates the session (and ensures the contact)...
await service.set(makeSession('sess-runtime'));
repo.manager.query.mockClear();

// ...a subsequent per-node status transition (the runtime path that used
// to FK-fail) also runs the idempotent upsert.
await service.updateSessionStatus('sess-runtime', 'completed', {
completedAt: new Date('2026-06-03T00:00:00Z'),
});

expect(repo.manager.query).toHaveBeenCalledWith(
'INSERT INTO contacts (id) VALUES ($1) ON CONFLICT (id) DO NOTHING',
['contact-1'],
);
});

it('skips the upsert when the session has no contact id', async () => {
const repo = makeRepository();
const service = makeService(repo);

const session = makeSession('sess-no-contact');
(session as unknown as { contactId?: string }).contactId = undefined;

await service.set(session);

expect(repo.manager.query).not.toHaveBeenCalled();
});
});
43 changes: 43 additions & 0 deletions src/modules/cache/services/journey-session-cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ export class JourneySessionCacheService extends BaseCacheService<
private async persistToDatabase(value: JourneySession): Promise<void> {
const v = value as unknown as CachedJourneySession;
try {
// EVO-1929: `journey_sessions.contact_id` carries a FK to
// `evo_campaign.contacts` (FK_journey_sessions_contact_id, ON DELETE
// CASCADE), but the evo-flow community surface never populates that table
// — `ContactsService`/`ContactsClientService` are HTTP proxies to the CRM
// (Rails), so a contact that exists only in the CRM has no row here.
// EVO-1892 made the journey *start* path swallow the resulting FK
// violation (best-effort), but the per-node runtime writes from the
// Temporal activity still go through this durable (throwing) path, so the
// first per-node session persistence aborts the journey at node 1 for any
// contact not pre-seeded into evo_campaign.contacts.
//
// Fix (option b — lazy upsert): guarantee a minimal `contacts` row exists
// before saving the session. The contacts table requires only `id`; every
// other column has a DB default ('', false, '{}'::jsonb) or is nullable
// (see init-base-tables migration), so an id-only insert yields a valid
// row. `ON CONFLICT (id) DO NOTHING` keeps it idempotent and never
// clobbers a real CRM-synced contact. This resolves the root cause on ALL
// write paths while preserving referential integrity (vs. relaxing the
// FK).
await this.ensureContactRow(v.contactId);

await this.repository.save({
id: v.id,
journeyId: v.journeyId,
Expand Down Expand Up @@ -116,6 +137,28 @@ export class JourneySessionCacheService extends BaseCacheService<
}
}

/**
* EVO-1929: idempotently ensure a minimal `contacts` row exists so the
* `journey_sessions.contact_id` FK is satisfied before persisting a session.
*
* The contacts table is owned by the CRM (Rails) and only the `id` column is
* mandatory — all other columns have DB-level defaults or are nullable — so
* an id-only insert is a valid, minimal row. `ON CONFLICT (id) DO NOTHING`
* makes this a no-op when the contact already exists (CRM-synced or seeded),
* so it never overwrites real contact data and is safe to call on every
* session write. A missing/non-uuid id is skipped (the save itself will then
* fail loudly, preserving the existing error contract).
*/
private async ensureContactRow(contactId?: string): Promise<void> {
if (!contactId) {
return;
}
await this.repository.manager.query(
'INSERT INTO contacts (id) VALUES ($1) ON CONFLICT (id) DO NOTHING',
[contactId],
);
}

async getActiveSessionsByJourney(
journeyId: string,
): Promise<CachedJourneySession[]> {
Expand Down
Loading