Skip to content

Commit 4f2a7b9

Browse files
Merge pull request #210 from Stephan-Thomas/feat/data-export-gdpr-165
feat(users): add data export flow, entity, service, processor wiring …
2 parents ec080fe + 934e88b commit 4f2a7b9

File tree

9 files changed

+190
-7
lines changed

9 files changed

+190
-7
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export default () => ({
2+
export: {
3+
storageBaseUrl: process.env.EXPORT_STORAGE_BASE_URL || 'https://storage.example.com/exports',
4+
allowedFormats: ['json', 'csv'],
5+
downloadExpirySeconds: parseInt(process.env.EXPORT_DOWNLOAD_EXPIRY || '86400', 10),
6+
},
7+
});

BackEnd/src/modules/jobs/job.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export interface DataExportPayload extends JobPayload {
116116
exportType: 'users' | 'payouts' | 'quests' | 'analytics';
117117
format: 'csv' | 'json' | 'xlsx';
118118
userId: string; // Who requested the export
119+
exportId?: string; // optional DB record id to correlate
119120
}
120121

121122
export interface ReportGeneratePayload extends JobPayload {

BackEnd/src/modules/jobs/jobs.module.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import { WebhookProcessor } from './processors/webhook.processor';
1212
import { AnalyticsProcessor } from './processors/analytics.processor';
1313
import { QuestProcessor } from './processors/quest.processor';
1414
import { JobLog, JobLogRetry, JobDependency, JobSchedule } from './entities/job-log.entity';
15+
import { DataExport } from '../users/entities/data-export.entity';
1516

1617
@Module({
1718
imports: [
18-
TypeOrmModule.forFeature([JobLog, JobLogRetry, JobDependency, JobSchedule]),
19+
TypeOrmModule.forFeature([JobLog, JobLogRetry, JobDependency, JobSchedule, DataExport]),
1920
],
2021
providers: [
2122
JobsService,

BackEnd/src/modules/jobs/jobs.service.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
22
import { Queue, Worker, Job } from 'bullmq';
33
import { QUEUES, DEFAULT_JOB_OPTIONS } from './jobs.constants';
4+
import { DataExportProcessor } from './processors/export.processor';
45

56
const redisConnection = () => {
67
const url = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
@@ -14,6 +15,8 @@ export class JobsService implements OnModuleInit, OnModuleDestroy {
1415
private workers: Worker[] = [];
1516
private emailProcessor: ((messageId: string, dto: any) => Promise<void>) | null = null;
1617

18+
constructor(private readonly dataExportProcessor?: DataExportProcessor) {}
19+
1720
registerEmailProcessor(processor: (messageId: string, dto: any) => Promise<void>) {
1821
this.emailProcessor = processor;
1922
}
@@ -25,12 +28,17 @@ export class JobsService implements OnModuleInit, OnModuleDestroy {
2528
this.queues[QUEUES.SCHEDULED] = new Queue(QUEUES.SCHEDULED, redisConnection() as any);
2629
this.queues[QUEUES.DEAD_LETTER] = new Queue(QUEUES.DEAD_LETTER, redisConnection() as any);
2730
this.queues[QUEUES.EMAIL] = new Queue(QUEUES.EMAIL, redisConnection() as any);
31+
this.queues[QUEUES.EXPORTS] = new Queue(QUEUES.EXPORTS, redisConnection() as any);
2832

2933
this.createWorker(QUEUES.NOTIFICATIONS, this.handleNotification.bind(this));
3034
this.createWorker(QUEUES.ANALYTICS, this.handleAnalytics.bind(this));
3135
this.createWorker(QUEUES.CLEANUP, this.handleCleanup.bind(this));
3236
this.createWorker(QUEUES.SCHEDULED, this.handleScheduled.bind(this));
3337
this.createWorker(QUEUES.EMAIL, this.handleEmail.bind(this));
38+
// register exports worker if processor available
39+
if (this.dataExportProcessor && typeof this.dataExportProcessor.processExport === 'function') {
40+
this.createWorker(QUEUES.EXPORTS, this.dataExportProcessor.processExport.bind(this.dataExportProcessor));
41+
}
3442
}
3543

3644
async onModuleDestroy() {

BackEnd/src/modules/jobs/processors/export.processor.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import { Job } from 'bullmq';
3+
import { InjectRepository } from '@nestjs/typeorm';
4+
import { Repository } from 'typeorm';
35
import { DataExportPayload, ReportGeneratePayload, JobResult } from '../job.types';
46
import { JobLogService } from './job-log.service';
7+
import { DataExport, DataExportStatus } from '../../users/entities/data-export.entity';
58

69
/**
710
* Data Export Processor
@@ -11,7 +14,11 @@ import { JobLogService } from './job-log.service';
1114
export class DataExportProcessor {
1215
private readonly logger = new Logger(DataExportProcessor.name);
1316

14-
constructor(private readonly jobLogService: JobLogService) {}
17+
constructor(
18+
private readonly jobLogService: JobLogService,
19+
@InjectRepository(DataExport)
20+
private readonly dataExportRepo: Repository<DataExport>,
21+
) {}
1522

1623
/**
1724
* Process data export job
@@ -52,7 +59,7 @@ export class DataExportProcessor {
5259

5360
// Simulate file generation
5461
const fileName = `export-${exportType}-${Date.now()}.${this.getFileExtension(format)}`;
55-
const downloadUrl = `https://storage.example.com/exports/${fileName}`;
62+
const downloadUrl = `${process.env.EXPORT_STORAGE_BASE_URL || 'https://storage.example.com/exports'}/${fileName}`;
5663

5764
await job.updateProgress(75);
5865

@@ -75,6 +82,21 @@ export class DataExportProcessor {
7582
duration: Date.now() - job.timestamp,
7683
};
7784

85+
// Update DB record if present
86+
if (job.data && (job.data as any).exportId) {
87+
try {
88+
await this.dataExportRepo.update((job.data as any).exportId, {
89+
status: DataExportStatus.COMPLETED,
90+
fileName,
91+
downloadUrl,
92+
recordCount: result.data.recordCount,
93+
exportedAt: result.data.exportedAt,
94+
} as any);
95+
} catch (err) {
96+
this.logger.error('Failed to update data export record', err?.stack || err);
97+
}
98+
}
99+
78100
this.logger.log(`Data export completed: ${fileName}`);
79101
return result;
80102
} catch (error) {
@@ -83,6 +105,17 @@ export class DataExportProcessor {
83105
error.stack,
84106
);
85107

108+
// mark DB record as failed if exportId provided
109+
if ((job.data as any)?.exportId) {
110+
try {
111+
await this.dataExportRepo.update((job.data as any).exportId, {
112+
status: DataExportStatus.FAILED,
113+
} as any);
114+
} catch (err) {
115+
this.logger.error('Failed to mark export record failed', err?.stack || err);
116+
}
117+
}
118+
86119
throw error;
87120
}
88121
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { InjectRepository } from '@nestjs/typeorm';
3+
import { Repository } from 'typeorm';
4+
import { DataExport, DataExportStatus } from './entities/data-export.entity';
5+
import { JobsService } from '../jobs/jobs.service';
6+
import { QUEUES } from '../jobs/jobs.constants';
7+
8+
@Injectable()
9+
export class DataExportService {
10+
private readonly logger = new Logger(DataExportService.name);
11+
12+
constructor(
13+
@InjectRepository(DataExport)
14+
private readonly dataExportRepo: Repository<DataExport>,
15+
private readonly jobsService: JobsService,
16+
) {}
17+
18+
async requestExport(userId: string, exportType: string, format: string) {
19+
const exportRecord = this.dataExportRepo.create({
20+
userId,
21+
exportType,
22+
format,
23+
status: DataExportStatus.PENDING,
24+
});
25+
26+
const saved = await this.dataExportRepo.save(exportRecord);
27+
28+
try {
29+
await this.jobsService.addJob(QUEUES.EXPORTS, {
30+
organizationId: null,
31+
exportType,
32+
format,
33+
userId,
34+
exportId: saved.id,
35+
});
36+
this.logger.log(`Queued export job for user ${userId} id=${saved.id}`);
37+
} catch (err) {
38+
this.logger.error('Failed to enqueue export job', err?.stack || err);
39+
saved.status = DataExportStatus.FAILED;
40+
await this.dataExportRepo.save(saved);
41+
}
42+
43+
return saved;
44+
}
45+
46+
async markProcessing(id: string) {
47+
await this.dataExportRepo.update(id, { status: DataExportStatus.PROCESSING });
48+
}
49+
50+
async markCompleted(id: string, payload: Partial<DataExport>) {
51+
await this.dataExportRepo.update(id, { status: DataExportStatus.COMPLETED, ...payload });
52+
}
53+
54+
async markFailed(id: string, error?: string) {
55+
await this.dataExportRepo.update(id, { status: DataExportStatus.FAILED });
56+
}
57+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn } from 'typeorm';
2+
3+
export enum DataExportStatus {
4+
PENDING = 'pending',
5+
PROCESSING = 'processing',
6+
COMPLETED = 'completed',
7+
FAILED = 'failed',
8+
}
9+
10+
@Entity('data_exports')
11+
export class DataExport {
12+
@PrimaryGeneratedColumn('uuid')
13+
id: string;
14+
15+
@Column({ nullable: false })
16+
userId: string;
17+
18+
@Column({ default: 'users' })
19+
exportType: string;
20+
21+
@Column({ default: 'json' })
22+
format: string;
23+
24+
@Column({ type: 'enum', enum: DataExportStatus, default: DataExportStatus.PENDING })
25+
status: DataExportStatus;
26+
27+
@Column({ nullable: true })
28+
fileName?: string;
29+
30+
@Column({ nullable: true })
31+
downloadUrl?: string;
32+
33+
@Column({ type: 'int', nullable: true })
34+
recordCount?: number;
35+
36+
@Column({ type: 'timestamptz', nullable: true })
37+
exportedAt?: Date;
38+
39+
@CreateDateColumn()
40+
createdAt: Date;
41+
42+
@UpdateDateColumn()
43+
updatedAt: Date;
44+
}

BackEnd/src/modules/users/user.controller.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
Controller,
33
Get,
4+
Post,
45
Patch,
56
Delete,
67
Param,
@@ -30,11 +31,15 @@ import { UsersService } from './user.service';
3031
import { User } from './entities/user.entity';
3132
import { Role } from '../../common/enums/role.enum';
3233
import { Roles } from '../auth/decorators/roles.decorator';
34+
import { DataExportService } from './data-export.service';
3335

3436
@ApiTags('users')
3537
@Controller('users')
3638
export class UsersController {
37-
constructor(private readonly usersService: UsersService) {}
39+
constructor(
40+
private readonly usersService: UsersService,
41+
private readonly dataExportService: DataExportService,
42+
) {}
3843

3944
@Get('search')
4045
@ApiOperation({ summary: 'Search users by username or address' })
@@ -150,4 +155,27 @@ export class UsersController {
150155
) {
151156
await this.usersService.deleteUser(address, requestingUser);
152157
}
158+
159+
@Post('export')
160+
@UseGuards(JwtAuthGuard)
161+
@ApiBearerAuth()
162+
@ApiOperation({ summary: 'Request data export (async)' })
163+
@ApiResponse({ status: 202, description: 'Export queued' })
164+
async requestExport(
165+
@CurrentUser() user: User,
166+
@Body('format') format: string = 'json',
167+
@Body('exportType') exportType: string = 'users',
168+
) {
169+
if (!user || !user.id) {
170+
throw new BadRequestException('Invalid user');
171+
}
172+
173+
const record = await this.dataExportService.requestExport(user.id, exportType, format);
174+
175+
return {
176+
id: record.id,
177+
status: record.status,
178+
message: 'Export request queued. You will be notified when ready.',
179+
};
180+
}
153181
}

BackEnd/src/modules/users/users.module.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ import { Submission } from '../submissions/entities/submission.entity';
88
import { Payout } from '../payouts/entities/payout.entity';
99
import { CacheModule } from '../cache/cache.module';
1010
import { UserExperienceListener } from './events/user-experience.listener';
11+
import { DataExport } from './entities/data-export.entity';
12+
import { DataExportService } from './data-export.service';
13+
import { JobsModule } from '../jobs/jobs.module';
1114

1215
@Module({
1316
imports: [
14-
TypeOrmModule.forFeature([User, Quest, Submission, Payout]),
17+
TypeOrmModule.forFeature([User, Quest, Submission, Payout, DataExport]),
1518
CacheModule,
19+
JobsModule,
1620
],
1721
controllers: [UsersController],
18-
providers: [UsersService, UserExperienceListener],
19-
exports: [UsersService],
22+
providers: [UsersService, UserExperienceListener, DataExportService],
23+
exports: [UsersService, DataExportService],
2024
})
2125
export class UsersModule {}

0 commit comments

Comments
 (0)