diff --git a/apps/api/src/Analytics-Export/analytics-export.controller.spec.ts b/apps/api/src/Analytics-Export/analytics-export.controller.spec.ts new file mode 100644 index 0000000..d64f207 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.controller.spec.ts @@ -0,0 +1,209 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { AnalyticsExportController } from '../analytics-export.controller'; +import { AnalyticsExportService } from '../analytics-export.service'; +import { ExportAnalyticsDto } from '../dto/export-analytics.dto'; +import { ExportJobStatusDto } from '../dto/export-response.dto'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; +import { ExportStatus } from '../enums/export-status.enum'; + +// ─── Mocks ─────────────────────────────────────────────────────────────────── + +const mockService = () => ({ + initiateExport: jest.fn(), + getJobStatus: jest.fn(), + listUserJobs: jest.fn(), + buildSyncCsv: jest.fn(), + repository: { + findJobById: jest.fn(), + }, +}); + +const makeDto = (overrides: Partial = {}): ExportAnalyticsDto => ({ + metrics: [AnalyticsMetric.GAS_PRICE], + startDate: '2024-01-01T00:00:00.000Z', + endDate: '2024-01-31T23:59:59.000Z', + includeMetadata: true, + delimiter: ',', + dateFormat: 'iso', + timezone: 'UTC', + async: false, + ...overrides, +}); + +const mockRequest = (userId = 'user-123') => ({ + user: { id: userId }, +}); + +const mockResponse = () => { + const res: Record = {}; + res.setHeader = jest.fn().mockReturnValue(res); + res.status = jest.fn().mockReturnValue(res); + res.json = jest.fn().mockReturnValue(res); + res.send = jest.fn().mockReturnValue(res); + return res; +}; + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +describe('AnalyticsExportController', () => { + let controller: AnalyticsExportController; + let service: ReturnType; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + 
controllers: [AnalyticsExportController], + providers: [{ provide: AnalyticsExportService, useFactory: mockService }], + }) + .overrideGuard(Object) // bypass AuthGuard + .useValue({ canActivate: () => true }) + .compile(); + + controller = module.get(AnalyticsExportController); + service = module.get(AnalyticsExportService); + }); + + // ─── exportAnalytics (sync) ─────────────────────────────────────────────── + + describe('exportAnalytics() — synchronous', () => { + it('should set CSV headers and send the CSV body', async () => { + const csvPayload = { + csv: 'ID,Metric\nrec-001,gas_price', + filename: 'analytics_gas_price_2024-01-01_to_2024-01-31.csv', + }; + service.initiateExport.mockResolvedValue(csvPayload); + + const req = mockRequest() as never; + const res = mockResponse() as never; + + await controller.exportAnalytics(makeDto(), req, res); + + expect((res as ReturnType).setHeader).toHaveBeenCalledWith('Content-Type', 'text/csv; charset=utf-8'); + expect((res as ReturnType).setHeader).toHaveBeenCalledWith( + 'Content-Disposition', + `attachment; filename="${csvPayload.filename}"`, + ); + expect((res as ReturnType).send).toHaveBeenCalledWith(csvPayload.csv); + }); + + it('should set X-Export-Row-Count header', async () => { + service.initiateExport.mockResolvedValue({ + csv: 'ID\nrec-001\nrec-002', + filename: 'test.csv', + }); + + const res = mockResponse() as never; + await controller.exportAnalytics(makeDto(), mockRequest() as never, res); + + expect((res as ReturnType).setHeader).toHaveBeenCalledWith('X-Export-Row-Count', '2'); + }); + }); + + // ─── exportAnalytics (async) ────────────────────────────────────────────── + + describe('exportAnalytics() — asynchronous', () => { + it('should return 202 ACCEPTED with job info for async export', async () => { + const jobResponse = { + jobId: 'job-abc', + status: ExportStatus.PENDING, + message: 'Export job queued successfully.', + statusUrl: '/analytics/export/jobs/job-abc', + createdAt: new 
Date().toISOString(), + }; + service.initiateExport.mockResolvedValue(jobResponse); + + const res = mockResponse() as never; + await controller.exportAnalytics(makeDto({ async: true }), mockRequest() as never, res); + + expect((res as ReturnType).status).toHaveBeenCalledWith(202); + expect((res as ReturnType).json).toHaveBeenCalledWith(jobResponse); + }); + }); + + // ─── listJobs ───────────────────────────────────────────────────────────── + + describe('listJobs()', () => { + it('should return the list of user jobs', async () => { + const jobs: ExportJobStatusDto[] = [ + { + jobId: 'job-1', + status: ExportStatus.COMPLETED, + rowCount: 200, + fileSizeBytes: 8192, + downloadUrl: '/download', + errorMessage: null, + createdAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + }, + ]; + service.listUserJobs.mockResolvedValue(jobs); + + const result = await controller.listJobs(mockRequest() as never); + + expect(result).toEqual(jobs); + expect(service.listUserJobs).toHaveBeenCalledWith('user-123'); + }); + }); + + // ─── getJobStatus ───────────────────────────────────────────────────────── + + describe('getJobStatus()', () => { + it('should delegate to service and return status DTO', async () => { + const statusDto: ExportJobStatusDto = { + jobId: 'job-xyz', + status: ExportStatus.PROCESSING, + rowCount: null, + fileSizeBytes: null, + downloadUrl: null, + errorMessage: null, + createdAt: new Date().toISOString(), + completedAt: null, + }; + service.getJobStatus.mockResolvedValue(statusDto); + + const result = await controller.getJobStatus('job-xyz', mockRequest() as never); + + expect(result).toEqual(statusDto); + expect(service.getJobStatus).toHaveBeenCalledWith('job-xyz', 'user-123'); + }); + }); + + // ─── downloadJobResult ──────────────────────────────────────────────────── + + describe('downloadJobResult()', () => { + it('should return CSV for a completed job', async () => { + service.getJobStatus.mockResolvedValue({ + jobId: 'job-done', + 
status: ExportStatus.COMPLETED, + }); + service.repository.findJobById.mockResolvedValue({ + id: 'job-done', + options: {}, + }); + service.buildSyncCsv.mockResolvedValue({ + csv: 'ID,Metric\nrec-001,gas_price', + filename: 'test.csv', + }); + + const res = mockResponse() as never; + await controller.downloadJobResult('job-done', mockRequest() as never, res); + + expect((res as ReturnType).setHeader).toHaveBeenCalledWith('Content-Type', 'text/csv; charset=utf-8'); + expect((res as ReturnType).send).toHaveBeenCalled(); + }); + + it('should return 409 Conflict when job is not completed', async () => { + service.getJobStatus.mockResolvedValue({ + jobId: 'job-pending', + status: ExportStatus.PROCESSING, + }); + + const res = mockResponse() as never; + await controller.downloadJobResult('job-pending', mockRequest() as never, res); + + expect((res as ReturnType).status).toHaveBeenCalledWith(409); + expect((res as ReturnType).json).toHaveBeenCalledWith( + expect.objectContaining({ status: ExportStatus.PROCESSING }), + ); + }); + }); +}); diff --git a/apps/api/src/Analytics-Export/analytics-export.controller.ts b/apps/api/src/Analytics-Export/analytics-export.controller.ts new file mode 100644 index 0000000..c32a781 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.controller.ts @@ -0,0 +1,142 @@ +import { + Controller, + Get, + Header, + HttpCode, + HttpStatus, + Param, + ParseUUIDPipe, + Post, + Query, + Req, + Res, + UseGuards, +} from '@nestjs/common'; +import { Response, Request } from 'express'; +import { ExportAnalyticsDto } from './dto/export-analytics.dto'; +import { + ExportJobResponseDto, + ExportJobStatusDto, +} from './dto/export-response.dto'; +import { AnalyticsExportService } from './analytics-export.service'; +import { ExportStatus } from './enums/export-status.enum'; + +/** + * Placeholder JWT guard — swap for your actual AuthGuard. + * e.g. 
@UseGuards(JwtAuthGuard) + */ +import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common'; + +@Injectable() +class AuthGuard implements CanActivate { + canActivate(context: ExecutionContext): boolean { + // Replace with real JWT validation + const req = context.switchToHttp().getRequest(); + return !!(req as Request & { user?: unknown }).user; + } +} + +@Controller('analytics/export') +@UseGuards(AuthGuard) +export class AnalyticsExportController { + constructor(private readonly exportService: AnalyticsExportService) {} + + /** + * POST /analytics/export + * + * If async=false (default): returns a CSV file download. + * If async=true: enqueues a background job and returns a job reference. + */ + @Post() + @HttpCode(HttpStatus.OK) + async exportAnalytics( + @Query() dto: ExportAnalyticsDto, + @Req() req: Request & { user: { id: string } }, + @Res() res: Response, + ): Promise<void> { + const userId = req.user.id; + const result = await this.exportService.initiateExport(userId, dto); + + if ('csv' in result) { + // Synchronous: stream CSV directly + const { csv, filename } = result; + res.setHeader('Content-Type', 'text/csv; charset=utf-8'); + res.setHeader( + 'Content-Disposition', + `attachment; filename="${filename}"`, + ); + res.setHeader('Content-Length', Buffer.byteLength(csv, 'utf8')); + res.setHeader('X-Export-Row-Count', String(csv.split('\n').length - 1)); + res.send(csv); + } else { + // Async: return job info as JSON + res.setHeader('Content-Type', 'application/json'); + res.status(HttpStatus.ACCEPTED).json(result); + } + } + + /** + * GET /analytics/export/jobs + * + * List recent export jobs for the authenticated user. + */ + @Get('jobs') + async listJobs( + @Req() req: Request & { user: { id: string } }, + ): Promise<ExportJobStatusDto[]> { + return this.exportService.listUserJobs(req.user.id); + } + + /** + * GET /analytics/export/jobs/:jobId + * + * Poll the status of an async export job. 
 + */ + @Get('jobs/:jobId') + async getJobStatus( + @Param('jobId', ParseUUIDPipe) jobId: string, + @Req() req: Request & { user: { id: string } }, + ): Promise<ExportJobStatusDto> { + return this.exportService.getJobStatus(jobId, req.user.id); + } + + /** + * GET /analytics/export/jobs/:jobId/download + * + * Re-generate and download the CSV for a completed async job. + * In production, redirect to a presigned S3/GCS URL instead. + */ + @Get('jobs/:jobId/download') + async downloadJobResult( + @Param('jobId', ParseUUIDPipe) jobId: string, + @Req() req: Request & { user: { id: string } }, + @Res() res: Response, + ): Promise<void> { + const job = await this.exportService.getJobStatus(jobId, req.user.id); + + if (job.status !== ExportStatus.COMPLETED) { + res.status(HttpStatus.CONFLICT).json({ + message: `Job is not completed yet. Current status: ${job.status}`, + status: job.status, + }); + return; + } + + // Retrieve the job entity and re-run the CSV build + const jobEntity = await this.exportService['repository'].findJobById(jobId); + if (!jobEntity) { + res.status(HttpStatus.NOT_FOUND).json({ message: 'Job not found' }); + return; + } + + const { csv, filename } = await this.exportService.buildSyncCsv( + req.user.id, + jobEntity.options, + ); + + res.setHeader('Content-Type', 'text/csv; charset=utf-8'); + res.setHeader('Content-Disposition', `attachment; filename="${filename}"`); + res.setHeader('Content-Length', Buffer.byteLength(csv, 'utf8')); + res.send(csv); + } +} diff --git a/apps/api/src/Analytics-Export/analytics-export.e2e.spec.ts b/apps/api/src/Analytics-Export/analytics-export.e2e.spec.ts new file mode 100644 index 0000000..83c1d27 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.e2e.spec.ts @@ -0,0 +1,363 @@ +import { INestApplication, ValidationPipe } from '@nestjs/common'; +import { getQueueToken } from '@nestjs/bull'; +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import * as request from 
'supertest'; +import { AnalyticsExportController } from '../analytics-export.controller'; +import { AnalyticsExportRepository } from '../analytics-export.repository'; +import { + AnalyticsExportService, + ANALYTICS_EXPORT_QUEUE, +} from '../analytics-export.service'; +import { ExportJobEntity } from '../entities/export-job.entity'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; +import { ExportStatus } from '../enums/export-status.enum'; +import { AnalyticsRecord } from '../interfaces/analytics-record.interface'; +import { ExportOptions } from '../interfaces/export-options.interface'; +import { CsvBuilderUtil } from '../utils/csv-builder.util'; + +// ─── Shared fixtures ────────────────────────────────────────────────────────── + +const USER_ID = 'e2e-user-001'; + +const makeRecord = (overrides: Partial = {}): AnalyticsRecord => ({ + id: 'rec-e2e-001', + metric: AnalyticsMetric.GAS_PRICE, + value: 55.0, + unit: 'Gwei', + metadata: { + baseFee: 50, + priorityFee: 5, + blockNumber: 20_000_000, + networkName: 'mainnet', + gasLimit: 21_000, + }, + networkId: 'net-e2e', + userId: USER_ID, + timestamp: new Date('2024-06-15T08:00:00.000Z'), + createdAt: new Date('2024-06-15T08:00:01.000Z'), + ...overrides, +}); + +const makeJobEntity = (overrides: Partial = {}): ExportJobEntity => + ({ + id: 'e2e-job-001', + userId: USER_ID, + status: ExportStatus.PENDING, + options: {} as ExportOptions, + rowCount: null, + fileSizeBytes: null, + downloadUrl: null, + errorMessage: null, + bullJobId: null, + createdAt: new Date('2024-06-15T08:00:00.000Z'), + updatedAt: new Date('2024-06-15T08:00:00.000Z'), + completedAt: null, + ...overrides, + } as ExportJobEntity); + +// ─── App bootstrap ──────────────────────────────────────────────────────────── + +async function createTestApp(): Promise<{ + app: INestApplication; + analyticsRepo: jest.Mocked; +}> { + const analyticsRepo = { + createJob: jest.fn(), + findJobById: jest.fn(), + findJobsByUser: jest.fn(), + 
updateJobStatus: jest.fn(), + markJobCompleted: jest.fn(), + markJobFailed: jest.fn(), + fetchAnalyticsRecords: jest.fn(), + countAnalyticsRecords: jest.fn(), + } as unknown as jest.Mocked; + + const mockQueue = { add: jest.fn().mockResolvedValue({ id: 'bull-001' }) }; + + const moduleRef: TestingModule = await Test.createTestingModule({ + controllers: [AnalyticsExportController], + providers: [ + AnalyticsExportService, + CsvBuilderUtil, + { provide: AnalyticsExportRepository, useValue: analyticsRepo }, + { provide: getRepositoryToken(ExportJobEntity), useValue: {} }, + { provide: getQueueToken(ANALYTICS_EXPORT_QUEUE), useValue: mockQueue }, + ], + }) + .overrideGuard(Object) + .useValue({ + canActivate: (ctx: import('@nestjs/common').ExecutionContext) => { + const req = ctx.switchToHttp().getRequest(); + req.user = { id: USER_ID }; + return true; + }, + }) + .compile(); + + const app = moduleRef.createNestApplication(); + app.useGlobalPipes(new ValidationPipe({ transform: true, whitelist: true })); + await app.init(); + + return { app, analyticsRepo }; +} + +// ─── Test suite ─────────────────────────────────────────────────────────────── + +describe('AnalyticsExport — E2E', () => { + let app: INestApplication; + let analyticsRepo: jest.Mocked; + + beforeAll(async () => { + ({ app, analyticsRepo } = await createTestApp()); + }); + + afterAll(async () => { + await app.close(); + }); + + beforeEach(() => jest.clearAllMocks()); + + // ─── POST /analytics/export (synchronous) ────────────────────────────────── + + describe('POST /analytics/export (sync)', () => { + const baseQuery = { + metrics: AnalyticsMetric.GAS_PRICE, + startDate: '2024-01-01T00:00:00.000Z', + endDate: '2024-01-31T23:59:59.000Z', + }; + + it('should return 200 with CSV content-type for a small dataset', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(2); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord(), makeRecord({ id: 'rec-002' })]); + + const res = 
await request(app.getHttpServer()) + .post('/analytics/export') + .query(baseQuery) + .expect(200); + + expect(res.headers['content-type']).toContain('text/csv'); + expect(res.headers['content-disposition']).toContain('.csv'); + expect(res.text).toContain('ID,Metric'); + expect(res.text).toContain('rec-e2e-001'); + }); + + it('should include metadata columns in the CSV', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(1); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord()]); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') + .query({ ...baseQuery, includeMetadata: 'true' }) + .expect(200); + + expect(res.text).toContain('Base Fee (Gwei)'); + expect(res.text).toContain('Block Number'); + }); + + it('should use semicolon delimiter when requested', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(1); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord()]); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') + .query({ ...baseQuery, delimiter: ';', includeMetadata: 'false' }) + .expect(200); + + const header = res.text.split('\n')[0]; + expect(header).toContain(';'); + }); + + it('should include unix timestamps when dateFormat=unix', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(1); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord()]); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') + .query({ ...baseQuery, dateFormat: 'unix', includeMetadata: 'false' }) + .expect(200); + + // Unix timestamp for 2024-06-15T08:00:00Z + expect(res.text).toContain('1718438400'); + }); + + it('should return 400 when dataset exceeds sync limit', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(50_000); + + await request(app.getHttpServer()) + .post('/analytics/export') + .query(baseQuery) + .expect(400); + }); + + it('should set X-Export-Row-Count response 
header', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(1); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord()]); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') + .query({ ...baseQuery, includeMetadata: 'false' }) + .expect(200); + + expect(res.headers['x-export-row-count']).toBeDefined(); + }); + + it('should return 400 for invalid metric value', async () => { + await request(app.getHttpServer()) + .post('/analytics/export') + .query({ ...baseQuery, metrics: 'invalid_metric' }) + .expect(400); + }); + + it('should return 400 when endDate is before startDate', async () => { + await request(app.getHttpServer()) + .post('/analytics/export') + .query({ + ...baseQuery, + startDate: '2024-12-31T00:00:00.000Z', + endDate: '2024-01-01T00:00:00.000Z', + }) + .expect(400); + }); + + it('should export multiple metric types in a single CSV', async () => { + analyticsRepo.countAnalyticsRecords.mockResolvedValue(2); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([ + makeRecord({ metric: AnalyticsMetric.GAS_PRICE }), + makeRecord({ id: 'rec-002', metric: AnalyticsMetric.ALERT_TRIGGERED }), + ]); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') + .query({ + metrics: [AnalyticsMetric.GAS_PRICE, AnalyticsMetric.ALERT_TRIGGERED], + startDate: '2024-01-01T00:00:00.000Z', + endDate: '2024-01-31T23:59:59.000Z', + includeMetadata: 'true', + }) + .expect(200); + + expect(res.text).toContain('Base Fee (Gwei)'); + expect(res.text).toContain('Alert Name'); + }); + }); + + // ─── POST /analytics/export (async) ─────────────────────────────────────── + + describe('POST /analytics/export (async)', () => { + it('should return 202 with job reference for async export', async () => { + analyticsRepo.createJob.mockResolvedValue(makeJobEntity()); + analyticsRepo.updateJobStatus.mockResolvedValue(undefined); + + const res = await request(app.getHttpServer()) + .post('/analytics/export') 
+ .query({ + metrics: AnalyticsMetric.GAS_PRICE, + startDate: '2024-01-01T00:00:00.000Z', + endDate: '2024-01-31T23:59:59.000Z', + async: 'true', + }) + .expect(202); + + expect(res.body.jobId).toBe('e2e-job-001'); + expect(res.body.status).toBe(ExportStatus.PENDING); + expect(res.body.statusUrl).toContain('/analytics/export/jobs/'); + }); + }); + + // ─── GET /analytics/export/jobs ─────────────────────────────────────────── + + describe('GET /analytics/export/jobs', () => { + it('should list user export jobs', async () => { + analyticsRepo.findJobsByUser.mockResolvedValue([ + makeJobEntity({ status: ExportStatus.COMPLETED, rowCount: 100 }), + ]); + + const res = await request(app.getHttpServer()) + .get('/analytics/export/jobs') + .expect(200); + + expect(Array.isArray(res.body)).toBe(true); + expect(res.body[0].jobId).toBe('e2e-job-001'); + expect(res.body[0].status).toBe(ExportStatus.COMPLETED); + }); + + it('should return empty array when no jobs exist', async () => { + analyticsRepo.findJobsByUser.mockResolvedValue([]); + + const res = await request(app.getHttpServer()) + .get('/analytics/export/jobs') + .expect(200); + + expect(res.body).toEqual([]); + }); + }); + + // ─── GET /analytics/export/jobs/:jobId ──────────────────────────────────── + + describe('GET /analytics/export/jobs/:jobId', () => { + it('should return job status for a valid job ID', async () => { + analyticsRepo.findJobById.mockResolvedValue(makeJobEntity()); + + const res = await request(app.getHttpServer()) + .get('/analytics/export/jobs/e2e-job-001') + .expect(200); + + expect(res.body.jobId).toBe('e2e-job-001'); + expect(res.body.status).toBe(ExportStatus.PENDING); + }); + + it('should return 404 for an unknown job ID', async () => { + analyticsRepo.findJobById.mockResolvedValue(null); + + await request(app.getHttpServer()) + .get('/analytics/export/jobs/00000000-0000-0000-0000-000000000000') + .expect(404); + }); + + it('should return 400 for a non-UUID job ID', async () => { + await 
request(app.getHttpServer()) + .get('/analytics/export/jobs/not-a-uuid') + .expect(400); + }); + }); + + // ─── GET /analytics/export/jobs/:jobId/download ─────────────────────────── + + describe('GET /analytics/export/jobs/:jobId/download', () => { + it('should return CSV for a completed job', async () => { + analyticsRepo.findJobById.mockResolvedValue( + makeJobEntity({ + status: ExportStatus.COMPLETED, + rowCount: 1, + options: { + metrics: [AnalyticsMetric.GAS_PRICE], + startDate: new Date('2024-01-01'), + endDate: new Date('2024-01-31'), + includeMetadata: false, + delimiter: ',', + dateFormat: 'iso', + timezone: 'UTC', + } as ExportOptions, + }), + ); + analyticsRepo.fetchAnalyticsRecords.mockResolvedValue([makeRecord()]); + + const res = await request(app.getHttpServer()) + .get('/analytics/export/jobs/e2e-job-001/download') + .expect(200); + + expect(res.headers['content-type']).toContain('text/csv'); + }); + + it('should return 409 when job is still processing', async () => { + analyticsRepo.findJobById.mockResolvedValue(makeJobEntity({ status: ExportStatus.PROCESSING })); + + const res = await request(app.getHttpServer()) + .get('/analytics/export/jobs/e2e-job-001/download') + .expect(409); + + expect(res.body.status).toBe(ExportStatus.PROCESSING); + }); + }); +}); diff --git a/apps/api/src/Analytics-Export/analytics-export.module.ts b/apps/api/src/Analytics-Export/analytics-export.module.ts new file mode 100644 index 0000000..736c674 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.module.ts @@ -0,0 +1,35 @@ +import { BullModule } from '@nestjs/bull'; +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { AnalyticsExportController } from './analytics-export.controller'; +import { AnalyticsExportRepository } from './analytics-export.repository'; +import { + AnalyticsExportService, + ANALYTICS_EXPORT_QUEUE, +} from './analytics-export.service'; +import { ExportJobEntity } from 
'./entities/export-job.entity'; +import { AnalyticsExportProcessor } from './processors/analytics-export.processor'; +import { CsvBuilderUtil } from './utils/csv-builder.util'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([ExportJobEntity]), + BullModule.registerQueue({ + name: ANALYTICS_EXPORT_QUEUE, + defaultJobOptions: { + attempts: 3, + removeOnComplete: 100, + removeOnFail: 50, + }, + }), + ], + controllers: [AnalyticsExportController], + providers: [ + AnalyticsExportService, + AnalyticsExportRepository, + AnalyticsExportProcessor, + CsvBuilderUtil, + ], + exports: [AnalyticsExportService], +}) +export class AnalyticsExportModule {} diff --git a/apps/api/src/Analytics-Export/analytics-export.processor.spec.ts b/apps/api/src/Analytics-Export/analytics-export.processor.spec.ts new file mode 100644 index 0000000..f65586b --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.processor.spec.ts @@ -0,0 +1,99 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { Job } from 'bull'; +import { + AnalyticsExportService, + ANALYTICS_EXPORT_QUEUE, +} from '../analytics-export.service'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; +import { ExportJobPayload } from '../interfaces/export-options.interface'; +import { AnalyticsExportProcessor } from '../processors/analytics-export.processor'; + +// ─── Fixtures ───────────────────────────────────────────────────────────────── + +const makePayload = (): ExportJobPayload => ({ + jobId: 'proc-job-001', + userId: 'user-proc', + options: { + metrics: [AnalyticsMetric.GAS_PRICE], + startDate: new Date('2024-01-01'), + endDate: new Date('2024-01-31'), + includeMetadata: true, + delimiter: ',', + dateFormat: 'iso', + timezone: 'UTC', + }, + requestedAt: new Date().toISOString(), +}); + +const makeJob = (data: ExportJobPayload): Partial> => ({ + data, + attemptsMade: 0, + progress: jest.fn(), +}); + +// ─── Tests 
──────────────────────────────────────────────────────────────────── + +describe('AnalyticsExportProcessor', () => { + let processor: AnalyticsExportProcessor; + let service: { processExportJob: jest.Mock }; + + beforeEach(async () => { + service = { processExportJob: jest.fn() }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + AnalyticsExportProcessor, + { provide: AnalyticsExportService, useValue: service }, + ], + }).compile(); + + processor = module.get(AnalyticsExportProcessor); + }); + + it('should call processExportJob with the job payload', async () => { + service.processExportJob.mockResolvedValue(undefined); + const job = makeJob(makePayload()) as Job; + + await processor.handleExport(job); + + expect(service.processExportJob).toHaveBeenCalledWith(job.data); + }); + + it('should set job progress to 5 before processing', async () => { + service.processExportJob.mockResolvedValue(undefined); + const job = makeJob(makePayload()) as Job; + + await processor.handleExport(job); + + expect(job.progress).toHaveBeenCalledWith(5); + }); + + it('should set job progress to 100 on success', async () => { + service.processExportJob.mockResolvedValue(undefined); + const job = makeJob(makePayload()) as Job; + + await processor.handleExport(job); + + expect(job.progress).toHaveBeenCalledWith(100); + }); + + it('should rethrow errors so Bull can handle retries', async () => { + service.processExportJob.mockRejectedValue(new Error('Disk full')); + const job = makeJob(makePayload()) as Job; + + await expect(processor.handleExport(job)).rejects.toThrow('Disk full'); + }); + + it('should not set progress to 100 when an error occurs', async () => { + service.processExportJob.mockRejectedValue(new Error('fail')); + const job = makeJob(makePayload()) as Job; + + try { + await processor.handleExport(job); + } catch { + // expected + } + + expect(job.progress).not.toHaveBeenCalledWith(100); + }); +}); diff --git 
a/apps/api/src/Analytics-Export/analytics-export.processor.ts b/apps/api/src/Analytics-Export/analytics-export.processor.ts new file mode 100644 index 0000000..fabf827 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.processor.ts @@ -0,0 +1,38 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import { + AnalyticsExportService, + ANALYTICS_EXPORT_QUEUE, +} from '../analytics-export.service'; +import { ExportJobPayload } from '../interfaces/export-options.interface'; + +@Processor(ANALYTICS_EXPORT_QUEUE) +export class AnalyticsExportProcessor { + private readonly logger = new Logger(AnalyticsExportProcessor.name); + + constructor(private readonly exportService: AnalyticsExportService) {} + + @Process('export') + async handleExport(job: Job): Promise { + const { jobId, userId } = job.data; + + this.logger.log( + `Processing export job: jobId=${jobId}, userId=${userId}, attempt=${job.attemptsMade + 1}`, + ); + + await job.progress(5); + + try { + await this.exportService.processExportJob(job.data); + await job.progress(100); + this.logger.log(`Export job completed: jobId=${jobId}`); + } catch (error) { + this.logger.error( + `Export job failed: jobId=${jobId}`, + error instanceof Error ? 
error.stack : String(error), + ); + throw error; // Re-throw so Bull handles retries + } + } +} diff --git a/apps/api/src/Analytics-Export/analytics-export.repository.spec.ts b/apps/api/src/Analytics-Export/analytics-export.repository.spec.ts new file mode 100644 index 0000000..0dbcc19 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.repository.spec.ts @@ -0,0 +1,279 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { SelectQueryBuilder } from 'typeorm'; +import { AnalyticsExportRepository } from '../analytics-export.repository'; +import { ExportJobEntity } from '../entities/export-job.entity'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; +import { ExportStatus } from '../enums/export-status.enum'; +import { ExportOptions } from '../interfaces/export-options.interface'; + +// ─── Fixtures ───────────────────────────────────────────────────────────────── + +const USER_ID = 'user-aaa'; + +const makeOptions = (overrides: Partial = {}): ExportOptions => ({ + metrics: [AnalyticsMetric.GAS_PRICE], + startDate: new Date('2024-01-01'), + endDate: new Date('2024-01-31'), + includeMetadata: true, + delimiter: ',', + dateFormat: 'iso', + timezone: 'UTC', + ...overrides, +}); + +const makeJobEntity = (overrides: Partial = {}): ExportJobEntity => + ({ + id: 'job-1', + userId: USER_ID, + status: ExportStatus.PENDING, + options: makeOptions(), + rowCount: null, + fileSizeBytes: null, + downloadUrl: null, + errorMessage: null, + bullJobId: null, + createdAt: new Date(), + updatedAt: new Date(), + completedAt: null, + ...overrides, + } as ExportJobEntity); + +// ─── QueryBuilder mock ──────────────────────────────────────────────────────── + +const makeQb = (rawResult: unknown[] = [], countResult: { count: string } = { count: '0' }) => { + const qb: Partial> = { + select: jest.fn().mockReturnThis(), + from: jest.fn().mockReturnThis(), + where: jest.fn().mockReturnThis(), 
+ andWhere: jest.fn().mockReturnThis(), + orderBy: jest.fn().mockReturnThis(), + limit: jest.fn().mockReturnThis(), + getRawMany: jest.fn().mockResolvedValue(rawResult), + getRawOne: jest.fn().mockResolvedValue(countResult), + }; + return qb as SelectQueryBuilder; +}; + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('AnalyticsExportRepository', () => { + let repo: AnalyticsExportRepository; + let typeormRepo: { + create: jest.Mock; + save: jest.Mock; + findOne: jest.Mock; + find: jest.Mock; + update: jest.Mock; + manager: { createQueryBuilder: jest.Mock }; + }; + + beforeEach(async () => { + const qb = makeQb(); + + typeormRepo = { + create: jest.fn(), + save: jest.fn(), + findOne: jest.fn(), + find: jest.fn(), + update: jest.fn(), + manager: { createQueryBuilder: jest.fn().mockReturnValue(qb) }, + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + AnalyticsExportRepository, + { + provide: getRepositoryToken(ExportJobEntity), + useValue: typeormRepo, + }, + ], + }).compile(); + + repo = module.get(AnalyticsExportRepository); + }); + + // ─── createJob ────────────────────────────────────────────────────────────── + + describe('createJob()', () => { + it('should create and save a new export job entity', async () => { + const entity = makeJobEntity(); + typeormRepo.create.mockReturnValue(entity); + typeormRepo.save.mockResolvedValue(entity); + + const result = await repo.createJob(USER_ID, makeOptions()); + + expect(typeormRepo.create).toHaveBeenCalledWith( + expect.objectContaining({ userId: USER_ID, status: ExportStatus.PENDING }), + ); + expect(typeormRepo.save).toHaveBeenCalledWith(entity); + expect(result).toEqual(entity); + }); + }); + + // ─── findJobById ──────────────────────────────────────────────────────────── + + describe('findJobById()', () => { + it('should return entity when found', async () => { + typeormRepo.findOne.mockResolvedValue(makeJobEntity()); + const result 
= await repo.findJobById('job-1'); + expect(result).not.toBeNull(); + expect(result?.id).toBe('job-1'); + }); + + it('should return null when not found', async () => { + typeormRepo.findOne.mockResolvedValue(null); + const result = await repo.findJobById('missing'); + expect(result).toBeNull(); + }); + }); + + // ─── findJobsByUser ───────────────────────────────────────────────────────── + + describe('findJobsByUser()', () => { + it('should return a list of jobs for the given user', async () => { + typeormRepo.find.mockResolvedValue([makeJobEntity(), makeJobEntity({ id: 'job-2' })]); + const results = await repo.findJobsByUser(USER_ID); + expect(results).toHaveLength(2); + }); + + it('should return empty array when user has no jobs', async () => { + typeormRepo.find.mockResolvedValue([]); + const results = await repo.findJobsByUser(USER_ID); + expect(results).toEqual([]); + }); + }); + + // ─── updateJobStatus ──────────────────────────────────────────────────────── + + describe('updateJobStatus()', () => { + it('should call update with the given status', async () => { + typeormRepo.update.mockResolvedValue({ affected: 1 }); + await repo.updateJobStatus('job-1', ExportStatus.PROCESSING); + expect(typeormRepo.update).toHaveBeenCalledWith('job-1', { status: ExportStatus.PROCESSING }); + }); + + it('should merge extra fields when provided', async () => { + typeormRepo.update.mockResolvedValue({ affected: 1 }); + await repo.updateJobStatus('job-1', ExportStatus.PENDING, { bullJobId: 'bull-99' }); + expect(typeormRepo.update).toHaveBeenCalledWith('job-1', { + status: ExportStatus.PENDING, + bullJobId: 'bull-99', + }); + }); + }); + + // ─── markJobCompleted ─────────────────────────────────────────────────────── + + describe('markJobCompleted()', () => { + it('should update status to COMPLETED with row count, size, and URL', async () => { + typeormRepo.update.mockResolvedValue({ affected: 1 }); + + await repo.markJobCompleted('job-1', 500, 20480, '/download/job-1'); + 
+ expect(typeormRepo.update).toHaveBeenCalledWith( + 'job-1', + expect.objectContaining({ + status: ExportStatus.COMPLETED, + rowCount: 500, + fileSizeBytes: 20480, + downloadUrl: '/download/job-1', + completedAt: expect.any(Date), + }), + ); + }); + + it('should set downloadUrl to null when not provided', async () => { + typeormRepo.update.mockResolvedValue({ affected: 1 }); + await repo.markJobCompleted('job-1', 0, 0); + expect(typeormRepo.update).toHaveBeenCalledWith( + 'job-1', + expect.objectContaining({ downloadUrl: null }), + ); + }); + }); + + // ─── markJobFailed ────────────────────────────────────────────────────────── + + describe('markJobFailed()', () => { + it('should update status to FAILED with error message', async () => { + typeormRepo.update.mockResolvedValue({ affected: 1 }); + await repo.markJobFailed('job-1', 'Connection timeout'); + expect(typeormRepo.update).toHaveBeenCalledWith( + 'job-1', + expect.objectContaining({ + status: ExportStatus.FAILED, + errorMessage: 'Connection timeout', + completedAt: expect.any(Date), + }), + ); + }); + }); + + // ─── fetchAnalyticsRecords ────────────────────────────────────────────────── + + describe('fetchAnalyticsRecords()', () => { + it('should call createQueryBuilder and return raw results', async () => { + const rawRecords = [{ id: 'rec-001', metric: 'gas_price' }]; + const qb = makeQb(rawRecords); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + const results = await repo.fetchAnalyticsRecords(makeOptions()); + + expect(qb.from).toHaveBeenCalledWith('analytics_events', 'ae'); + expect(results).toEqual(rawRecords); + }); + + it('should skip metric filter when metrics includes ALL', async () => { + const qb = makeQb([]); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + await repo.fetchAnalyticsRecords(makeOptions({ metrics: [AnalyticsMetric.ALL] })); + + expect(qb.andWhere).not.toHaveBeenCalledWith( + expect.stringContaining('metric IN'), + expect.anything(), + ); + 
}); + + it('should apply networkId filter when provided', async () => { + const qb = makeQb([]); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + await repo.fetchAnalyticsRecords(makeOptions({ networkId: 'net-42' })); + + expect(qb.andWhere).toHaveBeenCalledWith('ae.network_id = :networkId', { networkId: 'net-42' }); + }); + + it('should apply limit when provided', async () => { + const qb = makeQb([]); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + await repo.fetchAnalyticsRecords(makeOptions({ limit: 100 })); + + expect(qb.limit).toHaveBeenCalledWith(100); + }); + }); + + // ─── countAnalyticsRecords ────────────────────────────────────────────────── + + describe('countAnalyticsRecords()', () => { + it('should return parsed integer count', async () => { + const qb = makeQb([], { count: '750' }); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + const count = await repo.countAnalyticsRecords(makeOptions()); + + expect(count).toBe(750); + }); + + it('should return 0 when count result is undefined', async () => { + const qb = makeQb([], undefined as never); + typeormRepo.manager.createQueryBuilder.mockReturnValue(qb); + + const count = await repo.countAnalyticsRecords(makeOptions()); + + expect(count).toBe(0); + }); + }); +}); diff --git a/apps/api/src/Analytics-Export/analytics-export.repository.ts b/apps/api/src/Analytics-Export/analytics-export.repository.ts new file mode 100644 index 0000000..574f04d --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-export.repository.ts @@ -0,0 +1,169 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Between, FindOptionsWhere, In, Repository } from 'typeorm'; +import { ExportJobEntity } from '../entities/export-job.entity'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; +import { ExportStatus } from '../enums/export-status.enum'; +import { AnalyticsRecord } from 
'../interfaces/analytics-record.interface';
import { ExportOptions } from '../interfaces/export-options.interface';

/**
 * Persistence layer for analytics exports: CRUD over export-job rows plus raw
 * read queries against the `analytics_events` table.
 * Adjust the table name and columns to match your actual schema.
 */
@Injectable()
export class AnalyticsExportRepository {
  private readonly logger = new Logger(AnalyticsExportRepository.name);

  constructor(
    @InjectRepository(ExportJobEntity)
    // Generic argument restored — the mangled paste dropped `<ExportJobEntity>`.
    private readonly exportJobRepo: Repository<ExportJobEntity>,
  ) {}

  // ─── Export Job CRUD ──────────────────────────────────────────────────────

  /** Create and persist a new PENDING export job owned by `userId`. */
  async createJob(
    userId: string,
    options: ExportOptions,
  ): Promise<ExportJobEntity> {
    const job = this.exportJobRepo.create({
      userId,
      options,
      status: ExportStatus.PENDING,
    });
    return this.exportJobRepo.save(job);
  }

  /** Look a job up by primary key; resolves null when absent. */
  async findJobById(id: string): Promise<ExportJobEntity | null> {
    return this.exportJobRepo.findOne({ where: { id } });
  }

  /** The 50 most recent jobs for a user, newest first. */
  async findJobsByUser(userId: string): Promise<ExportJobEntity[]> {
    return this.exportJobRepo.find({
      where: { userId },
      order: { createdAt: 'DESC' },
      take: 50,
    });
  }

  /**
   * Transition a job to `status`, optionally merging extra bookkeeping
   * columns in the same UPDATE.
   */
  async updateJobStatus(
    id: string,
    status: ExportStatus,
    extras?: Partial<
      Pick<
        ExportJobEntity,
        'rowCount' | 'fileSizeBytes' | 'downloadUrl' | 'errorMessage' | 'completedAt' | 'bullJobId'
      >
    >,
  ): Promise<void> {
    await this.exportJobRepo.update(id, { status, ...extras });
  }

  /**
   * Mark a job COMPLETED with result bookkeeping.
   * Delegates to updateJobStatus so all status transitions share one code path.
   */
  async markJobCompleted(
    id: string,
    rowCount: number,
    fileSizeBytes: number,
    downloadUrl?: string,
  ): Promise<void> {
    await this.updateJobStatus(id, ExportStatus.COMPLETED, {
      rowCount,
      fileSizeBytes,
      downloadUrl: downloadUrl ?? null,
      completedAt: new Date(),
    });
  }

  /** Mark a job FAILED, recording the error message and completion time. */
  async markJobFailed(id: string, errorMessage: string): Promise<void> {
    await this.updateJobStatus(id, ExportStatus.FAILED, {
      errorMessage,
      completedAt: new Date(),
    });
  }

  // ─── Analytics Data Queries ───────────────────────────────────────────────

  /**
   * Apply the metric/network/user filters shared by fetch and count.
   * Structurally typed so no extra typeorm import is needed; the metric filter
   * is skipped entirely when the request includes AnalyticsMetric.ALL.
   */
  private applyAnalyticsFilters<
    T extends { andWhere(condition: string, params?: Record<string, unknown>): T },
  >(qb: T, options: ExportOptions): T {
    const { metrics, networkId, userId } = options;

    if (!metrics.includes(AnalyticsMetric.ALL)) {
      qb.andWhere('ae.metric IN (:...metrics)', { metrics });
    }
    if (networkId) {
      qb.andWhere('ae.network_id = :networkId', { networkId });
    }
    if (userId) {
      qb.andWhere('ae.user_id = :userId', { userId });
    }
    return qb;
  }

  /**
   * Fetch analytics records from the analytics_events table.
   * Uses a raw query for maximum flexibility and performance.
   */
  async fetchAnalyticsRecords(options: ExportOptions): Promise<AnalyticsRecord[]> {
    const { startDate, endDate, limit } = options;

    const qb = this.exportJobRepo.manager
      .createQueryBuilder()
      .select([
        'ae.id AS id',
        'ae.metric AS metric',
        'ae.value AS value',
        'ae.unit AS unit',
        'ae.metadata AS metadata',
        'ae.network_id AS "networkId"',
        'ae.user_id AS "userId"',
        'ae.timestamp AS timestamp',
        'ae.created_at AS "createdAt"',
      ])
      .from('analytics_events', 'ae')
      .where('ae.timestamp BETWEEN :startDate AND :endDate', {
        startDate,
        endDate,
      })
      .orderBy('ae.timestamp', 'ASC');

    this.applyAnalyticsFilters(qb, options);

    if (limit) {
      qb.limit(limit);
    }

    const rows = await qb.getRawMany();
    this.logger.debug(`Fetched ${rows.length} analytics records`);
    return rows;
  }

  /**
   * Count how many analytics records match the given options (for progress
   * tracking and the sync/async size decision).
   */
  async countAnalyticsRecords(options: ExportOptions): Promise<number> {
    const { startDate, endDate } = options;

    const qb = this.exportJobRepo.manager
      .createQueryBuilder()
      .select('COUNT(*)', 'count')
      .from('analytics_events', 'ae')
      .where('ae.timestamp BETWEEN :startDate AND :endDate', {
        startDate,
        endDate,
      });

    this.applyAnalyticsFilters(qb, options);

    // COUNT(*) arrives as a string from the driver; default to 0 when no row
    // comes back.
    const result = await qb.getRawOne<{ count: string }>();
    return parseInt(result?.count ?? '0', 10);
  }
}
diff --git a/apps/api/src/Analytics-Export/analytics-export.service.ts b/apps/api/src/Analytics-Export/analytics-export.service.ts
new file mode 100644
index 0000000..bffd3d7
--- /dev/null
+++ b/apps/api/src/Analytics-Export/analytics-export.service.ts
@@ -0,0 +1,305 @@
import {
  BadRequestException,
  Injectable,
  Logger,
  NotFoundException,
} from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { AnalyticsExportRepository } from './analytics-export.repository';
import { ExportAnalyticsDto } from './dto/export-analytics.dto';
import {
  ExportJobResponseDto,
  ExportJobStatusDto,
} from './dto/export-response.dto';
import { ExportJobEntity } from './entities/export-job.entity';
import { AnalyticsMetric } from './enums/analytics-metric.enum';
import { ExportStatus } from './enums/export-status.enum';
import { AnalyticsRecord } from './interfaces/analytics-record.interface';
import {
  CsvBuildOptions,
  CsvColumn,
  ExportJobPayload,
  ExportOptions,
} from './interfaces/export-options.interface';
import { CsvBuilderUtil } from './utils/csv-builder.util';

export const ANALYTICS_EXPORT_QUEUE = 'analytics-export';

/** Threshold: above this row
count, switch to async export */ +const SYNC_ROW_LIMIT = 10_000; + +@Injectable() +export class AnalyticsExportService { + private readonly logger = new Logger(AnalyticsExportService.name); + + constructor( + private readonly repository: AnalyticsExportRepository, + private readonly csvBuilder: CsvBuilderUtil, + @InjectQueue(ANALYTICS_EXPORT_QUEUE) + private readonly exportQueue: Queue, + ) {} + + // ─── Public API ───────────────────────────────────────────────────────────── + + /** + * Generate and stream a CSV directly for small datasets, + * or enqueue an async job for large ones. + */ + async initiateExport( + userId: string, + dto: ExportAnalyticsDto, + ): Promise<{ csv: string; filename: string } | ExportJobResponseDto> { + const options = this.mapDtoToOptions(dto); + + if (dto.async) { + return this.enqueueExportJob(userId, options); + } + + const count = await this.repository.countAnalyticsRecords(options); + if (count > SYNC_ROW_LIMIT) { + throw new BadRequestException( + `Dataset too large for synchronous export (${count} rows). ` + + `Use async=true to queue a background export job.`, + ); + } + + return this.buildSyncCsv(userId, options); + } + + /** + * Build CSV synchronously and return the raw string + suggested filename. + */ + async buildSyncCsv( + userId: string, + options: ExportOptions, + ): Promise<{ csv: string; filename: string }> { + const records = await this.repository.fetchAnalyticsRecords(options); + const csv = this.buildCsvFromRecords(records, options); + const filename = this.buildFilename(options); + + this.logger.log( + `Sync CSV export for user=${userId}: ${records.length} rows, ${this.csvBuilder.estimateSize(csv)} bytes`, + ); + + return { csv, filename }; + } + + /** + * Enqueue an async export job and return the job reference. 
+ */ + async enqueueExportJob( + userId: string, + options: ExportOptions, + ): Promise { + const jobEntity = await this.repository.createJob(userId, options); + + const payload: ExportJobPayload = { + jobId: jobEntity.id, + userId, + options, + requestedAt: new Date().toISOString(), + }; + + const bullJob = await this.exportQueue.add('export', payload, { + attempts: 3, + backoff: { type: 'exponential', delay: 5000 }, + removeOnComplete: 100, + removeOnFail: 50, + }); + + await this.repository.updateJobStatus(jobEntity.id, ExportStatus.PENDING, { + bullJobId: String(bullJob.id), + }); + + return { + jobId: jobEntity.id, + status: ExportStatus.PENDING, + message: 'Export job queued successfully. Poll the status URL for updates.', + statusUrl: `/analytics/export/jobs/${jobEntity.id}`, + createdAt: jobEntity.createdAt.toISOString(), + }; + } + + /** + * Process an async export job (called from BullMQ processor). + */ + async processExportJob(payload: ExportJobPayload): Promise { + const { jobId, userId, options } = payload; + + await this.repository.updateJobStatus(jobId, ExportStatus.PROCESSING); + + try { + const { csv, filename } = await this.buildSyncCsv(userId, options); + const sizeBytes = this.csvBuilder.estimateSize(csv); + const rowCount = csv.split('\n').length - 1; // subtract header + + // In a real system, upload to S3/GCS and store the URL. + // Here we store the filename as a placeholder for the download URL. + await this.repository.markJobCompleted( + jobId, + rowCount, + sizeBytes, + `/analytics/export/jobs/${jobId}/download`, + ); + + this.logger.log( + `Async export completed: jobId=${jobId}, rows=${rowCount}, size=${sizeBytes}B`, + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await this.repository.markJobFailed(jobId, message); + this.logger.error(`Async export failed: jobId=${jobId}`, error); + throw error; + } + } + + /** + * Fetch job status for the polling endpoint. 
+ */ + async getJobStatus(jobId: string, requestingUserId: string): Promise { + const job = await this.repository.findJobById(jobId); + if (!job) throw new NotFoundException(`Export job ${jobId} not found`); + + if (job.userId !== requestingUserId) { + throw new NotFoundException(`Export job ${jobId} not found`); + } + + return this.mapEntityToStatusDto(job); + } + + /** + * List recent export jobs for a user. + */ + async listUserJobs(userId: string): Promise { + const jobs = await this.repository.findJobsByUser(userId); + return jobs.map((j) => this.mapEntityToStatusDto(j)); + } + + // ─── CSV Construction ──────────────────────────────────────────────────────── + + buildCsvFromRecords(records: AnalyticsRecord[], options: ExportOptions): string { + const columns = this.buildColumns(options); + const buildOptions: CsvBuildOptions = { + columns, + delimiter: options.delimiter, + includeHeader: true, + nullPlaceholder: '', + }; + return this.csvBuilder.build(records, buildOptions); + } + + /** + * Build the column schema dynamically based on requested metrics and options. + */ + buildColumns(options: ExportOptions): CsvColumn[] { + const { dateFormat, timezone, includeMetadata } = options; + const fmt = (d: unknown) => + this.csvBuilder.formatDate(d as Date | string, dateFormat, timezone); + + const base: CsvColumn[] = [ + { key: 'id', header: 'ID' }, + { key: 'metric', header: 'Metric' }, + { key: 'value', header: 'Value', formatter: (v) => String(v) }, + { key: 'unit', header: 'Unit' }, + { key: 'networkId', header: 'Network ID' }, + { key: 'userId', header: 'User ID' }, + { key: 'timestamp', header: 'Timestamp', formatter: fmt }, + { key: 'createdAt', header: 'Created At', formatter: fmt }, + ]; + + if (!includeMetadata) return base; + + const metricMetaCols = this.buildMetricMetadataColumns(options.metrics); + return [...base, ...metricMetaCols]; + } + + /** + * Add flattened metadata columns for each requested metric type. 
+ */ + private buildMetricMetadataColumns(metrics: AnalyticsMetric[]): CsvColumn[] { + const allMetrics = metrics.includes(AnalyticsMetric.ALL); + const cols: CsvColumn[] = []; + + // Gas price columns + if (allMetrics || metrics.includes(AnalyticsMetric.GAS_PRICE)) { + cols.push( + { key: 'metadata.baseFee', header: 'Base Fee (Gwei)', formatter: (v) => String(v ?? '') }, + { key: 'metadata.priorityFee', header: 'Priority Fee (Gwei)', formatter: (v) => String(v ?? '') }, + { key: 'metadata.gasLimit', header: 'Gas Limit', formatter: (v) => String(v ?? '') }, + { key: 'metadata.blockNumber', header: 'Block Number', formatter: (v) => String(v ?? '') }, + { key: 'metadata.networkName', header: 'Network Name', formatter: (v) => String(v ?? '') }, + ); + } + + // Alert columns + if (allMetrics || metrics.includes(AnalyticsMetric.ALERT_TRIGGERED)) { + cols.push( + { key: 'metadata.alertId', header: 'Alert ID', formatter: (v) => String(v ?? '') }, + { key: 'metadata.alertName', header: 'Alert Name', formatter: (v) => String(v ?? '') }, + { key: 'metadata.thresholdValue', header: 'Threshold Value', formatter: (v) => String(v ?? '') }, + { key: 'metadata.actualValue', header: 'Actual Value', formatter: (v) => String(v ?? '') }, + { key: 'metadata.severity', header: 'Severity', formatter: (v) => String(v ?? '') }, + ); + } + + // Fee recommendation columns + if (allMetrics || metrics.includes(AnalyticsMetric.FEE_RECOMMENDATION)) { + cols.push( + { key: 'metadata.recommendedFee', header: 'Recommended Fee', formatter: (v) => String(v ?? '') }, + { key: 'metadata.confidence', header: 'Confidence Score', formatter: (v) => String(v ?? '') }, + { key: 'metadata.strategy', header: 'Fee Strategy', formatter: (v) => String(v ?? '') }, + { key: 'metadata.estimatedConfirmationTime', header: 'Est. Confirmation (s)', formatter: (v) => String(v ?? 
'') }, + ); + } + + // Volatility columns + if (allMetrics || metrics.includes(AnalyticsMetric.VOLATILITY_INDEX)) { + cols.push( + { key: 'metadata.stdDev', header: 'Std Deviation', formatter: (v) => String(v ?? '') }, + { key: 'metadata.percentileRank', header: 'Percentile Rank', formatter: (v) => String(v ?? '') }, + { key: 'metadata.windowMinutes', header: 'Window (minutes)', formatter: (v) => String(v ?? '') }, + { key: 'metadata.trend', header: 'Trend', formatter: (v) => String(v ?? '') }, + ); + } + + return cols; + } + + // ─── Helpers ───────────────────────────────────────────────────────────────── + + private mapDtoToOptions(dto: ExportAnalyticsDto): ExportOptions { + return { + metrics: dto.metrics, + startDate: new Date(dto.startDate), + endDate: new Date(dto.endDate), + networkId: dto.networkId, + userId: dto.userId, + includeMetadata: dto.includeMetadata ?? true, + delimiter: dto.delimiter ?? ',', + dateFormat: dto.dateFormat ?? 'iso', + timezone: dto.timezone ?? 'UTC', + limit: dto.limit, + }; + } + + private buildFilename(options: ExportOptions): string { + const metrics = options.metrics.join('-'); + const from = options.startDate.toISOString().slice(0, 10); + const to = options.endDate.toISOString().slice(0, 10); + return `analytics_${metrics}_${from}_to_${to}.csv`; + } + + private mapEntityToStatusDto(entity: ExportJobEntity): ExportJobStatusDto { + return { + jobId: entity.id, + status: entity.status, + rowCount: entity.rowCount, + fileSizeBytes: entity.fileSizeBytes, + downloadUrl: entity.downloadUrl, + errorMessage: entity.errorMessage, + createdAt: entity.createdAt.toISOString(), + completedAt: entity.completedAt?.toISOString() ?? 
null, + }; + } +} diff --git a/apps/api/src/Analytics-Export/analytics-metric.enum.ts b/apps/api/src/Analytics-Export/analytics-metric.enum.ts new file mode 100644 index 0000000..7f3b08d --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-metric.enum.ts @@ -0,0 +1,10 @@ +export enum AnalyticsMetric { + GAS_PRICE = 'gas_price', + ALERT_TRIGGERED = 'alert_triggered', + FEE_RECOMMENDATION = 'fee_recommendation', + VOLATILITY_INDEX = 'volatility_index', + TRANSACTION_COUNT = 'transaction_count', + USER_ACTIVITY = 'user_activity', + SPIKE_EVENT = 'spike_event', + ALL = 'all', +} diff --git a/apps/api/src/Analytics-Export/analytics-record.interface.ts b/apps/api/src/Analytics-Export/analytics-record.interface.ts new file mode 100644 index 0000000..6835648 --- /dev/null +++ b/apps/api/src/Analytics-Export/analytics-record.interface.ts @@ -0,0 +1,55 @@ +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; + +export interface AnalyticsRecord { + id: string; + metric: AnalyticsMetric; + value: number; + unit: string; + metadata: Record; + networkId?: string; + userId?: string; + timestamp: Date; + createdAt: Date; +} + +export interface GasPriceRecord extends AnalyticsRecord { + metric: AnalyticsMetric.GAS_PRICE; + metadata: { + baseFee: number; + priorityFee: number; + gasLimit: number; + blockNumber: number; + networkName: string; + }; +} + +export interface AlertRecord extends AnalyticsRecord { + metric: AnalyticsMetric.ALERT_TRIGGERED; + metadata: { + alertId: string; + alertName: string; + thresholdValue: number; + actualValue: number; + severity: 'low' | 'medium' | 'high' | 'critical'; + }; +} + +export interface FeeRecommendationRecord extends AnalyticsRecord { + metric: AnalyticsMetric.FEE_RECOMMENDATION; + metadata: { + recommendedFee: number; + confidence: number; + strategy: 'economy' | 'standard' | 'fast'; + estimatedConfirmationTime: number; + }; +} + +export interface VolatilityRecord extends AnalyticsRecord { + metric: 
AnalyticsMetric.VOLATILITY_INDEX; + metadata: { + stdDev: number; + percentileRank: number; + windowMinutes: number; + trend: 'rising' | 'falling' | 'stable'; + }; +} diff --git a/apps/api/src/Analytics-Export/csv-builder.util.ts b/apps/api/src/Analytics-Export/csv-builder.util.ts new file mode 100644 index 0000000..abb8fe4 --- /dev/null +++ b/apps/api/src/Analytics-Export/csv-builder.util.ts @@ -0,0 +1,145 @@ +import { Injectable } from '@nestjs/common'; +import { CsvBuildOptions, CsvColumn } from '../interfaces/export-options.interface'; +import { AnalyticsRecord } from '../interfaces/analytics-record.interface'; + +@Injectable() +export class CsvBuilderUtil { + private readonly DEFAULT_NULL = ''; + + /** + * Build a complete CSV string from an array of records. + */ + build( + records: AnalyticsRecord[], + options: CsvBuildOptions, + ): string { + const lines: string[] = []; + + if (options.includeHeader) { + lines.push(this.buildHeader(options.columns, options.delimiter)); + } + + for (const record of records) { + lines.push(this.buildRow(record, options)); + } + + return lines.join('\n'); + } + + /** + * Stream records as CSV lines (generator for memory-efficient large exports). + */ + *stream( + records: AnalyticsRecord[], + options: CsvBuildOptions, + ): Generator { + if (options.includeHeader) { + yield this.buildHeader(options.columns, options.delimiter) + '\n'; + } + + for (const record of records) { + yield this.buildRow(record, options) + '\n'; + } + } + + /** + * Build the header row. + */ + buildHeader(columns: CsvColumn[], delimiter: string): string { + return columns.map((col) => this.escapeField(col.header, delimiter)).join(delimiter); + } + + /** + * Build a single data row. 
+ */ + buildRow(record: AnalyticsRecord, options: CsvBuildOptions): string { + const { columns, delimiter, nullPlaceholder } = options; + + return columns + .map((col) => { + const rawValue = this.getNestedValue(record, col.key); + let formatted: string; + + if (rawValue === null || rawValue === undefined) { + formatted = nullPlaceholder ?? this.DEFAULT_NULL; + } else if (col.formatter) { + formatted = col.formatter(rawValue, record); + } else { + formatted = this.defaultFormat(rawValue); + } + + return this.escapeField(formatted, delimiter); + }) + .join(delimiter); + } + + /** + * Escape a CSV field value (wrap in quotes if it contains delimiter, quotes, or newlines). + */ + escapeField(value: string, delimiter: string): string { + const needsQuoting = + value.includes(delimiter) || + value.includes('"') || + value.includes('\n') || + value.includes('\r'); + + if (needsQuoting) { + return `"${value.replace(/"/g, '""')}"`; + } + return value; + } + + /** + * Access deeply nested object properties via dot-notation key. + * e.g. 'metadata.baseFee' + */ + getNestedValue(obj: Record, key: string): unknown { + return key.split('.').reduce((acc, part) => { + if (acc !== null && acc !== undefined && typeof acc === 'object') { + return (acc as Record)[part]; + } + return undefined; + }, obj as unknown); + } + + /** + * Default formatter: converts values to strings. + */ + private defaultFormat(value: unknown): string { + if (value instanceof Date) { + return value.toISOString(); + } + if (typeof value === 'object') { + return JSON.stringify(value); + } + return String(value); + } + + /** + * Calculate estimated file size in bytes for a CSV string. + */ + estimateSize(csv: string): number { + return Buffer.byteLength(csv, 'utf8'); + } + + /** + * Format a date according to the requested format. + */ + formatDate( + date: Date | string, + format: 'iso' | 'unix' | 'locale', + timezone = 'UTC', + ): string { + const d = typeof date === 'string' ? 
new Date(date) : date; + + switch (format) { + case 'unix': + return String(Math.floor(d.getTime() / 1000)); + case 'locale': + return d.toLocaleString('en-US', { timeZone: timezone }); + case 'iso': + default: + return d.toISOString(); + } + } +} diff --git a/apps/api/src/Analytics-Export/export-analytics.dto.ts b/apps/api/src/Analytics-Export/export-analytics.dto.ts new file mode 100644 index 0000000..0c3838c --- /dev/null +++ b/apps/api/src/Analytics-Export/export-analytics.dto.ts @@ -0,0 +1,105 @@ +import { Transform, Type } from 'class-transformer'; +import { + ArrayMinSize, + IsArray, + IsBoolean, + IsEnum, + IsIn, + IsISO8601, + IsInt, + IsOptional, + IsString, + IsUUID, + Max, + Min, + Validate, + ValidatorConstraint, + ValidatorConstraintInterface, +} from 'class-validator'; +import { AnalyticsMetric } from '../enums/analytics-metric.enum'; + +@ValidatorConstraint({ name: 'DateRangeValid', async: false }) +class DateRangeConstraint implements ValidatorConstraintInterface { + validate(_value: unknown, args: { object: ExportAnalyticsDto }): boolean { + const { startDate, endDate } = args.object; + if (!startDate || !endDate) return true; + return new Date(startDate) < new Date(endDate); + } + + defaultMessage(): string { + return 'startDate must be before endDate'; + } +} + +@ValidatorConstraint({ name: 'MaxRangeValid', async: false }) +class MaxRangeConstraint implements ValidatorConstraintInterface { + private readonly MAX_DAYS = 365; + + validate(_value: unknown, args: { object: ExportAnalyticsDto }): boolean { + const { startDate, endDate } = args.object; + if (!startDate || !endDate) return true; + const diffMs = + new Date(endDate).getTime() - new Date(startDate).getTime(); + const diffDays = diffMs / (1000 * 60 * 60 * 24); + return diffDays <= this.MAX_DAYS; + } + + defaultMessage(): string { + return 'Date range cannot exceed 365 days'; + } +} + +export class ExportAnalyticsDto { + @IsArray() + @ArrayMinSize(1) + @IsEnum(AnalyticsMetric, { each: true 
})
  @Transform(({ value }) =>
    Array.isArray(value) ? value : [value],
  )
  // A single ?metrics=... query value arrives as a scalar; the transform
  // normalizes it into a one-element array before validation.
  metrics: AnalyticsMetric[];

  // ISO-8601 timestamp. The two custom constraints (declared here but reading
  // both fields) enforce startDate < endDate and the 365-day maximum window.
  @IsISO8601()
  @Validate(DateRangeConstraint)
  @Validate(MaxRangeConstraint)
  startDate: string;

  @IsISO8601()
  endDate: string;

  @IsOptional()
  @IsUUID()
  networkId?: string;

  @IsOptional()
  @IsUUID()
  userId?: string;

  // Query-string booleans arrive as strings; coerce 'true'/true to true.
  @IsOptional()
  @IsBoolean()
  @Transform(({ value }) => value === 'true' || value === true)
  includeMetadata?: boolean = true;

  @IsOptional()
  @IsIn([',', ';', '\t'])
  delimiter?: ',' | ';' | '\t' = ',';

  @IsOptional()
  @IsIn(['iso', 'unix', 'locale'])
  dateFormat?: 'iso' | 'unix' | 'locale' = 'iso';

  // Timezone name passed through to date formatting; only consulted when
  // dateFormat === 'locale'.
  @IsOptional()
  @IsString()
  timezone?: string = 'UTC';

  @IsOptional()
  @IsInt()
  @Min(1)
  @Max(1_000_000)
  @Type(() => Number)
  limit?: number;

  // When true the export is queued as a background job instead of streamed.
  @IsOptional()
  @IsBoolean()
  @Transform(({ value }) => value === 'true' || value === true)
  async?: boolean = false;
}
diff --git a/apps/api/src/Analytics-Export/export-job.entity.ts b/apps/api/src/Analytics-Export/export-job.entity.ts
new file mode 100644
index 0000000..83df6bc
--- /dev/null
+++ b/apps/api/src/Analytics-Export/export-job.entity.ts
@@ -0,0 +1,52 @@
import {
  Column,
  CreateDateColumn,
  Entity,
  Index,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from 'typeorm';
import { ExportStatus } from '../enums/export-status.enum';
import { ExportOptions } from '../interfaces/export-options.interface';

// Persisted record of one export request and its lifecycle.
@Entity('analytics_export_jobs')
@Index(['userId', 'createdAt'])
@Index(['status'])
export class ExportJobEntity {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ type: 'uuid' })
  @Index()
  userId: string;

  @Column({ type: 'enum', enum: ExportStatus, default: ExportStatus.PENDING })
  status: ExportStatus;

  // Original export options, stored verbatim so a worker can re-derive the query.
  @Column({ type: 'jsonb' })
  options: ExportOptions;

  // Populated on completion; null while pending/processing.
  @Column({ type: 'int', nullable: true })
  rowCount: number | null;

  // NOTE(review): 'bigint' columns are typically returned as strings by the
  // driver — confirm callers tolerate that, or add a column transformer.
  @Column({ type: 'bigint', nullable: true })
  fileSizeBytes: number | null;

  @Column({ type: 'text', nullable: true })
  downloadUrl: string | null;

  @Column({ type: 'text', nullable: true })
  errorMessage: string | null;

  // Queue job id, kept so DB rows can be cross-referenced with queue state.
  @Column({ type: 'text', nullable: true })
  bullJobId: string | null;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  // Set when the job reaches COMPLETED or FAILED.
  @Column({ type: 'timestamptz', nullable: true })
  completedAt: Date | null;
}
diff --git a/apps/api/src/Analytics-Export/export-options.interface.ts b/apps/api/src/Analytics-Export/export-options.interface.ts
new file mode 100644
index 0000000..ed4581c
--- /dev/null
+++ b/apps/api/src/Analytics-Export/export-options.interface.ts
@@ -0,0 +1,42 @@
import { AnalyticsMetric } from '../enums/analytics-metric.enum';

// Normalized export parameters (the DTO after defaulting and Date parsing).
export interface ExportOptions {
  metrics: AnalyticsMetric[];
  startDate: Date;
  endDate: Date;
  networkId?: string;
  userId?: string;
  includeMetadata: boolean;
  delimiter: ',' | ';' | '\t';
  dateFormat: 'iso' | 'unix' | 'locale';
  timezone: string;
  limit?: number;
}

// One CSV column: dot-notation key into the record, header label, and an
// optional per-value formatter.
export interface CsvColumn {
  key: string;
  header: string;
  formatter?: (value: unknown, record?: unknown) => string;
}

export interface CsvBuildOptions {
  columns: CsvColumn[];
  delimiter: string;
  includeHeader: boolean;
  nullPlaceholder: string;
}

// Payload placed on the queue for background exports.
export interface ExportJobPayload {
  jobId: string;
  userId: string;
  options: ExportOptions;
  requestedAt: string;
}

export interface ExportJobResult {
  jobId: string;
  rowCount: number;
  fileSizeBytes: number;
  completedAt: string;
  downloadUrl?: string;
}
diff --git a/apps/api/src/Analytics-Export/export-response.dto.ts b/apps/api/src/Analytics-Export/export-response.dto.ts
new file mode 100644
index 0000000..1bc0d6a
--- /dev/null
+++ b/apps/api/src/Analytics-Export/export-response.dto.ts
@@ -0,0 +1,20 @@
import { ExportStatus } from '../enums/export-status.enum';

// Returned with 202 ACCEPTED when an async export is queued.
export class ExportJobResponseDto {
  jobId: string;
  status: ExportStatus;
  message: string;
  statusUrl: string;
  createdAt: string;
}

// Polling-endpoint view of a job's current state.
export class ExportJobStatusDto {
  jobId: string;
  status: ExportStatus;
  rowCount: number | null;
  fileSizeBytes: number | null;
  downloadUrl: string | null;
  errorMessage: string | null;
  createdAt: string;
  completedAt: string | null;
}
diff --git a/apps/api/src/Analytics-Export/export-status.enum.ts b/apps/api/src/Analytics-Export/export-status.enum.ts
new file mode 100644
index 0000000..a883943
--- /dev/null
+++ b/apps/api/src/Analytics-Export/export-status.enum.ts
@@ -0,0 +1,6 @@
export enum ExportStatus {
  PENDING = 'pending',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
}