Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,43 @@ Before writing any code that uses a library API:

Key versions to double-check: Prisma client API, Express 4 vs 5 routing, Zod v3 schema methods.
<!-- END:framework-versions -->

<!-- BEGIN:codex-adapted-from-claude -->
# Codex workflow notes

These notes adapt the local `CLAUDE.md` guidance for Codex.

## Tooling

- Prefer `rg` / `rg --files` for search and file discovery.
- Prefer `sed`, `nl`, and other focused shell reads for file inspection.
- Use `multi_tool_use.parallel` when independent reads or searches can run at the same time.
- Use `apply_patch` for manual file edits.
- Do not rely on Claude-only `lean-ctx` tools unless they are explicitly available in the current session.

## UI — Modal/Drawer close must refresh parent page

Whenever you add or modify a modal/drawer (Ant Design `Modal`, `Drawer`, or any custom
overlay), both `onCancel` and `onClose` handlers MUST trigger a refresh of the data on
the page from which the modal/drawer was opened. Closing via the × button, Esc key,
backdrop click, or a "Cancel" footer button must all call the parent's data-loading
function (`load()`, `fetchX()`, `loadX(page)`, etc.).

Rationale: the modal may have side effects (nested actions, auto-save, cascading
updates) that change server state even when the user "cancels". Forcing a refresh
keeps the page consistent with the server without requiring manual F5.

Pattern:

```tsx
// BAD
<Modal onCancel={() => setOpen(false)} ... />

// GOOD
<Modal onCancel={() => { setOpen(false); void load(); }} ... />
```

Applies equally to custom footer "Отмена" buttons inside a Modal/Drawer form.
If the load function is defined inside a `useEffect` closure, extract it as a
top-level `useCallback` so it can be invoked from close handlers.
<!-- END:codex-adapted-from-claude -->
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- TTBUS-0.1: transactional outbox and consumer deduplication tables.

CREATE TABLE "event_outbox" (
"id" TEXT NOT NULL,
"topic" TEXT NOT NULL,
"message_id" TEXT NOT NULL,
"type" TEXT NOT NULL,
"envelope" JSONB NOT NULL,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"sent_at" TIMESTAMP(3),
"attempts" INTEGER NOT NULL DEFAULT 0,
"last_error" TEXT,

CONSTRAINT "event_outbox_pkey" PRIMARY KEY ("id")
);

CREATE TABLE "processed_messages" (
"consumer_group" TEXT NOT NULL,
"message_id" TEXT NOT NULL,
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "processed_messages_pkey" PRIMARY KEY ("consumer_group", "message_id")
);

CREATE UNIQUE INDEX "event_outbox_message_id_key" ON "event_outbox"("message_id");
CREATE INDEX "event_outbox_sent_at_created_at_idx" ON "event_outbox"("sent_at", "created_at");
CREATE INDEX "event_outbox_message_id_idx" ON "event_outbox"("message_id");
CREATE INDEX "processed_messages_processed_at_idx" ON "processed_messages"("processed_at");
28 changes: 28 additions & 0 deletions backend/src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1467,3 +1467,31 @@ model UserGroupSystemRole {
@@index([groupId])
@@map("user_group_system_roles")
}

// ===== EVENT BUS / TRANSACTIONAL OUTBOX =====

model EventOutbox {
id String @id @default(uuid())
topic String
messageId String @unique @default(uuid()) @map("message_id")
type String
envelope Json
createdAt DateTime @default(now()) @map("created_at")
sentAt DateTime? @map("sent_at")
attempts Int @default(0)
lastError String? @map("last_error")

@@index([sentAt, createdAt])
@@index([messageId])
@@map("event_outbox")
}

model ProcessedMessage {
consumerGroup String @map("consumer_group")
messageId String @map("message_id")
processedAt DateTime @default(now()) @map("processed_at")

@@id([consumerGroup, messageId])
@@index([processedAt])
@@map("processed_messages")
}
34 changes: 34 additions & 0 deletions backend/src/shared/eventbus/envelope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { z } from 'zod';

export const EVENT_TOPICS = {
issues: 'tt.issues',
comments: 'tt.comments',
releases: 'tt.releases',
workflow: 'tt.workflow',
notificationsDlq: 'tt.notifications.dlq',
} as const;

export type EventTopic = (typeof EVENT_TOPICS)[keyof typeof EVENT_TOPICS];

export const eventActorSchema = z.object({
userId: z.string().uuid().nullable(),
ip: z.string().optional(),
userAgent: z.string().optional(),
});

export type EventActor = z.infer<typeof eventActorSchema>;

export const eventEnvelopeSchema = z.object({
v: z.literal(1),
messageId: z.string().uuid(),
type: z.string().min(1),
occurredAt: z.string().datetime(),
actor: eventActorSchema,
payload: z.record(z.unknown()),
meta: z.object({
tenantId: z.string().nullable(),
correlationId: z.string().optional(),
}),
});

export type EventEnvelope = z.infer<typeof eventEnvelopeSchema>;
91 changes: 91 additions & 0 deletions backend/src/shared/outbox/outbox.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { randomUUID } from 'node:crypto';
import type { Prisma } from '@prisma/client';

import { type EventActor, type EventEnvelope, eventEnvelopeSchema } from '../eventbus/envelope.js';

export type PublishInTxOptions = {
correlationId?: string;
occurredAt?: Date;
};

const FORBIDDEN_PAYLOAD_KEYS = new Set([
'password',
'passwordHash',
'password_hash',
'token',
'accessToken',
'access_token',
'refreshToken',
'refresh_token',
'apiKey',
'api_key',
'secret',
]);

const FORBIDDEN_PAYLOAD_KEYS_NORMALIZED = new Set(
[...FORBIDDEN_PAYLOAD_KEYS].map((key) => key.replace(/[_-]/g, '').toLowerCase()),
);

function toJsonPayload(payload: Record<string, unknown>): Record<string, unknown> {
const serialized = JSON.stringify(payload);
if (serialized === undefined) {
throw new Error('Event payload must be JSON-serializable');
}
return JSON.parse(serialized) as Record<string, unknown>;
}

function assertNoSecretKeys(value: unknown, path = 'payload') {
if (Array.isArray(value)) {
value.forEach((item, index) => assertNoSecretKeys(item, `${path}[${index}]`));
return;
}

if (typeof value !== 'object' || value === null) {
return;
}

for (const [key, nestedValue] of Object.entries(value)) {
const normalizedKey = key.replace(/[_-]/g, '').toLowerCase();
if (FORBIDDEN_PAYLOAD_KEYS_NORMALIZED.has(normalizedKey)) {
throw new Error(`Event payload must not contain secret field "${path}.${key}"`);
}
assertNoSecretKeys(nestedValue, `${path}.${key}`);
}
}

export async function publishInTx(
tx: Prisma.TransactionClient,
topic: string,
type: string,
payload: Record<string, unknown>,
actor: EventActor,
options: PublishInTxOptions = {},
): Promise<string> {
assertNoSecretKeys(payload);
const jsonPayload = toJsonPayload(payload);

const messageId = randomUUID();
const envelope: EventEnvelope = eventEnvelopeSchema.parse({
v: 1,
messageId,
type,
occurredAt: (options.occurredAt ?? new Date()).toISOString(),
actor,
payload: jsonPayload,
meta: {
tenantId: null,
...(options.correlationId && { correlationId: options.correlationId }),
},
});

await tx.eventOutbox.create({
data: {
topic,
messageId,
type,
envelope: envelope as unknown as Prisma.InputJsonObject,
},
});

return messageId;
}
28 changes: 28 additions & 0 deletions backend/src/shared/outbox/processed-messages.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { Prisma } from '@prisma/client';

import { prisma } from '../../prisma/client.js';

type PrismaLike = Pick<typeof prisma, 'processedMessage'> | Prisma.TransactionClient;

export async function markProcessedOnce(
consumerGroup: string,
messageId: string,
client: PrismaLike = prisma,
): Promise<boolean> {
try {
await client.processedMessage.create({
data: { consumerGroup, messageId },
});
return true;
} catch (err) {
if (
typeof err === 'object' &&
err !== null &&
'code' in err &&
(err as { code?: string }).code === 'P2002'
) {
return false;
}
throw err;
}
}
86 changes: 86 additions & 0 deletions backend/tests/outbox.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { describe, expect, it } from 'vitest';

import { prisma } from '../src/prisma/client.js';
import { EVENT_TOPICS, eventEnvelopeSchema } from '../src/shared/eventbus/envelope.js';
import { publishInTx } from '../src/shared/outbox/outbox.service.js';
import { markProcessedOnce } from '../src/shared/outbox/processed-messages.service.js';

describe('transactional outbox', () => {
it('writes a valid envelope inside the caller transaction', async () => {
const messageId = await prisma.$transaction((tx) =>
publishInTx(
tx,
EVENT_TOPICS.issues,
'ISSUE_CREATED',
{ issueId: 'issue-1', projectId: 'project-1' },
{ userId: null },
{ occurredAt: new Date('2026-04-26T00:00:00.000Z'), correlationId: 'corr-1' },
),
);

const row = await prisma.eventOutbox.findUniqueOrThrow({ where: { messageId } });
expect(row.topic).toBe(EVENT_TOPICS.issues);
expect(row.type).toBe('ISSUE_CREATED');
expect(row.sentAt).toBeNull();
expect(row.attempts).toBe(0);

const envelope = eventEnvelopeSchema.parse(row.envelope);
expect(envelope).toMatchObject({
v: 1,
messageId,
type: 'ISSUE_CREATED',
occurredAt: '2026-04-26T00:00:00.000Z',
actor: { userId: null },
payload: { issueId: 'issue-1', projectId: 'project-1' },
meta: { tenantId: null, correlationId: 'corr-1' },
});
});

it('rolls back the outbox row when the caller transaction fails', async () => {
await expect(
prisma.$transaction(async (tx) => {
await publishInTx(
tx,
EVENT_TOPICS.issues,
'ISSUE_UPDATED',
{ issueId: 'rollback-issue' },
{ userId: null },
);
throw new Error('force rollback');
}),
).rejects.toThrow('force rollback');

await expect(
prisma.eventOutbox.findFirstOrThrow({
where: {
type: 'ISSUE_UPDATED',
envelope: { path: ['payload', 'issueId'], equals: 'rollback-issue' },
},
}),
).rejects.toThrow();
});

it('rejects payloads with secret-like fields before writing', async () => {
await expect(
prisma.$transaction((tx) =>
publishInTx(
tx,
EVENT_TOPICS.issues,
'ISSUE_UPDATED',
{ issueId: 'issue-1', actorSnapshot: { password_hash: 'secret' } },
{ userId: null },
),
),
).rejects.toThrow('payload.actorSnapshot.password_hash');
});
});

describe('processed message deduplication', () => {
it('marks the first message and skips duplicates for the same consumer group', async () => {
const messageId = '8d6763ae-ded0-4cb7-9cc5-8bb6cbbd5f87';

await expect(markProcessedOnce('notifications-service', messageId)).resolves.toBe(true);
await expect(markProcessedOnce('notifications-service', messageId)).resolves.toBe(false);
await expect(markProcessedOnce('webhooks-service', messageId)).resolves.toBe(true);
});
});
2 changes: 2 additions & 0 deletions backend/tests/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ beforeAll(async () => {
});

// Clean test database (respect foreign keys)
await prisma.processedMessage.deleteMany();
await prisma.eventOutbox.deleteMany();
await prisma.auditLog.deleteMany();
await prisma.timeLog.deleteMany();
await prisma.comment.deleteMany();
Expand Down
Loading
Loading