Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: dev backend frontend infra stop clean seed test lint setup docs sync pr ship merge branches worktree-clean branch-clean
.PHONY: dev backend frontend infra events stop clean seed test lint setup docs sync pr ship merge branches worktree-clean branch-clean

# --- First time setup ---
setup:
Expand All @@ -11,6 +11,12 @@ infra:
@until docker compose exec -T postgres pg_isready -U tasktime >/dev/null 2>&1; do sleep 1; done
@echo "PostgreSQL ready on :5432, Redis ready on :6379"

events:
docker compose --profile events up -d postgres redis kafka backend-relay
@echo "Waiting for Kafka..."
@until docker compose --profile events exec -T kafka kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1; do sleep 1; done
@echo "Kafka ready on :9092, backend-relay started"

# --- Dev servers ---
backend: infra
cd backend && npm run dev
Expand Down
10 changes: 10 additions & 0 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
"build": "tsc",
"typecheck": "tsc --noEmit",
"start": "node dist/server.js",
"events:relay": "node dist/relay.js",
"events:relay:dev": "tsx src/relay.ts",
"events:tail": "node dist/scripts/events-tail.js",
"events:tail:dev": "tsx src/scripts/events-tail.ts",
"user:promote-super-admin": "tsx src/scripts/promote-super-admin.ts",
"user:rotate-password": "tsx src/scripts/rotate-password.ts",
"db:bootstrap": "node dist/prisma/bootstrap.js",
Expand Down Expand Up @@ -45,6 +49,7 @@
"express": "^4.21.2",
"helmet": "^8.0.0",
"jsonwebtoken": "^9.0.2",
"kafkajs": "^2.2.4",
"node-cron": "^4.2.1",
"prom-client": "^15.1.3",
"redis": "^5.11.0",
Expand Down
12 changes: 12 additions & 0 deletions backend/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import 'dotenv/config';
import { z } from 'zod';

// Env flags arrive as the strings 'true'/'false'; default to 'false', then
// transform so config consumers receive a real boolean instead of a string.
const booleanFlag = z.enum(['true', 'false']).default('false').transform((value) => value === 'true');

const envSchema = z.object({
DATABASE_URL: z.string(),
JWT_SECRET: z.string().min(10),
Expand Down Expand Up @@ -30,6 +32,16 @@ const envSchema = z.object({
BURNDOWN_RETENTION_CRON: z.string().default('0 3 * * 0'),
BURNDOWN_RETENTION_DAYS_AFTER_DONE: z.coerce.number().min(7).max(3650).default(90),
BURNDOWN_WEEKLY_AGG_AFTER_DAYS: z.coerce.number().min(30).max(3650).default(365),

// TTBUS-0: Kafka event bus + transactional outbox relay.
NOTIFICATIONS_ENABLED: booleanFlag,
KAFKA_BROKERS: z.string().default('localhost:9092'),
KAFKA_CLIENT_ID: z.string().min(1).default('tasktime-backend'),
OUTBOX_RELAY_INTERVAL_MS: z.coerce.number().min(100).max(60000).default(500),
OUTBOX_RELAY_BATCH_SIZE: z.coerce.number().min(1).max(500).default(100),
OUTBOX_RELAY_MAX_ATTEMPTS: z.coerce.number().min(1).max(100).default(10),
OUTBOX_RELAY_TRANSACTION_TIMEOUT_MS: z.coerce.number().min(1000).max(300000).default(60000),
OUTBOX_CLEANUP_RETENTION_DAYS: z.coerce.number().min(1).max(365).default(7),
});

export const config = envSchema.parse(process.env);
Expand Down
36 changes: 36 additions & 0 deletions backend/src/relay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { config } from './config.js';
import { createEventBusProducer } from './shared/eventbus/producer.js';
import { PrismaOutboxRelayStore } from './shared/outbox/relay-store.js';
import { OutboxRelayWorker } from './shared/outbox/relay-worker.js';

// Entrypoint for the standalone outbox-relay process (TTBUS-0): drains the
// transactional outbox into Kafka. When the notifications feature flag is
// off, the process exits immediately so the container is a harmless no-op.
if (!config.NOTIFICATIONS_ENABLED) {
  console.log('Outbox relay disabled: NOTIFICATIONS_ENABLED=false');
  process.exit(0);
}

// All tunables come from validated env config (see config.ts).
const relayOptions = {
  store: new PrismaOutboxRelayStore(config.OUTBOX_RELAY_TRANSACTION_TIMEOUT_MS),
  producer: createEventBusProducer(),
  intervalMs: config.OUTBOX_RELAY_INTERVAL_MS,
  batchSize: config.OUTBOX_RELAY_BATCH_SIZE,
  maxAttempts: config.OUTBOX_RELAY_MAX_ATTEMPTS,
  cleanupRetentionDays: config.OUTBOX_CLEANUP_RETENTION_DAYS,
};

const worker = new OutboxRelayWorker(relayOptions);

// Stop the worker cleanly (drain in-flight work) before exiting.
async function shutdown(signal: string) {
  console.log(`Outbox relay received ${signal}, shutting down`);
  await worker.stop();
  process.exit(0);
}

for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.once(signal, () => {
    void shutdown(signal);
  });
}

worker.start().catch((err) => {
  console.error('Outbox relay failed to start', err);
  process.exit(1);
});
53 changes: 53 additions & 0 deletions backend/src/scripts/events-tail.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { createKafkaClient } from '../shared/eventbus/client.js';
import { eventEnvelopeSchema } from '../shared/eventbus/envelope.js';

// CLI utility: tail one or more Kafka topics and print each event envelope as
// a JSON line. Usage: npm run events:tail -- <topic> [topic...]
const topics = process.argv.slice(2);

if (topics.length === 0) {
  console.error('Usage: npm run events:tail -- <topic> [topic...]');
  process.exit(1);
}

// Unique group id per invocation so the tail never steals offsets from real consumers.
const groupId = `tasktime-events-tail-${Date.now()}`;
const kafka = createKafkaClient({ clientId: process.env.KAFKA_CLIENT_ID ?? 'tasktime-events-tail' });
const consumer = kafka.consumer({ groupId });

async function main() {
  await consumer.connect();
  for (const topic of topics) {
    await consumer.subscribe({ topic, fromBeginning: false });
  }

  console.log(`Tailing Kafka topics: ${topics.join(', ')}`);
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      if (!message.value) {
        console.log(JSON.stringify({ topic, partition, offset: message.offset, empty: true }));
        return;
      }

      // A diagnostic tail must never die on a bad message: an exception thrown
      // from eachMessage aborts the consumer run loop. Fall back to printing
      // the raw payload when it is not valid JSON or not a valid envelope.
      const raw = message.value.toString();
      let parsed: unknown;
      try {
        parsed = JSON.parse(raw);
      } catch {
        console.log(JSON.stringify({ topic, partition, offset: message.offset, invalidJson: true, raw }));
        return;
      }

      const result = eventEnvelopeSchema.safeParse(parsed);
      if (!result.success) {
        console.log(JSON.stringify({ topic, partition, offset: message.offset, invalidEnvelope: true, raw }));
        return;
      }

      console.log(JSON.stringify({ topic, partition, offset: message.offset, envelope: result.data }));
    },
  });
}

async function shutdown(signal: string) {
  console.error(`events:tail received ${signal}, shutting down`);
  await consumer.disconnect();
  process.exit(0);
}

process.once('SIGINT', () => {
  void shutdown('SIGINT');
});
process.once('SIGTERM', () => {
  void shutdown('SIGTERM');
});

main().catch(async (err) => {
  console.error(err);
  await consumer.disconnect().catch(() => undefined);
  process.exit(1);
});
22 changes: 22 additions & 0 deletions backend/src/shared/eventbus/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Kafka, logLevel, type KafkaConfig } from 'kafkajs';

/**
 * Splits a comma-separated broker list (e.g. "host1:9092, host2:9092") into
 * individual broker addresses, trimming whitespace and dropping empty entries.
 */
export function parseKafkaBrokers(value: string): string[] {
  const brokers: string[] = [];
  for (const part of value.split(',')) {
    const trimmed = part.trim();
    if (trimmed.length > 0) {
      brokers.push(trimmed);
    }
  }
  return brokers;
}

/**
 * Builds a kafkajs client from environment defaults (KAFKA_BROKERS,
 * KAFKA_CLIENT_ID), allowing callers to override any KafkaConfig field.
 *
 * @throws Error when the resolved broker list is empty.
 */
export function createKafkaClient(overrides: Partial<KafkaConfig> = {}): Kafka {
  const envBrokers = process.env.KAFKA_BROKERS ?? 'localhost:9092';
  const brokers = overrides.brokers ?? parseKafkaBrokers(envBrokers);
  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must contain at least one broker');
  }

  const baseConfig: KafkaConfig = {
    clientId: process.env.KAFKA_CLIENT_ID ?? 'tasktime-backend',
    brokers,
    logLevel: logLevel.INFO,
  };

  // Overrides win over env-derived defaults, mirroring the spread order.
  return new Kafka({ ...baseConfig, ...overrides });
}
92 changes: 92 additions & 0 deletions backend/src/shared/eventbus/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import type { Consumer, EachMessagePayload, Kafka, KafkaMessage } from 'kafkajs';

import { createKafkaClient } from './client.js';
import { type EventEnvelope, eventEnvelopeSchema } from './envelope.js';
import { hasProcessedMessage, markProcessedOnce } from '../outbox/processed-messages.service.js';

/**
 * Handler invoked for each decoded event: receives the full envelope, the
 * typed payload, and the Kafka coordinates (topic/partition/offset) of the
 * message being processed.
 */
export type EventHandler<T = Record<string, unknown>> = (
  envelope: EventEnvelope,
  payload: T,
  kafka: Pick<EachMessagePayload, 'topic' | 'partition'> & { offset: string },
) => Promise<void>;

export type ProcessKafkaMessageOptions<T = Record<string, unknown>> = {
  consumerGroup: string;
  message: Pick<KafkaMessage, 'value' | 'offset'>;
  topic: string;
  partition: number;
  handler: EventHandler<T>;
};

/**
 * Decode, deduplicate, and dispatch a single Kafka message.
 *
 * Returns 'skipped' when this consumer group has already recorded the
 * envelope's messageId; otherwise runs the handler and records the id,
 * returning 'processed'. The handler runs before the processed-marker is
 * written, so delivery is at-least-once — handlers must tolerate replays.
 *
 * @throws Error when the message carries no value, or when the value is not
 *         valid JSON / not a valid event envelope.
 */
export async function processKafkaMessage<T = Record<string, unknown>>(
  options: ProcessKafkaMessageOptions<T>,
): Promise<'processed' | 'skipped'> {
  const { consumerGroup, message, topic, partition, handler } = options;

  if (!message.value) {
    throw new Error(`Kafka message on ${topic}[${partition}] offset ${message.offset} has no value`);
  }

  const decoded: unknown = JSON.parse(message.value.toString());
  const envelope = eventEnvelopeSchema.parse(decoded);

  const alreadySeen = await hasProcessedMessage(consumerGroup, envelope.messageId);
  if (alreadySeen) {
    return 'skipped';
  }

  await handler(envelope, envelope.payload as T, { topic, partition, offset: message.offset });
  await markProcessedOnce(consumerGroup, envelope.messageId);
  return 'processed';
}

export type EventBusConsumerOptions<T = Record<string, unknown>> = {
  consumerGroup: string;
  topics: string[];
  handler: EventHandler<T>;
  kafka?: Kafka;
  consumer?: Consumer;
  fromBeginning?: boolean;
};

/**
 * Lifecycle wrapper around a kafkajs consumer that funnels every message
 * through `processKafkaMessage`, giving handlers deduplicated delivery per
 * consumer group.
 */
export class EventBusConsumer<T = Record<string, unknown>> {
  private readonly consumer: Consumer;
  private readonly consumerGroup: string;
  private readonly topics: string[];
  private readonly handler: EventHandler<T>;
  private readonly fromBeginning: boolean;
  private connected = false;

  constructor(options: EventBusConsumerOptions<T>) {
    this.consumerGroup = options.consumerGroup;
    this.topics = options.topics;
    this.handler = options.handler;
    this.fromBeginning = options.fromBeginning ?? false;

    // Precedence: an injected consumer wins; otherwise build one from the
    // injected kafka client (or a default client, created only when needed).
    if (options.consumer) {
      this.consumer = options.consumer;
    } else {
      const kafkaClient = options.kafka ?? createKafkaClient();
      this.consumer = kafkaClient.consumer({ groupId: options.consumerGroup });
    }
  }

  /**
   * Connect, subscribe to every configured topic, and start the run loop.
   * Calling again once running is a no-op.
   */
  async connectAndRun(): Promise<void> {
    if (this.connected) {
      return;
    }

    await this.consumer.connect();
    for (const eventTopic of this.topics) {
      await this.consumer.subscribe({ topic: eventTopic, fromBeginning: this.fromBeginning });
    }

    await this.consumer.run({
      eachMessage: async (payload) => {
        await processKafkaMessage({
          consumerGroup: this.consumerGroup,
          message: payload.message,
          topic: payload.topic,
          partition: payload.partition,
          handler: this.handler,
        });
      },
    });

    this.connected = true;
  }

  /** Disconnect the underlying consumer; a no-op when not connected. */
  async disconnect(): Promise<void> {
    if (!this.connected) {
      return;
    }
    await this.consumer.disconnect();
    this.connected = false;
  }
}
54 changes: 54 additions & 0 deletions backend/src/shared/eventbus/producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Kafka, Message, Producer } from 'kafkajs';

import { createKafkaClient } from './client.js';
import { type EventEnvelope, eventEnvelopeSchema } from './envelope.js';

export type EventBusProducerOptions = {
  kafka?: Kafka;
  producer?: Producer;
};

/**
 * Lazily-connecting kafkajs producer that validates every outgoing event
 * against the shared envelope schema before publishing.
 */
export class EventBusProducer {
  private readonly producer: Producer;
  private connected = false;

  constructor(options: EventBusProducerOptions = {}) {
    // An injected producer wins; otherwise build one from the injected
    // kafka client or a default client.
    if (options.producer) {
      this.producer = options.producer;
    } else {
      this.producer = (options.kafka ?? createKafkaClient()).producer();
    }
  }

  /** Idempotent connect: only the first call touches the broker. */
  async connect(): Promise<void> {
    if (this.connected) {
      return;
    }
    await this.producer.connect();
    this.connected = true;
  }

  /** Idempotent disconnect: a no-op unless currently connected. */
  async disconnect(): Promise<void> {
    if (!this.connected) {
      return;
    }
    await this.producer.disconnect();
    this.connected = false;
  }

  /**
   * Validate and publish a single envelope to `topic`, connecting on demand.
   * The messageId is used as the Kafka message key, and type/version/timestamp
   * travel as headers. Validation runs before any network I/O, so a malformed
   * envelope throws without touching the broker.
   */
  async sendEnvelope(topic: string, envelope: EventEnvelope): Promise<void> {
    const checked = eventEnvelopeSchema.parse(envelope);
    await this.connect();

    const record: Message = {
      key: checked.messageId,
      value: JSON.stringify(checked),
      headers: {
        type: checked.type,
        v: String(checked.v),
        occurredAt: checked.occurredAt,
      },
    };

    await this.producer.send({ topic, messages: [record] });
  }
}

/** Convenience factory; equivalent to `new EventBusProducer(options)`. */
export function createEventBusProducer(options?: EventBusProducerOptions): EventBusProducer {
  const producer = new EventBusProducer(options);
  return producer;
}
11 changes: 11 additions & 0 deletions backend/src/shared/outbox/processed-messages.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@ export async function markProcessedOnce(
throw err;
}
}

/**
 * Returns true when a (consumerGroup, messageId) row already exists in the
 * processed-messages table — i.e. this group has already handled the message.
 *
 * @param client Prisma-like client; defaults to the shared prisma instance so
 *               callers inside a transaction can pass their tx client.
 */
export async function hasProcessedMessage(
  consumerGroup: string,
  messageId: string,
  client: PrismaLike = prisma,
): Promise<boolean> {
  const matches = await client.processedMessage.count({
    where: { consumerGroup, messageId },
  });
  return matches !== 0;
}
Loading
Loading