diff --git a/backend/README.md b/backend/README.md index 1fdfe763..9f0b137b 100644 --- a/backend/README.md +++ b/backend/README.md @@ -44,6 +44,16 @@ $ npm run start:dev $ npm run start:prod ``` +## Chain replay CLI + +Recover missed historical chain events into DB: + +```bash +$ npm run replay:chain-events -- --start-cursor --end-cursor --dry-run +``` + +Detailed runbook: `docs/CHAIN_EVENT_REPLAY.md` + ## Run tests ```bash diff --git a/backend/docs/CHAIN_EVENT_REPLAY.md b/backend/docs/CHAIN_EVENT_REPLAY.md new file mode 100644 index 00000000..941263d8 --- /dev/null +++ b/backend/docs/CHAIN_EVENT_REPLAY.md @@ -0,0 +1,58 @@ +# Chain Event Replay CLI + +## Purpose + +Replay historical Stellar chain events into PostgreSQL to recover indexer gaps safely. + +## Command + +```bash +npm run replay:chain-events -- --start-cursor [--end-cursor ] [--dry-run] [--limit 200] +``` + +## Required Environment + +- `DATABASE_URL`: PostgreSQL connection string +- `HORIZON_URL` (optional): Horizon base URL (defaults to `https://horizon-testnet.stellar.org`) + +## Parameters + +- `--start-cursor` (required): Inclusive starting cursor for replay. +- `--end-cursor` (optional): Inclusive ending cursor; replay stops once this cursor is reached. +- `--dry-run` (optional): Reads and evaluates events but does not write to DB. +- `--limit` (optional): Page size per request (1..200, default `200`). + +## Idempotency and Safety + +- Replayed records are stored in table `chain_event_replay`. +- Each record is keyed by `paging_token` and deduplicated with `ON CONFLICT DO NOTHING`. +- Re-running the same range is safe; duplicates are skipped and reported in summary output. + +## Dry-Run Workflow (Recommended) + +1. Validate scope and expected volume: + +```bash +npm run replay:chain-events -- --start-cursor 123 --end-cursor 999 --dry-run +``` + +2. Execute replay: + +```bash +npm run replay:chain-events -- --start-cursor 123 --end-cursor 999 +``` + +## Output + +The CLI prints JSON summary: + +- `startCursor` +- `endCursor` +- `finalCursor` +- `pages` +- `fetched` +- `inserted` +- `duplicates` +- `dryRun` + +Use `finalCursor` as a checkpoint for subsequent replay windows. diff --git a/backend/package-lock.json b/backend/package-lock.json index d1e14968..54e9d4f3 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -64,10 +64,6 @@ "tsconfig-paths": "^4.2.0", "typescript": "^5.7.3", "typescript-eslint": "^8.20.0" - }, - "overrides": { - "minimatch": ">=9.0.6", - "multer": ">=2.1.1" } }, "node_modules/@angular-devkit/core": { diff --git a/backend/package.json b/backend/package.json index 20014dba..60dcad7d 100644 --- a/backend/package.json +++ b/backend/package.json @@ -17,7 +17,8 @@ "test:watch": "jest --watch", "test:cov": "jest --coverage", "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", - "test:e2e": "jest --config ./test/jest-e2e.json" + "test:e2e": "jest --config ./test/jest-e2e.json", + "replay:chain-events": "ts-node src/cli/replay-chain-events.ts" }, "dependencies": { "@nestjs/common": "^11.1.17", diff --git a/backend/src/cli/replay-chain-events.ts b/backend/src/cli/replay-chain-events.ts new file mode 100644 index 00000000..024dfa5a --- /dev/null +++ b/backend/src/cli/replay-chain-events.ts @@ -0,0 +1,88 @@ +import { ChainEventReplayRunner, ReplayOptions } from '../indexer/replay/chain-event-replay.runner'; +import { HorizonChainEventSource } from '../indexer/replay/horizon-chain-event.source'; +import { PostgresChainEventStore } from '../indexer/replay/postgres-chain-event.store'; + +interface CliArgs { + startCursor: string; + endCursor?: string; + dryRun: boolean; + limit: number; +} + +function readArg(args: string[], key: string): string | undefined { + const index = args.findIndex((arg) => arg === key); + if (index === -1) return undefined; + return args[index + 1]; +} + +function parseArgs(argv: string[]): CliArgs { + const startCursor = readArg(argv, '--start-cursor'); + if (!startCursor) { + throw new Error('Missing required argument: --start-cursor '); + } + + const endCursor = readArg(argv, '--end-cursor'); + const limitRaw = readArg(argv, '--limit'); + const dryRun = argv.includes('--dry-run'); + + const limit = limitRaw ? Number.parseInt(limitRaw, 10) : 200; + if (!Number.isFinite(limit) || limit < 1 || limit > 200) { + throw new Error('Invalid --limit value. Expected an integer between 1 and 200.'); + } + + return { + startCursor, + endCursor, + dryRun, + limit, + }; +} + +function printUsage(): void { + // Keep this concise for operators running incidents. + console.log( + [ + 'Usage:', + ' npm run replay:chain-events -- --start-cursor [--end-cursor ] [--dry-run] [--limit 200]', + '', + 'Environment:', + ' DATABASE_URL PostgreSQL DSN for replay storage (required)', + ' HORIZON_URL Horizon base URL (default: https://horizon-testnet.stellar.org)', + ].join('\n'), + ); +} + +async function main(): Promise { + try { + const args = parseArgs(process.argv.slice(2)); + const databaseUrl = process.env.DATABASE_URL?.trim(); + if (!databaseUrl) { + throw new Error('DATABASE_URL is required.'); + } + + const horizonUrl = + process.env.HORIZON_URL?.trim() || 'https://horizon-testnet.stellar.org'; + + const source = new HorizonChainEventSource(horizonUrl); + const store = new PostgresChainEventStore(databaseUrl); + const runner = new ChainEventReplayRunner(source, store); + + const replayOptions: ReplayOptions = { + startCursor: args.startCursor, + endCursor: args.endCursor, + dryRun: args.dryRun, + limit: args.limit, + }; + + const summary = await runner.replay(replayOptions); + + console.log(JSON.stringify({ ok: true, summary }, null, 2)); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error(JSON.stringify({ ok: false, error: message }, null, 2)); + printUsage(); + process.exitCode = 1; + } +} + +void main(); diff --git a/backend/src/common/http/body-parser.config.ts b/backend/src/common/http/body-parser.config.ts new file mode 100644 index 00000000..af0c8845 --- /dev/null +++ b/backend/src/common/http/body-parser.config.ts @@ -0,0 +1,55 @@ +import { INestApplication } from '@nestjs/common'; +import { json, Request, Response, NextFunction, urlencoded } from 'express'; + +type RequestWithRawBody = Request & { rawBody?: Buffer }; + +const DEFAULT_JSON_LIMIT = '100kb'; +const DEFAULT_URLENCODED_LIMIT = '100kb'; +const DEFAULT_WEBHOOK_JSON_LIMIT = '1mb'; + +function captureRawBody( + req: Request, + _res: Response, + buffer: Buffer, +): void { + if (buffer.length > 0) { + (req as RequestWithRawBody).rawBody = Buffer.from(buffer); + } +} + +export function configureBodyParserLimits(app: INestApplication): void { + const jsonLimit = process.env.BODY_JSON_LIMIT ?? DEFAULT_JSON_LIMIT; + const urlencodedLimit = + process.env.BODY_URLENCODED_LIMIT ?? DEFAULT_URLENCODED_LIMIT; + const webhookJsonLimit = + process.env.BODY_WEBHOOK_JSON_LIMIT ?? DEFAULT_WEBHOOK_JSON_LIMIT; + + // Keep webhook payload support flexible without widening limits globally. + app.use('/v1/webhook', json({ limit: webhookJsonLimit, verify: captureRawBody })); + app.use(json({ limit: jsonLimit, verify: captureRawBody })); + app.use( + urlencoded({ + extended: true, + limit: urlencodedLimit, + verify: captureRawBody, + }), + ); + + app.use((err: unknown, _req: Request, res: Response, next: NextFunction) => { + const bodyParserError = err as { type?: string; status?: number } | undefined; + + if ( + bodyParserError?.type === 'entity.too.large' || + bodyParserError?.status === 413 + ) { + res.status(413).json({ + statusCode: 413, + error: 'Payload Too Large', + message: `Payload too large. Maximum request body size is ${jsonLimit}.`, + }); + return; + } + + next(err); + }); +} diff --git a/backend/src/creators/creators.controller.spec.ts b/backend/src/creators/creators.controller.spec.ts index b551d712..43b24046 100644 --- a/backend/src/creators/creators.controller.spec.ts +++ b/backend/src/creators/creators.controller.spec.ts @@ -14,6 +14,7 @@ describe('CreatorsController', () => { createPlan: jest.fn(), findAllPlans: jest.fn(), findCreatorPlans: jest.fn(), + getPayoutHistory: jest.fn(), }; beforeEach(async () => { @@ -374,4 +375,34 @@ describe('CreatorsController', () => { }); }); }); + + describe('getPayoutHistory', () => { + it('calls service with creator address and query', () => { + const mockResponse = { + data: [ + { + checkoutId: 'chk_1', + creatorAddress: 'GCREATOR', + fanAddress: 'GFAN', + amount: '10', + assetCode: 'XLM', + txHash: 'tx-1', + payoutAt: new Date().toISOString(), + }, + ], + nextCursor: null, + hasMore: false, + }; + mockCreatorsService.getPayoutHistory.mockReturnValue(mockResponse); + + const query = { from: '2026-03-01T00:00:00.000Z', limit: 20 }; + const result = controller.getPayoutHistory('GCREATOR', query); + + expect(mockCreatorsService.getPayoutHistory).toHaveBeenCalledWith( + 'GCREATOR', + query, + ); + expect(result).toEqual(mockResponse); + }); + }); }); diff --git a/backend/src/creators/creators.controller.ts b/backend/src/creators/creators.controller.ts index 14694910..fe109d2c 100644 --- a/backend/src/creators/creators.controller.ts +++ b/backend/src/creators/creators.controller.ts @@ -5,6 +5,8 @@ import { PaginationDto, PaginatedResponseDto } from '../common/dto'; import { PlanDto } from './dto/plan.dto'; import { SearchCreatorsDto } from './dto/search-creators.dto'; import { PublicCreatorDto } from './dto/public-creator.dto'; +import { CreatorPayoutHistoryQueryDto } from './dto'; +import { CreatorPayoutHistoryResult } from '../subscriptions/subscriptions.service'; @ApiTags('creators') @Controller({ path: 'creators', version: '1' }) @@ -66,4 +68,19 @@ export class CreatorsController { ): PaginatedResponseDto { return this.creatorsService.findCreatorPlans(address, pagination); } + + @Get(':address/payout-history') + @ApiOperation({ + summary: 'List creator payout history with date filters and cursor pagination', + }) + @ApiResponse({ + status: 200, + description: 'Creator payout history page', + }) + getPayoutHistory( + @Param('address') address: string, + @Query() query: CreatorPayoutHistoryQueryDto, + ): CreatorPayoutHistoryResult { + return this.creatorsService.getPayoutHistory(address, query); + } } diff --git a/backend/src/creators/creators.module.ts b/backend/src/creators/creators.module.ts index 8e675961..2bc9fa27 100644 --- a/backend/src/creators/creators.module.ts +++ b/backend/src/creators/creators.module.ts @@ -3,9 +3,10 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { CreatorsController } from './creators.controller'; import { CreatorsService } from './creators.service'; import { User } from '../users/entities/user.entity'; +import { SubscriptionsModule } from '../subscriptions/subscriptions.module'; @Module({ - imports: [TypeOrmModule.forFeature([User])], + imports: [TypeOrmModule.forFeature([User]), SubscriptionsModule], controllers: [CreatorsController], providers: [CreatorsService], exports: [CreatorsService], diff --git a/backend/src/creators/creators.service.spec.ts b/backend/src/creators/creators.service.spec.ts index 524163d5..e0ab5284 100644 --- a/backend/src/creators/creators.service.spec.ts +++ b/backend/src/creators/creators.service.spec.ts @@ -5,10 +5,12 @@ import { CreatorsService } from './creators.service'; import { User, UserRole } from '../users/entities/user.entity'; import { EventBus } from '../events/event-bus'; import { SearchCreatorsDto } from './dto/search-creators.dto'; +import { SubscriptionsService } from '../subscriptions/subscriptions.service'; describe('CreatorsService', () => { let service: CreatorsService; let mockQueryBuilder: Partial>; + let subscriptionsService: { getCreatorPayoutHistory: jest.Mock }; beforeEach(async () => { // Create mock query builder @@ -25,6 +27,8 @@ describe('CreatorsService', () => { getRawAndEntities: jest.fn(), }; + subscriptionsService = { getCreatorPayoutHistory: jest.fn() }; + const module: TestingModule = await Test.createTestingModule({ providers: [ CreatorsService, @@ -39,6 +43,10 @@ describe('CreatorsService', () => { provide: EventBus, useValue: { publish: jest.fn() }, }, + { + provide: SubscriptionsService, + useValue: subscriptionsService, + }, ], }).compile(); @@ -445,6 +453,28 @@ describe('CreatorsService', () => { }); }); }); + + describe('getPayoutHistory', () => { + it('delegates payout history query to subscriptions service', () => { + const response = { data: [], nextCursor: null, hasMore: false }; + subscriptionsService.getCreatorPayoutHistory.mockReturnValue(response); + + const result = service.getPayoutHistory('GCREATOR', { + from: '2026-01-01T00:00:00.000Z', + to: '2026-01-31T23:59:59.999Z', + limit: 20, + }); + + expect(subscriptionsService.getCreatorPayoutHistory).toHaveBeenCalledWith({ + creatorAddress: 'GCREATOR', + from: '2026-01-01T00:00:00.000Z', + to: '2026-01-31T23:59:59.999Z', + cursor: undefined, + limit: 20, + }); + expect(result).toEqual(response); + }); + }); }); // Helper function to create mock users diff --git a/backend/src/creators/creators.service.ts b/backend/src/creators/creators.service.ts index 5cfdf586..dfd8dee1 100644 --- a/backend/src/creators/creators.service.ts +++ b/backend/src/creators/creators.service.ts @@ -5,6 +5,11 @@ import { PaginationDto, PaginatedResponseDto } from '../common/dto'; import { SearchCreatorsDto } from './dto/search-creators.dto'; import { PublicCreatorDto } from './dto/public-creator.dto'; import { User } from '../users/entities/user.entity'; +import { + CreatorPayoutHistoryResult, + SubscriptionsService, +} from '../subscriptions/subscriptions.service'; +import { CreatorPayoutHistoryQueryDto } from './dto'; export interface Plan { id: number; @@ -22,6 +27,7 @@ export class CreatorsService { constructor( @InjectRepository(User) private readonly userRepository: Repository, + private readonly subscriptionsService: SubscriptionsService, ) {} createPlan(creator: string, asset: string, amount: string, intervalDays: number): Plan { @@ -83,4 +89,17 @@ export class CreatorsService { return new PaginatedResponseDto(data, total, page, limit); } + + getPayoutHistory( + creatorAddress: string, + query: CreatorPayoutHistoryQueryDto, + ): CreatorPayoutHistoryResult { + return this.subscriptionsService.getCreatorPayoutHistory({ + creatorAddress, + from: query.from, + to: query.to, + cursor: query.cursor, + limit: query.limit, + }); + } } diff --git a/backend/src/creators/dto/creator-payout-history-query.dto.ts b/backend/src/creators/dto/creator-payout-history-query.dto.ts new file mode 100644 index 00000000..82654ad0 --- /dev/null +++ b/backend/src/creators/dto/creator-payout-history-query.dto.ts @@ -0,0 +1,41 @@ +import { ApiPropertyOptional } from '@nestjs/swagger'; +import { Type } from 'class-transformer'; +import { IsISO8601, IsInt, IsOptional, IsString, Max, Min } from 'class-validator'; + +export class CreatorPayoutHistoryQueryDto { + @ApiPropertyOptional({ + description: 'Inclusive start of payout window (ISO-8601)', + example: '2026-03-01T00:00:00.000Z', + }) + @IsOptional() + @IsISO8601() + from?: string; + + @ApiPropertyOptional({ + description: 'Inclusive end of payout window (ISO-8601)', + example: '2026-03-31T23:59:59.999Z', + }) + @IsOptional() + @IsISO8601() + to?: string; + + @ApiPropertyOptional({ + description: 'Opaque cursor returned by previous response', + }) + @IsOptional() + @IsString() + cursor?: string; + + @ApiPropertyOptional({ + description: 'Number of payouts per page', + default: 20, + minimum: 1, + maximum: 100, + }) + @IsOptional() + @Type(() => Number) + @IsInt() + @Min(1) + @Max(100) + limit?: number = 20; +} diff --git a/backend/src/creators/dto/index.ts b/backend/src/creators/dto/index.ts index ef3d7f0b..5dac791a 100644 --- a/backend/src/creators/dto/index.ts +++ b/backend/src/creators/dto/index.ts @@ -1,3 +1,4 @@ export * from './plan.dto'; export * from './search-creators.dto'; export * from './public-creator.dto'; +export * from './creator-payout-history-query.dto'; diff --git a/backend/src/indexer/replay/chain-event-replay.runner.spec.ts b/backend/src/indexer/replay/chain-event-replay.runner.spec.ts new file mode 100644 index 00000000..d03deb9c --- /dev/null +++ b/backend/src/indexer/replay/chain-event-replay.runner.spec.ts @@ -0,0 +1,104 @@ +import { + ChainEventRecord, + ChainEventReplayRunner, + ChainEventSource, + ChainEventStore, +} from './chain-event-replay.runner'; + +class InMemorySource implements ChainEventSource { + constructor(private readonly pages: ChainEventRecord[][]) {} + private pageIndex = 0; + + async fetchPage(cursor: string, _limit: number) { + const events = this.pages[this.pageIndex] ?? []; + this.pageIndex += 1; + const nextCursor = + events.length > 0 ? String(events[events.length - 1].paging_token) : cursor; + return { events, nextCursor }; + } +} + +class InMemoryStore implements ChainEventStore { + private readonly seen = new Set(); + + async init() {} + + async persistBatch(events: ChainEventRecord[], dryRun: boolean) { + let inserted = 0; + for (const event of events) { + if (!this.seen.has(event.paging_token)) { + if (!dryRun) { + this.seen.add(event.paging_token); + } + inserted += 1; + } + } + return { inserted, duplicates: events.length - inserted }; + } + + async close() {} +} + +function event(cursor: string): ChainEventRecord { + return { + id: `event-${cursor}`, + paging_token: cursor, + type: 'payment', + }; +} + +describe('ChainEventReplayRunner', () => { + it('replays until source is exhausted', async () => { + const source = new InMemorySource([[event('10'), event('11')], [event('12')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const summary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: false, + }); + + expect(summary.fetched).toBe(3); + expect(summary.inserted).toBe(3); + expect(summary.duplicates).toBe(0); + expect(summary.finalCursor).toBe('12'); + }); + + it('stops at end cursor and only processes <= end cursor', async () => { + const source = new InMemorySource([[event('10'), event('11')], [event('12')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const summary = await runner.replay({ + startCursor: '9', + endCursor: '11', + limit: 200, + dryRun: false, + }); + + expect(summary.fetched).toBe(2); + expect(summary.inserted).toBe(2); + expect(summary.finalCursor).toBe('11'); + }); + + it('reports dry-run inserts without mutating state', async () => { + const source = new InMemorySource([[event('10'), event('11')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const dryRunSummary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: true, + }); + const replaySummary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: false, + }); + + expect(dryRunSummary.inserted).toBe(2); + expect(replaySummary.inserted).toBe(2); + }); +}); diff --git a/backend/src/indexer/replay/chain-event-replay.runner.ts b/backend/src/indexer/replay/chain-event-replay.runner.ts new file mode 100644 index 00000000..a5e5b795 --- /dev/null +++ b/backend/src/indexer/replay/chain-event-replay.runner.ts @@ -0,0 +1,127 @@ +export interface ChainEventRecord { + id: string; + paging_token: string; + type: string; + transaction_hash?: string; + source_account?: string; + created_at?: string; + [key: string]: unknown; +} + +export interface FetchPageResult { + events: ChainEventRecord[]; + nextCursor: string; +} + +export interface ChainEventSource { + fetchPage(cursor: string, limit: number): Promise; +} + +export interface PersistResult { + inserted: number; + duplicates: number; +} + +export interface ChainEventStore { + init(): Promise; + persistBatch(events: ChainEventRecord[], dryRun: boolean): Promise; + close(): Promise; +} + +export interface ReplayOptions { + startCursor: string; + endCursor?: string; + limit: number; + dryRun: boolean; +} + +export interface ReplaySummary { + startCursor: string; + endCursor?: string; + finalCursor: string; + pages: number; + fetched: number; + inserted: number; + duplicates: number; + dryRun: boolean; +} + +function compareCursor(a: string, b: string): number { + try { + const left = BigInt(a); + const right = BigInt(b); + if (left === right) return 0; + return left < right ? -1 : 1; + } catch { + if (a === b) return 0; + return a < b ? -1 : 1; + } +} + +export class ChainEventReplayRunner { + constructor( + private readonly source: ChainEventSource, + private readonly store: ChainEventStore, + ) {} + + async replay(options: ReplayOptions): Promise { + await this.store.init(); + + let cursor = options.startCursor; + let pages = 0; + let fetched = 0; + let inserted = 0; + let duplicates = 0; + let done = false; + + while (!done) { + const page = await this.source.fetchPage(cursor, options.limit); + pages += 1; + + if (page.events.length === 0) { + done = true; + break; + } + + const selectedEvents = + options.endCursor === undefined + ? page.events + : page.events.filter( + (event) => compareCursor(event.paging_token, options.endCursor as string) <= 0, + ); + + fetched += selectedEvents.length; + + if (selectedEvents.length > 0) { + const result = await this.store.persistBatch(selectedEvents, options.dryRun); + inserted += result.inserted; + duplicates += result.duplicates; + } + + const pageLastCursor = page.nextCursor; + cursor = pageLastCursor; + + if ( + options.endCursor !== undefined && + compareCursor(pageLastCursor, options.endCursor) >= 0 + ) { + done = true; + } else if (page.events.length < options.limit) { + done = true; + } + } + + await this.store.close(); + + return { + startCursor: options.startCursor, + endCursor: options.endCursor, + finalCursor: cursor, + pages, + fetched, + inserted, + duplicates, + dryRun: options.dryRun, + }; + } +} diff --git a/backend/src/indexer/replay/horizon-chain-event.source.ts b/backend/src/indexer/replay/horizon-chain-event.source.ts new file mode 100644 index 00000000..a375ece3 --- /dev/null +++ b/backend/src/indexer/replay/horizon-chain-event.source.ts @@ -0,0 +1,33 @@ +import { ChainEventRecord, ChainEventSource, FetchPageResult } from './chain-event-replay.runner'; + +interface HorizonOperationResponse { + _embedded?: { + records?: ChainEventRecord[]; + }; +} + +export class HorizonChainEventSource implements ChainEventSource { + constructor(private readonly horizonBaseUrl: string) {} + + async fetchPage(cursor: string, limit: number): Promise { + const url = new URL('/operations', this.horizonBaseUrl); + url.searchParams.set('order', 'asc'); + url.searchParams.set('cursor', cursor); + url.searchParams.set('limit', String(limit)); + + const response = await fetch(url.toString(), { + headers: { Accept: 'application/json' }, + }); + + if (!response.ok) { + throw new Error(`Horizon request failed (${response.status} ${response.statusText})`); + } + + const payload = (await response.json()) as HorizonOperationResponse; + const events = payload._embedded?.records ?? []; + const nextCursor = + events.length > 0 ? String(events[events.length - 1].paging_token) : cursor; + + return { events, nextCursor }; + } +} diff --git a/backend/src/indexer/replay/postgres-chain-event.store.ts b/backend/src/indexer/replay/postgres-chain-event.store.ts new file mode 100644 index 00000000..d1746fc2 --- /dev/null +++ b/backend/src/indexer/replay/postgres-chain-event.store.ts @@ -0,0 +1,93 @@ +import { Pool } from 'pg'; +import { + ChainEventRecord, + ChainEventStore, + PersistResult, +} from './chain-event-replay.runner'; + +export class PostgresChainEventStore implements ChainEventStore { + private readonly pool: Pool; + + constructor(databaseUrl: string) { + this.pool = new Pool({ connectionString: databaseUrl }); + } + + async init(): Promise { + await this.pool.query(` + CREATE TABLE IF NOT EXISTS chain_event_replay ( + paging_token TEXT PRIMARY KEY, + event_id TEXT NOT NULL UNIQUE, + event_type TEXT NOT NULL, + tx_hash TEXT, + source_account TEXT, + ledger_closed_at TIMESTAMPTZ, + payload JSONB NOT NULL, + replayed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `); + } + + async persistBatch( + events: ChainEventRecord[], + dryRun: boolean, + ): Promise { + if (events.length === 0) { + return { inserted: 0, duplicates: 0 }; + } + + if (dryRun) { + const duplicates = await this.countExisting(events.map((event) => event.paging_token)); + return { inserted: events.length - duplicates, duplicates }; + } + + let inserted = 0; + for (const event of events) { + const result = await this.pool.query( + ` + INSERT INTO chain_event_replay ( + paging_token, + event_id, + event_type, + tx_hash, + source_account, + ledger_closed_at, + payload + ) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb) + ON CONFLICT (paging_token) DO NOTHING + `, + [ + String(event.paging_token), + String(event.id), + String(event.type), + event.transaction_hash ? String(event.transaction_hash) : null, + event.source_account ? String(event.source_account) : null, + event.created_at ? new Date(String(event.created_at)) : null, + JSON.stringify(event), + ], + ); + + inserted += result.rowCount ?? 0; + } + + return { inserted, duplicates: events.length - inserted }; + } + + async close(): Promise { + await this.pool.end(); + } + + private async countExisting(pagingTokens: string[]): Promise { + const uniqueTokens = Array.from(new Set(pagingTokens)); + const result = await this.pool.query<{ total: string }>( + ` + SELECT COUNT(*)::text AS total + FROM chain_event_replay + WHERE paging_token = ANY($1::text[]) + `, + [uniqueTokens], + ); + const total = result.rows[0]?.total ?? '0'; + return Number.parseInt(total, 10); + } +} diff --git a/backend/src/main.ts b/backend/src/main.ts index 0ec10b27..d156660a 100644 --- a/backend/src/main.ts +++ b/backend/src/main.ts @@ -5,12 +5,13 @@ import { StartupProbeService } from './health/startup-probe.service'; import { getDataSourceToken } from '@nestjs/typeorm'; import { DataSource } from 'typeorm'; import { validateRequiredSecrets } from './common/secrets-validation'; +import { configureBodyParserLimits } from './common/http/body-parser.config'; async function bootstrap() { // Fail fast if any required secret is absent — before the app is created. validateRequiredSecrets(); - const app = await NestFactory.create(AppModule); + const app = await NestFactory.create(AppModule, { bodyParser: false }); // Enable versioning (URI versioning like /v1/...) app.enableVersioning({ @@ -20,6 +21,7 @@ async function bootstrap() { // Global validation pipe app.useGlobalPipes(new ValidationPipe({ whitelist: true, transform: true })); + configureBodyParserLimits(app); const probeService = app.get(StartupProbeService); diff --git a/backend/src/subscriptions/subscriptions.service.spec.ts b/backend/src/subscriptions/subscriptions.service.spec.ts index 91408007..31441497 100644 --- a/backend/src/subscriptions/subscriptions.service.spec.ts +++ b/backend/src/subscriptions/subscriptions.service.spec.ts @@ -259,4 +259,74 @@ describe('SubscriptionsService', () => { expect(r.indexed?.planId).toBe(1); }); }); + + describe('getCreatorPayoutHistory', () => { + const creatorAddress = 'GAAAAAAAAAAAAAAA'; + const fanAddress = `G${'C'.repeat(55)}`; + + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date('2026-03-27T12:00:00.000Z')); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('returns completed payouts including txHash references', () => { + const first = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(first.id, 'tx-first'); + jest.advanceTimersByTime(1000); + const second = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(second.id, 'tx-second'); + + const result = service.getCreatorPayoutHistory({ creatorAddress, limit: 10 }); + + expect(result.data).toHaveLength(2); + expect(result.data[0].txHash).toBe('tx-second'); + expect(result.data[1].txHash).toBe('tx-first'); + expect(result.hasMore).toBe(false); + expect(result.nextCursor).toBeNull(); + }); + + it('applies date-range filters', () => { + const older = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(older.id, 'tx-old'); + jest.setSystemTime(new Date('2026-03-28T12:00:00.000Z')); + const newer = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(newer.id, 'tx-new'); + + const result = service.getCreatorPayoutHistory({ + creatorAddress, + from: '2026-03-28T00:00:00.000Z', + to: '2026-03-29T00:00:00.000Z', + limit: 10, + }); + + expect(result.data).toHaveLength(1); + expect(result.data[0].txHash).toBe('tx-new'); + }); + + it('supports cursor pagination', () => { + const first = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(first.id, 'tx-first'); + jest.advanceTimersByTime(1000); + const second = service.createCheckout(fanAddress, creatorAddress, 1); + service.confirmSubscription(second.id, 'tx-second'); + + const pageOne = service.getCreatorPayoutHistory({ creatorAddress, limit: 1 }); + expect(pageOne.data).toHaveLength(1); + expect(pageOne.hasMore).toBe(true); + expect(pageOne.nextCursor).toBeTruthy(); + + const pageTwo = service.getCreatorPayoutHistory({ + creatorAddress, + limit: 1, + cursor: pageOne.nextCursor as string, + }); + + expect(pageTwo.data).toHaveLength(1); + expect(pageTwo.data[0].txHash).toBe('tx-first'); + }); + }); }); diff --git a/backend/src/subscriptions/subscriptions.service.ts b/backend/src/subscriptions/subscriptions.service.ts index c58c458b..09c46807 100644 --- a/backend/src/subscriptions/subscriptions.service.ts +++ b/backend/src/subscriptions/subscriptions.service.ts @@ -61,6 +61,23 @@ interface Checkout { updatedAt: Date; } +export interface CreatorPayoutHistoryItem { + checkoutId: string; + creatorAddress: string; + fanAddress: string; + amount: string; + assetCode: string; + assetIssuer?: string; + txHash: string; + payoutAt: string; +} + +export interface CreatorPayoutHistoryResult { + data: CreatorPayoutHistoryItem[]; + nextCursor: string | null; + hasMore: boolean; +} + interface Plan { id: number; creator: string; @@ -453,6 +470,66 @@ export class SubscriptionsService { }; } + getCreatorPayoutHistory(params: { + creatorAddress: string; + from?: string; + to?: string; + cursor?: string; + limit?: number; + }): CreatorPayoutHistoryResult { + const limit = params.limit ?? 20; + const fromDate = params.from ? new Date(params.from) : undefined; + const toDate = params.to ? new Date(params.to) : undefined; + + const filtered = Array.from(this.checkouts.values()) + .filter((checkout) => checkout.creatorAddress === params.creatorAddress) + .filter((checkout) => checkout.status === CheckoutStatus.COMPLETED) + .filter((checkout) => Boolean(checkout.txHash)) + .filter((checkout) => { + if (!fromDate) return true; + return checkout.updatedAt >= fromDate; + }) + .filter((checkout) => { + if (!toDate) return true; + return checkout.updatedAt <= toDate; + }) + .sort((a, b) => { + const byDate = b.updatedAt.getTime() - a.updatedAt.getTime(); + if (byDate !== 0) return byDate; + return b.id.localeCompare(a.id); + }); + + const cursorData = params.cursor ? this.decodeCursor(params.cursor) : null; + const paged = cursorData + ? filtered.filter((checkout) => { + const ts = checkout.updatedAt.getTime(); + if (ts < cursorData.timestamp) return true; + if (ts > cursorData.timestamp) return false; + return checkout.id < cursorData.checkoutId; + }) + : filtered; + + const selected = paged.slice(0, limit); + const hasMore = paged.length > limit; + const last = selected[selected.length - 1]; + const nextCursor = hasMore && last ? this.encodeCursor(last.updatedAt, last.id) : null; + + return { + data: selected.map((checkout) => ({ + checkoutId: checkout.id, + creatorAddress: checkout.creatorAddress, + fanAddress: checkout.fanAddress, + amount: checkout.amount, + assetCode: checkout.assetCode, + assetIssuer: checkout.assetIssuer, + txHash: checkout.txHash as string, + payoutAt: checkout.updatedAt.toISOString(), + })), + nextCursor, + hasMore, + }; + } + confirmSubscription(checkoutId: string, txHash?: string) { const checkout = this.getCheckout(checkoutId); @@ -565,4 +642,22 @@ export class SubscriptionsService { this.logger.error(`Failed to emit renewal failure event: ${message}`); }); } + + private encodeCursor(payoutDate: Date, checkoutId: string): string { + return Buffer.from(`${payoutDate.getTime()}:${checkoutId}`).toString('base64url'); + } + + private decodeCursor(cursor: string): { timestamp: number; checkoutId: string } { + try { + const decoded = Buffer.from(cursor, 'base64url').toString('utf8'); + const [timestampRaw, checkoutId] = decoded.split(':'); + const timestamp = Number.parseInt(timestampRaw, 10); + if (!Number.isFinite(timestamp) || !checkoutId) { + throw new Error('Invalid cursor'); + } + return { timestamp, checkoutId }; + } catch { + throw new BadRequestException('Invalid payout history cursor'); + } + } } diff --git a/backend/test/body-parser-limits.e2e-spec.ts b/backend/test/body-parser-limits.e2e-spec.ts new file mode 100644 index 00000000..86ccfa35 --- /dev/null +++ b/backend/test/body-parser-limits.e2e-spec.ts @@ -0,0 +1,75 @@ +import { Body, Controller, INestApplication, Module, Post } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; +import request from 'supertest'; +import { configureBodyParserLimits } from '../src/common/http/body-parser.config'; + +@Controller() +class PayloadController { + @Post('payload') + handlePayload(@Body() body: { data: string }) { + return { received: body.data.length }; + } + + @Post('v1/webhook') + handleWebhook(@Body() body: { data: string }) { + return { received: body.data.length }; + } +} + +@Module({ + controllers: [PayloadController], +}) +class PayloadTestModule {} + +describe('Body parser limits (e2e)', () => { + let app: INestApplication; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [PayloadTestModule], + }).compile(); + + app = moduleFixture.createNestApplication({ bodyParser: false }); + configureBodyParserLimits(app); + await app.init(); + }); + + afterEach(async () => { + await app.close(); + }); + + it('accepts normal payloads', async () => { + const response = await request(app.getHttpServer()) + .post('/payload') + .send({ data: 'safe-body' }) + .expect(201); + + expect(response.body).toEqual({ received: 9 }); + }); + + it('rejects oversized payloads with explicit 413 error', async () => { + const oversized = 'a'.repeat(120 * 1024); + + const response = await request(app.getHttpServer()) + .post('/payload') + .send({ data: oversized }) + .expect(413); + + expect(response.body).toEqual({ + statusCode: 413, + error: 'Payload Too Large', + message: 'Payload too large. Maximum request body size is 100kb.', + }); + }); + + it('allows larger payloads on webhook override endpoint', async () => { + const webhookSizedPayload = 'a'.repeat(200 * 1024); + + const response = await request(app.getHttpServer()) + .post('/v1/webhook') + .send({ data: webhookSizedPayload }) + .expect(201); + + expect(response.body).toEqual({ received: 200 * 1024 }); + }); +});