diff --git a/backend/src/app.module.ts b/backend/src/app.module.ts index 0a26e95..d98ecca 100644 --- a/backend/src/app.module.ts +++ b/backend/src/app.module.ts @@ -34,6 +34,7 @@ import { MetricsModule } from "./metrics/metrics.module"; import { HealthModule } from "./health/health.module"; import { VersionModule } from "./version/version.module"; import { ArtistStatusModule } from "./artist-status/artist-status.module"; +import { WebSocketModule } from "./websocket/websocket.module"; import { CustomThrottlerRedisStorage } from "./custom-throttler-storage-redis"; import { VaryAcceptEncodingMiddleware } from "./common/middleware/vary-accept-encoding.middleware"; @@ -107,6 +108,7 @@ import { VaryAcceptEncodingMiddleware } from "./common/middleware/vary-accept-en HealthModule, VersionModule, ArtistStatusModule, + WebSocketModule, ], controllers: [], providers: [ diff --git a/backend/src/tips/create-tips.dto.ts b/backend/src/tips/create-tips.dto.ts index 677bb02..965d95d 100644 --- a/backend/src/tips/create-tips.dto.ts +++ b/backend/src/tips/create-tips.dto.ts @@ -1,7 +1,40 @@ -import { IsUUID, IsOptional, IsString, MaxLength } from 'class-validator'; +import { IsUUID, IsOptional, IsString, MaxLength, ValidateNested } from 'class-validator'; import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; +import { Type } from 'class-transformer'; import { SanitiseAsPlainText } from '../common/utils/sanitise.util'; +export class TipMetadataDto { + @ApiPropertyOptional({ description: 'Source of the tip', example: 'mobile_app' }) + @IsOptional() + @IsString() + @MaxLength(50) + source?: string; + + @ApiPropertyOptional({ description: 'Campaign ID', example: 'summer2023' }) + @IsOptional() + @IsString() + @MaxLength(100) + campaign?: string; + + @ApiPropertyOptional({ description: 'Platform (web/ios/android)', example: 'android' }) + @IsOptional() + @IsString() + @MaxLength(20) + platform?: string; + + @ApiPropertyOptional({ description: 'Operating System', example: 'Android 13' }) + @IsOptional() + @IsString() + @MaxLength(50) + os?: string; + + @ApiPropertyOptional({ description: 'Client App Version', example: '2.1.0' }) + @IsOptional() + @IsString() + @MaxLength(20) + version?: string; +} + export class CreateTipDto { @ApiProperty({ description: 'Artist ID receiving the tip', example: '550e8400-e29b-41d4-a716-446655440001' }) @IsUUID() @@ -35,4 +68,13 @@ export class CreateTipDto { @IsString() @MaxLength(128) idempotencyKey?: string; + + @ApiPropertyOptional({ + description: 'Additional structured metadata', + type: TipMetadataDto, + }) + @IsOptional() + @ValidateNested() + @Type(() => TipMetadataDto) + metadata?: TipMetadataDto; } diff --git a/backend/src/tips/tips.service.ts b/backend/src/tips/tips.service.ts index 7c71010..8dea59d 100644 --- a/backend/src/tips/tips.service.ts +++ b/backend/src/tips/tips.service.ts @@ -65,7 +65,7 @@ export class TipsService { ) {} async create(userId: string, createTipDto: CreateTipDto): Promise { - const { artistId, trackId, stellarTxHash, message, idempotencyKey } = createTipDto; + const { artistId, trackId, stellarTxHash, message, idempotencyKey, metadata } = createTipDto; // --- Idempotency key check: replay the original response if key already seen --- if (idempotencyKey) { @@ -179,6 +179,7 @@ export class TipsService { status: TipStatus.VERIFIED, verifiedAt: new Date(), stellarTimestamp: new Date(txDetails.created_at), + metadata: metadata ? JSON.stringify(metadata) : null, ...(idempotencyKey ? { idempotencyKey } : {}), }); diff --git a/backend/src/websocket/event-store.service.ts b/backend/src/websocket/event-store.service.ts new file mode 100644 index 0000000..92ca808 --- /dev/null +++ b/backend/src/websocket/event-store.service.ts @@ -0,0 +1,67 @@ +import { Injectable, Logger } from '@nestjs/common'; + +export interface StoredEvent { + sequenceId: number; + type: string; + data: any; + rooms: string[]; + timestamp: Date; +} + +@Injectable() +export class EventStoreService { + private readonly logger = new Logger(EventStoreService.name); + private events: StoredEvent[] = []; + private nextSequenceId = 1; + private readonly maxWindowSize = 1000; // Store up to 1000 recent events + + /** + * Store a new event and assign a sequence ID + */ + storeEvent(type: string, data: any, rooms: string[]): StoredEvent { + const event: StoredEvent = { + sequenceId: this.nextSequenceId++, + type, + data, + rooms, + timestamp: new Date(), + }; + + this.events.push(event); + + // Keep only the sliding window of events + if (this.events.length > this.maxWindowSize) { + this.events.shift(); + } + + this.logger.debug(`Stored event ${event.sequenceId} (${type}) for rooms: ${rooms.join(', ')}`); + return event; + } + + /** + * Get missed events for a client after a specific sequence ID + */ + getEventsAfter(sequenceId: number, targetRooms: string[]): StoredEvent[] { + // Only return events that the client should have access to based on their joined rooms + return this.events.filter( + (event) => + event.sequenceId > sequenceId && + event.rooms.some((room) => targetRooms.includes(room)) + ); + } + + /** + * Get the latest sequence ID + */ + getLatestSequenceId(): number { + return this.nextSequenceId - 1; + } + + /** + * Clear the event store (useful for testing) + */ + clear(): void { + this.events = []; + this.nextSequenceId = 1; + } +} diff --git a/backend/src/websocket/websocket.gateway.ts b/backend/src/websocket/websocket.gateway.ts index ad3fa16..d3c7190 100644 --- a/backend/src/websocket/websocket.gateway.ts +++ b/backend/src/websocket/websocket.gateway.ts @@ -9,10 +9,14 @@ import { } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; import { Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; import { Tip } from '../tips/entities/tip.entity'; +import { EventStoreService } from './event-store.service'; +import { TipVerifiedEvent } from '../tips/events/tip-verified.event'; export interface TipNotificationData { type: 'tip_received'; + sequenceId: number; data: { tipId: string; artistId: string; @@ -43,7 +47,9 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec private readonly logger = new Logger(WebSocketGateway.name); private connectedClients: Map = new Map(); - handleConnection(client: Socket): void { + constructor(private readonly eventStore: EventStoreService) {} + + async handleConnection(client: Socket): Promise { this.logger.log(`Client connected: ${client.id}`); this.connectedClients.set(client.id, client); @@ -51,7 +57,16 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec client.emit('connected', { message: 'Connected to TipTune WebSocket', clientId: client.id, + latestSequenceId: this.eventStore.getLatestSequenceId(), }); + + // Handle initial catch-up if requested + const lastSequenceId = parseInt(client.handshake.query.lastSequenceId as string); + if (!isNaN(lastSequenceId)) { + this.logger.log(`Client ${client.id} requested catch-up from sequence: ${lastSequenceId}`); + // Replay missed events will be handled after rooms are joined or based on past rooms if stored + // For now, we'll allow an explicit 'request_missed_events' message or handle it post-authentication + } } handleDisconnect(client: Socket): void { @@ -62,13 +77,18 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec @SubscribeMessage('join_artist_room') handleJoinArtistRoom( @ConnectedSocket() client: Socket, - @MessageBody() data: { artistId: string }, + @MessageBody() data: { artistId: string; lastSequenceId?: number }, ): void { const room = `artist_${data.artistId}`; client.join(room); this.logger.log(`Client ${client.id} joined room: ${room}`); client.emit('joined_room', { room, artistId: data.artistId }); + + // Catch up if lastSequenceId provided + if (data.lastSequenceId !== undefined) { + this.replayMissedEvents(client, data.lastSequenceId, [room]); + } } @SubscribeMessage('leave_artist_room') @@ -86,13 +106,18 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec @SubscribeMessage('join_track_room') handleJoinTrackRoom( @ConnectedSocket() client: Socket, - @MessageBody() data: { trackId: string }, + @MessageBody() data: { trackId: string; lastSequenceId?: number }, ): void { const room = `track_${data.trackId}`; client.join(room); this.logger.log(`Client ${client.id} joined room: ${room}`); client.emit('joined_room', { room, trackId: data.trackId }); + + // Catch up if lastSequenceId provided + if (data.lastSequenceId !== undefined) { + this.replayMissedEvents(client, data.lastSequenceId, [room]); + } } @SubscribeMessage('leave_track_room') @@ -107,54 +132,98 @@ export class WebSocketGateway implements OnGatewayConnection, OnGatewayDisconnec client.emit('left_room', { room, trackId: data.trackId }); } + @SubscribeMessage('ack') + handleAck( + @ConnectedSocket() client: Socket, + @MessageBody() data: { sequenceId: number }, + ): void { + this.logger.debug(`Client ${client.id} acknowledged sequence ID: ${data.sequenceId}`); + } + @SubscribeMessage('ping') handlePing(@ConnectedSocket() client: Socket): void { client.emit('pong', { timestamp: new Date() }); } /** - * Send tip notification to relevant rooms + * Listen for tip.verified events from TipsService + */ + @OnEvent('tip.verified') + async handleTipVerifiedEvent(event: TipVerifiedEvent): Promise { + this.logger.log(`Received tip.verified event for tip ${event.tip.id}`); + await this.sendTipNotification(event.tip); + } + + /** + * Send tip notification to relevant rooms and store it */ async sendTipNotification(tip: Tip): Promise { try { + const artistRoom = `artist_${tip.artistId}`; + const trackRoom = tip.trackId ? `track_${tip.trackId}` : null; + const rooms = [artistRoom, 'global']; + if (trackRoom) rooms.push(trackRoom); + + const payload = { + tipId: tip.id, + artistId: tip.artistId, + trackId: tip.trackId, + amount: tip.amount, + asset: tip.asset, + message: tip.message, + senderAddress: tip.isAnonymous ? undefined : tip.senderAddress, + isAnonymous: tip.isAnonymous, + createdAt: tip.createdAt, + artist: tip.artist, + track: tip.track, + }; + + // Store event first to get sequence ID + const storedEvent = this.eventStore.storeEvent('tip_received', payload, rooms); + const notificationData: TipNotificationData = { type: 'tip_received', - data: { - tipId: tip.id, - artistId: tip.artistId, - trackId: tip.trackId, - amount: tip.amount, - asset: tip.asset, - message: tip.message, - senderAddress: tip.isAnonymous ? undefined : tip.senderAddress, - isAnonymous: tip.isAnonymous, - createdAt: tip.createdAt, - artist: tip.artist, - track: tip.track, - }, + sequenceId: storedEvent.sequenceId, + data: payload, }; // Send to artist room - const artistRoom = `artist_${tip.artistId}`; this.server.to(artistRoom).emit('tip_notification', notificationData); - this.logger.log(`Sent tip notification to room: ${artistRoom}`); + this.logger.log(`Sent tip notification ${storedEvent.sequenceId} to room: ${artistRoom}`); // Send to track room if track is specified - if (tip.trackId) { - const trackRoom = `track_${tip.trackId}`; + if (trackRoom) { this.server.to(trackRoom).emit('tip_notification', notificationData); - this.logger.log(`Sent tip notification to room: ${trackRoom}`); + this.logger.log(`Sent tip notification ${storedEvent.sequenceId} to room: ${trackRoom}`); } // Send to all connected clients for global notifications this.server.emit('global_tip_notification', notificationData); - this.logger.log(`Sent global tip notification`); + this.logger.log(`Sent global tip notification ${storedEvent.sequenceId}`); } catch (error) { this.logger.error(`Failed to send tip notification: ${error.message}`); } } + /** + * Replay missed events for a client + */ + private replayMissedEvents(client: Socket, lastSequenceId: number, rooms: string[]): void { + const missedEvents = this.eventStore.getEventsAfter(lastSequenceId, rooms); + if (missedEvents.length > 0) { + this.logger.log(`Replaying ${missedEvents.length} missed events for client ${client.id}`); + for (const event of missedEvents) { + const notificationData: TipNotificationData = { + type: 'tip_received', + sequenceId: event.sequenceId, + data: event.data, + }; + client.emit('tip_notification', notificationData); + } + } + } + /** * Send general notification */ diff --git a/backend/src/websocket/websocket.module.ts b/backend/src/websocket/websocket.module.ts index b739215..b1e497a 100644 --- a/backend/src/websocket/websocket.module.ts +++ b/backend/src/websocket/websocket.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; import { WebSocketGateway } from './websocket.gateway'; +import { EventStoreService } from './event-store.service'; @Module({ - providers: [WebSocketGateway], - exports: [WebSocketGateway], + providers: [WebSocketGateway, EventStoreService], + exports: [WebSocketGateway, EventStoreService], }) export class WebSocketModule {} diff --git a/backend/test/tips-validation.unit.spec.ts b/backend/test/tips-validation.unit.spec.ts new file mode 100644 index 0000000..cb4b5a3 --- /dev/null +++ b/backend/test/tips-validation.unit.spec.ts @@ -0,0 +1,87 @@ +import { ValidationPipe, BadRequestException } from '@nestjs/common'; +import { CreateTipDto } from '../src/tips/create-tips.dto'; + +describe('Tip Metadata Validation (DTO Unit Test)', () => { + let validationPipe: ValidationPipe; + + beforeEach(async () => { + validationPipe = new ValidationPipe({ + whitelist: true, + forbidNonWhitelisted: true, + transform: true, + }); + }); + + const validTipBase: CreateTipDto = { + artistId: '550e8400-e29b-41d4-a716-446655440001', + stellarTxHash: 'c6e0b3e5c8a4f2d1b9a7e6f3c5d8a2b1c4e7f0a9b3d6e9f2c5a8b1e4f7a0c3d6', + } as CreateTipDto; + + it('should accept valid metadata', async () => { + const dto = { + ...validTipBase, + metadata: { + source: 'web', + campaign: 'ref-code-123', + }, + }; + + // To test validation pipe manually, we need some metadata + const metadata = { + type: 'body', + metatype: CreateTipDto, + data: '', + } as any; + + const transformed = await validationPipe.transform(dto, metadata); + expect(transformed.metadata.source).toBe('web'); + }); + + it('should fail if metadata field exceeds MaxLength', async () => { + const dto = { + ...validTipBase, + metadata: { + source: 'a'.repeat(51), // Max is 50 + }, + }; + + const metadata = { + type: 'body', + metatype: CreateTipDto, + data: '', + } as any; + + try { + await validationPipe.transform(dto, metadata); + throw new Error('Should have thrown BadRequestException'); + } catch (e) { + expect(e).toBeInstanceOf(BadRequestException); + const response = e.getResponse(); + expect(response.message).toContain('metadata.source must be shorter than or equal to 50 characters'); + } + }); + + it('should fail if metadata contains unknown fields', async () => { + const dto = { + ...validTipBase, + metadata: { + unknown: 'value', + }, + }; + + const metadata = { + type: 'body', + metatype: CreateTipDto, + data: '', + } as any; + + try { + await validationPipe.transform(dto, metadata); + throw new Error('Should have thrown BadRequestException'); + } catch (e) { + expect(e).toBeInstanceOf(BadRequestException); + const response = e.getResponse(); + expect(response.message).toContain('metadata.property unknown should not exist'); + } + }); +}); diff --git a/backend/test/websocket-delivery.spec.ts b/backend/test/websocket-delivery.spec.ts new file mode 100644 index 0000000..7757027 --- /dev/null +++ b/backend/test/websocket-delivery.spec.ts @@ -0,0 +1,188 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { INestApplication } from '@nestjs/common'; +import { io, Socket as ClientSocket } from 'socket.io-client'; +import { AppModule } from '../src/app.module'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { TipVerifiedEvent } from '../src/tips/events/tip-verified.event'; +import { Tip } from '../src/tips/entities/tip.entity'; +import { EventStoreService } from '../src/websocket/event-store.service'; + +describe('WebSocket Delivery (Integration)', () => { + let app: INestApplication; + let eventEmitter: EventEmitter2; + let eventStore: EventStoreService; + let port: number; + + beforeAll(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + + // Get a random available port or use a specific one for testing + const server = app.getHttpServer(); + await new Promise((resolve) => { + server.listen(0, () => { + port = server.address().port; + resolve(); + }); + }); + + eventEmitter = app.get(EventEmitter2); + eventStore = app.get(EventStoreService); + }); + + afterAll(async () => { + await app.close(); + }); + + beforeEach(() => { + eventStore.clear(); + }); + + const createClient = (namespace: string = '/tips'): ClientSocket => { + return io(`http://localhost:${port}${namespace}`, { + transports: ['websocket'], + autoConnect: true, + }); + }; + + it('should replay missed events upon reconnection with lastSequenceId', (done) => { + const artistId = 'artist-123'; + const client = createClient(); + let sequenceId1: number; + + client.on('connected', (data) => { + // Join artist room + client.emit('join_artist_room', { artistId }); + }); + + client.on('joined_room', (data) => { + if (data.artistId === artistId) { + // Trigger first tip + const tip1 = { + id: 'tip-1', + artistId: artistId, + amount: 10, + asset: 'XLM', + createdAt: new Date(), + } as Tip; + eventEmitter.emit('tip.verified', new TipVerifiedEvent(tip1, 'user-1')); + } + }); + + client.on('tip_notification', (notif) => { + if (notif.data.tipId === 'tip-1') { + sequenceId1 = notif.sequenceId; + expect(sequenceId1).toBe(1); + + // Disconnect client + client.disconnect(); + + // Trigger second tip while disconnected + const tip2 = { + id: 'tip-2', + artistId: artistId, + amount: 20, + asset: 'XLM', + createdAt: new Date(), + } as Tip; + + // Use a small timeout to ensure disconnect is processed if needed + setTimeout(() => { + eventEmitter.emit('tip.verified', new TipVerifiedEvent(tip2, 'user-1')); + + // Reconnect with lastSequenceId + const client2 = io(`http://localhost:${port}/tips`, { + transports: ['websocket'], + query: { lastSequenceId: sequenceId1.toString() } + }); + + client2.on('connected', () => { + // Join room again with lastSequenceId + client2.emit('join_artist_room', { artistId, lastSequenceId: sequenceId1 }); + }); + + client2.on('tip_notification', (notif2) => { + if (notif2.data.tipId === 'tip-2') { + expect(notif2.sequenceId).toBe(2); + client2.disconnect(); + done(); + } + }); + }, 100); + } + }); + }); + + it('should not replay events from other rooms', (done) => { + const artistId1 = 'artist-1'; + const artistId2 = 'artist-2'; + const client = createClient(); + + client.on('connected', () => { + client.emit('join_artist_room', { artistId: artistId1 }); + }); + + client.on('joined_room', () => { + // Trigger tip for artist 2 (should not be received by client in artist 1 room) + const tip2 = { + id: 'tip-artist-2', + artistId: artistId2, + amount: 5, + asset: 'XLM', + createdAt: new Date(), + } as Tip; + eventEmitter.emit('tip.verified', new TipVerifiedEvent(tip2, 'user-1')); + + // Trigger tip for artist 1 + const tip1 = { + id: 'tip-artist-1', + artistId: artistId1, + amount: 10, + asset: 'XLM', + createdAt: new Date(), + } as Tip; + + setTimeout(() => { + eventEmitter.emit('tip.verified', new TipVerifiedEvent(tip1, 'user-1')); + }, 100); + }); + + let receivedTips = 0; + client.on('tip_notification', (notif) => { + receivedTips++; + expect(notif.data.artistId).toBe(artistId1); + + if (receivedTips === 1) { + // Disconnect and test replay logic filter + client.disconnect(); + + const client2 = io(`http://localhost:${port}/tips`, { + transports: ['websocket'] + }); + + client2.on('connected', () => { + // Reconnect to artist 1 but with sequence 0 + client2.emit('join_artist_room', { artistId: artistId1, lastSequenceId: 0 }); + }); + + let replayedTips = 0; + client2.on('tip_notification', (notif2) => { + replayedTips++; + expect(notif2.data.artistId).toBe(artistId1); + // Only tip-artist-1 should be replayed for this room + if (replayedTips === 1) { + setTimeout(() => { + expect(replayedTips).toBe(1); + client2.disconnect(); + done(); + }, 500); + } + }); + } + }); + }); +});