Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ $ npm run start:dev
$ npm run start:prod
```

## Chain replay CLI

Recover missed historical chain events into DB:

```bash
$ npm run replay:chain-events -- --start-cursor <cursor> --end-cursor <cursor> --dry-run
```

Detailed runbook: `docs/CHAIN_EVENT_REPLAY.md`

## Run tests

```bash
Expand Down
58 changes: 58 additions & 0 deletions backend/docs/CHAIN_EVENT_REPLAY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Chain Event Replay CLI

## Purpose

Replay historical Stellar chain events into PostgreSQL to recover indexer gaps safely.

## Command

```bash
npm run replay:chain-events -- --start-cursor <cursor> [--end-cursor <cursor>] [--dry-run] [--limit 200]
```

## Required Environment

- `DATABASE_URL`: PostgreSQL connection string
- `HORIZON_URL` (optional): Horizon base URL (defaults to `https://horizon-testnet.stellar.org`)

## Parameters

- `--start-cursor` (required): Inclusive starting cursor for replay.
- `--end-cursor` (optional): Inclusive ending cursor; replay stops once this cursor is reached.
- `--dry-run` (optional): Reads and evaluates events but does not write to DB.
- `--limit` (optional): Page size per request (1..200, default `200`).

## Idempotency and Safety

- Replayed records are stored in table `chain_event_replay`.
- Each record is keyed by `paging_token` and deduplicated with `ON CONFLICT DO NOTHING`.
- Re-running the same range is safe; duplicates are skipped and reported in summary output.

## Dry-Run Workflow (Recommended)

1. Validate scope and expected volume:

```bash
npm run replay:chain-events -- --start-cursor 123 --end-cursor 999 --dry-run
```

2. Execute replay:

```bash
npm run replay:chain-events -- --start-cursor 123 --end-cursor 999
```

## Output

The CLI prints JSON summary:

- `startCursor`
- `endCursor`
- `finalCursor`
- `pages`
- `fetched`
- `inserted`
- `duplicates`
- `dryRun`

Use `finalCursor` as a checkpoint for subsequent replay windows.
4 changes: 0 additions & 4 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json"
"test:e2e": "jest --config ./test/jest-e2e.json",
"replay:chain-events": "ts-node src/cli/replay-chain-events.ts"
},
"dependencies": {
"@nestjs/common": "^11.1.17",
Expand Down
88 changes: 88 additions & 0 deletions backend/src/cli/replay-chain-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { ChainEventReplayRunner, ReplayOptions } from '../indexer/replay/chain-event-replay.runner';
import { HorizonChainEventSource } from '../indexer/replay/horizon-chain-event.source';
import { PostgresChainEventStore } from '../indexer/replay/postgres-chain-event.store';

interface CliArgs {
startCursor: string;
endCursor?: string;
dryRun: boolean;
limit: number;
}

function readArg(args: string[], key: string): string | undefined {
const index = args.findIndex((arg) => arg === key);
if (index === -1) return undefined;
return args[index + 1];
}

function parseArgs(argv: string[]): CliArgs {
const startCursor = readArg(argv, '--start-cursor');
if (!startCursor) {
throw new Error('Missing required argument: --start-cursor <cursor>');
}

const endCursor = readArg(argv, '--end-cursor');
const limitRaw = readArg(argv, '--limit');
const dryRun = argv.includes('--dry-run');

const limit = limitRaw ? Number.parseInt(limitRaw, 10) : 200;
if (!Number.isFinite(limit) || limit < 1 || limit > 200) {
throw new Error('Invalid --limit value. Expected an integer between 1 and 200.');
}

return {
startCursor,
endCursor,
dryRun,
limit,
};
}

function printUsage(): void {
// Keep this concise for operators running incidents.
console.log(
[
'Usage:',
' npm run replay:chain-events -- --start-cursor <cursor> [--end-cursor <cursor>] [--dry-run] [--limit 200]',
'',
'Environment:',
' DATABASE_URL PostgreSQL DSN for replay storage (required)',
' HORIZON_URL Horizon base URL (default: https://horizon-testnet.stellar.org)',
].join('\n'),
);
}

async function main(): Promise<void> {
try {
const args = parseArgs(process.argv.slice(2));
const databaseUrl = process.env.DATABASE_URL?.trim();
if (!databaseUrl) {
throw new Error('DATABASE_URL is required.');
}

const horizonUrl =
process.env.HORIZON_URL?.trim() || 'https://horizon-testnet.stellar.org';

const source = new HorizonChainEventSource(horizonUrl);
const store = new PostgresChainEventStore(databaseUrl);
const runner = new ChainEventReplayRunner(source, store);

const replayOptions: ReplayOptions = {
startCursor: args.startCursor,
endCursor: args.endCursor,
dryRun: args.dryRun,
limit: args.limit,
};

const summary = await runner.replay(replayOptions);

console.log(JSON.stringify({ ok: true, summary }, null, 2));
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(JSON.stringify({ ok: false, error: message }, null, 2));
printUsage();
process.exitCode = 1;
}
}

void main();
55 changes: 55 additions & 0 deletions backend/src/common/http/body-parser.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { INestApplication } from '@nestjs/common';
import { json, Request, Response, NextFunction, urlencoded } from 'express';

type RequestWithRawBody = Request & { rawBody?: Buffer };

const DEFAULT_JSON_LIMIT = '100kb';
const DEFAULT_URLENCODED_LIMIT = '100kb';
const DEFAULT_WEBHOOK_JSON_LIMIT = '1mb';

function captureRawBody(
req: Request,
_res: Response,
buffer: Buffer,
): void {
if (buffer.length > 0) {
(req as RequestWithRawBody).rawBody = Buffer.from(buffer);
}
}

export function configureBodyParserLimits(app: INestApplication): void {
const jsonLimit = process.env.BODY_JSON_LIMIT ?? DEFAULT_JSON_LIMIT;
const urlencodedLimit =
process.env.BODY_URLENCODED_LIMIT ?? DEFAULT_URLENCODED_LIMIT;
const webhookJsonLimit =
process.env.BODY_WEBHOOK_JSON_LIMIT ?? DEFAULT_WEBHOOK_JSON_LIMIT;

// Keep webhook payload support flexible without widening limits globally.
app.use('/v1/webhook', json({ limit: webhookJsonLimit, verify: captureRawBody }));
app.use(json({ limit: jsonLimit, verify: captureRawBody }));
app.use(
urlencoded({
extended: true,
limit: urlencodedLimit,
verify: captureRawBody,
}),
);

app.use((err: unknown, _req: Request, res: Response, next: NextFunction) => {
const bodyParserError = err as { type?: string; status?: number } | undefined;

if (
bodyParserError?.type === 'entity.too.large' ||
bodyParserError?.status === 413
) {
res.status(413).json({
statusCode: 413,
error: 'Payload Too Large',
message: `Payload too large. Maximum request body size is ${jsonLimit}.`,
});
return;
}

next(err);
});
}
31 changes: 31 additions & 0 deletions backend/src/creators/creators.controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe('CreatorsController', () => {
createPlan: jest.fn(),
findAllPlans: jest.fn(),
findCreatorPlans: jest.fn(),
getPayoutHistory: jest.fn(),
};

beforeEach(async () => {
Expand Down Expand Up @@ -374,4 +375,34 @@ describe('CreatorsController', () => {
});
});
});

describe('getPayoutHistory', () => {
it('calls service with creator address and query', () => {
const mockResponse = {
data: [
{
checkoutId: 'chk_1',
creatorAddress: 'GCREATOR',
fanAddress: 'GFAN',
amount: '10',
assetCode: 'XLM',
txHash: 'tx-1',
payoutAt: new Date().toISOString(),
},
],
nextCursor: null,
hasMore: false,
};
mockCreatorsService.getPayoutHistory.mockReturnValue(mockResponse);

const query = { from: '2026-03-01T00:00:00.000Z', limit: 20 };
const result = controller.getPayoutHistory('GCREATOR', query);

expect(mockCreatorsService.getPayoutHistory).toHaveBeenCalledWith(
'GCREATOR',
query,
);
expect(result).toEqual(mockResponse);
});
});
});
17 changes: 17 additions & 0 deletions backend/src/creators/creators.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { PaginationDto, PaginatedResponseDto } from '../common/dto';
import { PlanDto } from './dto/plan.dto';
import { SearchCreatorsDto } from './dto/search-creators.dto';
import { PublicCreatorDto } from './dto/public-creator.dto';
import { CreatorPayoutHistoryQueryDto } from './dto';
import { CreatorPayoutHistoryResult } from '../subscriptions/subscriptions.service';

@ApiTags('creators')
@Controller({ path: 'creators', version: '1' })
Expand Down Expand Up @@ -66,4 +68,19 @@ export class CreatorsController {
): PaginatedResponseDto<PlanDto> {
return this.creatorsService.findCreatorPlans(address, pagination);
}

@Get(':address/payout-history')
@ApiOperation({
summary: 'List creator payout history with date filters and cursor pagination',
})
@ApiResponse({
status: 200,
description: 'Creator payout history page',
})
getPayoutHistory(
@Param('address') address: string,
@Query() query: CreatorPayoutHistoryQueryDto,
): CreatorPayoutHistoryResult {
return this.creatorsService.getPayoutHistory(address, query);
}
}
3 changes: 2 additions & 1 deletion backend/src/creators/creators.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { CreatorsController } from './creators.controller';
import { CreatorsService } from './creators.service';
import { User } from '../users/entities/user.entity';
import { SubscriptionsModule } from '../subscriptions/subscriptions.module';

@Module({
imports: [TypeOrmModule.forFeature([User])],
imports: [TypeOrmModule.forFeature([User]), SubscriptionsModule],
controllers: [CreatorsController],
providers: [CreatorsService],
exports: [CreatorsService],
Expand Down
30 changes: 30 additions & 0 deletions backend/src/creators/creators.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import { CreatorsService } from './creators.service';
import { User, UserRole } from '../users/entities/user.entity';
import { EventBus } from '../events/event-bus';
import { SearchCreatorsDto } from './dto/search-creators.dto';
import { SubscriptionsService } from '../subscriptions/subscriptions.service';

describe('CreatorsService', () => {
let service: CreatorsService;
let mockQueryBuilder: Partial<SelectQueryBuilder<User>>;
let subscriptionsService: { getCreatorPayoutHistory: jest.Mock };

beforeEach(async () => {
// Create mock query builder
Expand All @@ -25,6 +27,8 @@ describe('CreatorsService', () => {
getRawAndEntities: jest.fn(),
};

subscriptionsService = { getCreatorPayoutHistory: jest.fn() };

const module: TestingModule = await Test.createTestingModule({
providers: [
CreatorsService,
Expand All @@ -39,6 +43,10 @@ describe('CreatorsService', () => {
provide: EventBus,
useValue: { publish: jest.fn() },
},
{
provide: SubscriptionsService,
useValue: subscriptionsService,
},
],
}).compile();

Expand Down Expand Up @@ -445,6 +453,28 @@ describe('CreatorsService', () => {
});
});
});

describe('getPayoutHistory', () => {
it('delegates payout history query to subscriptions service', () => {
const response = { data: [], nextCursor: null, hasMore: false };
subscriptionsService.getCreatorPayoutHistory.mockReturnValue(response);

const result = service.getPayoutHistory('GCREATOR', {
from: '2026-01-01T00:00:00.000Z',
to: '2026-01-31T23:59:59.999Z',
limit: 20,
});

expect(subscriptionsService.getCreatorPayoutHistory).toHaveBeenCalledWith({
creatorAddress: 'GCREATOR',
from: '2026-01-01T00:00:00.000Z',
to: '2026-01-31T23:59:59.999Z',
cursor: undefined,
limit: 20,
});
expect(result).toEqual(response);
});
});
});

// Helper function to create mock users
Expand Down
Loading