Skip to content

Commit 9f4179a

Browse files
authored
Merge branch 'main' into feat/wallet-ux
2 parents aaa6306 + 182a5ff commit 9f4179a

File tree

12 files changed

+1449
-57
lines changed

12 files changed

+1449
-57
lines changed

backend/src/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import { stellarRouter } from './routes/stellar.js';
1010
import { catalogRouter } from './routes/catalog.js';
1111
import { jobsRouter } from './routes/jobs.js';
1212
import { healthRouter } from './routes/health.js';
13+
import { queueRouter } from './routes/queue.js';
1314
import { slaRouter } from './routes/sla.js';
1415
import { startJobs, getJobScheduler } from './jobs/index.js';
1516
import { errorHandler, notFoundHandler, AppError } from './middleware/errorHandler.js';
17+
import { messageQueue } from './services/queue.js';
18+
import { registerDefaultProcessors } from './services/queue-producers.js';
1619
import { slaTrackingMiddleware } from './middleware/slaTracking.js';
1720

1821
dotenv.config();
@@ -183,6 +186,7 @@ apiV1Router.use('/invoice', invoiceLimiter, invoiceRouter);
183186
apiV1Router.use('/stellar', stellarRouter);
184187
apiV1Router.use('/catalog', catalogRouter);
185188
apiV1Router.use('/jobs', jobsRouter);
189+
apiV1Router.use('/queue', queueRouter);
186190
apiV1Router.use('/sla', slaRouter);
187191

188192
// Explicit URL-based mounting
@@ -209,6 +213,13 @@ if (jobsEnabled) {
209213
startJobs();
210214
}
211215

216+
// Initialize message queue
217+
registerDefaultProcessors();
218+
const queueEnabled = process.env.QUEUE_ENABLED !== 'false';
219+
if (queueEnabled) {
220+
messageQueue.start();
221+
}
222+
212223
const server = app.listen(PORT, () => {
213224
console.log(`AgenticPay backend running on port ${PORT}`);
214225
});
@@ -230,6 +241,13 @@ const shutdown = (signal: string) => {
230241
console.error('Error stopping scheduler:', err);
231242
}
232243

244+
try {
245+
messageQueue.stop();
246+
console.log('Message queue stopped.');
247+
} catch (err) {
248+
console.error('Error stopping message queue:', err);
249+
}
250+
233251
console.log('Graceful shutdown complete. Exiting.');
234252
process.exit(0);
235253
});

backend/src/routes/queue.ts

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/**
2+
* Queue Routes
3+
* API endpoints for managing message queue and monitoring jobs
4+
*/
5+
6+
import { Router } from 'express';
7+
import { messageQueue, JobStatus } from '../services/queue.js';
8+
import {
9+
queueEmail,
10+
queueNotification,
11+
queueWebhook,
12+
EmailJobData,
13+
NotificationJobData,
14+
WebhookJobData,
15+
} from '../services/queue-producers.js';
16+
import { asyncHandler, AppError } from '../middleware/errorHandler.js';
17+
18+
export const queueRouter = Router();
19+
20+
const allowedStatuses: JobStatus[] = ['pending', 'processing', 'completed', 'failed', 'retrying'];
21+
22+
function getParamAsString(param: string | string[]): string {
23+
return Array.isArray(param) ? param[0] : param;
24+
}
25+
26+
/**
27+
* POST /api/v1/queue/email
28+
* Queue an email to be sent asynchronously
29+
*/
30+
queueRouter.post(
31+
'/email',
32+
asyncHandler(async (req, res) => {
33+
const { to, subject, body, html, from } = req.body as EmailJobData;
34+
35+
if (!to || !subject || !body) {
36+
throw new AppError(400, 'Missing required fields: to, subject, body', 'INVALID_REQUEST');
37+
}
38+
39+
const job = await queueEmail({ to, subject, body, html, from });
40+
41+
res.status(202).json({
42+
message: 'Email queued for delivery',
43+
jobId: job.id,
44+
status: job.status,
45+
queue: job.queue,
46+
});
47+
})
48+
);
49+
50+
/**
51+
* POST /api/v1/queue/notification
52+
* Queue a notification to be delivered asynchronously
53+
*/
54+
queueRouter.post(
55+
'/notification',
56+
asyncHandler(async (req, res) => {
57+
const { userId, type, title, message, metadata } = req.body as NotificationJobData;
58+
59+
if (!userId || !type || !title || !message) {
60+
throw new AppError(
61+
400,
62+
'Missing required fields: userId, type, title, message',
63+
'INVALID_REQUEST'
64+
);
65+
}
66+
67+
const job = await queueNotification({ userId, type, title, message, metadata });
68+
69+
res.status(202).json({
70+
message: 'Notification queued for delivery',
71+
jobId: job.id,
72+
status: job.status,
73+
queue: job.queue,
74+
});
75+
})
76+
);
77+
78+
/**
79+
* POST /api/v1/queue/webhook
80+
* Queue a webhook call to be delivered asynchronously
81+
*/
82+
queueRouter.post(
83+
'/webhook',
84+
asyncHandler(async (req, res) => {
85+
const { url, method = 'POST', headers, body, timeout } = req.body as WebhookJobData;
86+
87+
if (!url) {
88+
throw new AppError(400, 'Missing required field: url', 'INVALID_REQUEST');
89+
}
90+
91+
const job = await queueWebhook({ url, method, headers, body, timeout });
92+
93+
res.status(202).json({
94+
message: 'Webhook queued for delivery',
95+
jobId: job.id,
96+
status: job.status,
97+
queue: job.queue,
98+
});
99+
})
100+
);
101+
102+
/**
103+
* GET /api/v1/queue/jobs
104+
* Get all queued jobs or filter by queue/status
105+
*/
106+
queueRouter.get(
107+
'/jobs',
108+
asyncHandler(async (req, res) => {
109+
const queue = req.query.queue as string | undefined;
110+
const status = req.query.status as string | undefined;
111+
112+
let jobs = messageQueue.getAllJobs();
113+
114+
if (queue) {
115+
jobs = messageQueue.getJobsByQueue(queue);
116+
} else if (status) {
117+
if (!allowedStatuses.includes(status as JobStatus)) {
118+
throw new AppError(400, `Invalid status: ${status}`, 'INVALID_REQUEST');
119+
}
120+
jobs = messageQueue.getJobsByStatus(status as JobStatus);
121+
}
122+
123+
res.json({
124+
data: jobs,
125+
count: jobs.length,
126+
timestamp: new Date(),
127+
});
128+
})
129+
);
130+
131+
/**
132+
* GET /api/v1/queue/jobs/:jobId
133+
* Get a specific job by ID
134+
*/
135+
queueRouter.get(
136+
'/jobs/:jobId',
137+
asyncHandler(async (req, res) => {
138+
const jobId = getParamAsString(req.params.jobId);
139+
const job = messageQueue.getJob(jobId);
140+
141+
if (!job) {
142+
throw new AppError(404, `Job not found: ${jobId}`, 'JOB_NOT_FOUND');
143+
}
144+
145+
res.json({
146+
data: job,
147+
timestamp: new Date(),
148+
});
149+
})
150+
);
151+
152+
/**
153+
* POST /api/v1/queue/jobs/:jobId/retry
154+
* Retry a failed job
155+
*/
156+
queueRouter.post(
157+
'/jobs/:jobId/retry',
158+
asyncHandler(async (req, res) => {
159+
const jobId = getParamAsString(req.params.jobId);
160+
const retried = messageQueue.retryJob(jobId);
161+
162+
if (!retried) {
163+
throw new AppError(400, 'Job cannot be retried', 'INVALID_STATE');
164+
}
165+
166+
res.json({
167+
message: 'Job scheduled for retry',
168+
jobId,
169+
timestamp: new Date(),
170+
});
171+
})
172+
);
173+
174+
/**
175+
* DELETE /api/v1/queue/jobs/:jobId
176+
* Delete a job
177+
*/
178+
queueRouter.delete(
179+
'/jobs/:jobId',
180+
asyncHandler(async (req, res) => {
181+
const jobId = getParamAsString(req.params.jobId);
182+
const deleted = messageQueue.deleteJob(jobId);
183+
184+
if (!deleted) {
185+
throw new AppError(404, `Job not found: ${jobId}`, 'JOB_NOT_FOUND');
186+
}
187+
188+
res.json({
189+
message: 'Job deleted',
190+
jobId,
191+
timestamp: new Date(),
192+
});
193+
})
194+
);
195+
196+
/**
197+
* GET /api/v1/queue/stats
198+
* Get queue statistics
199+
*/
200+
queueRouter.get(
201+
'/stats',
202+
asyncHandler(async (req, res) => {
203+
const stats = messageQueue.getStats();
204+
205+
res.json({
206+
data: stats,
207+
timestamp: new Date(),
208+
});
209+
})
210+
);
211+
212+
/**
213+
* DELETE /api/v1/queue/clear
214+
* Clear jobs by status
215+
*/
216+
queueRouter.delete(
217+
'/clear',
218+
asyncHandler(async (req, res) => {
219+
const status = req.query.status as string | undefined;
220+
221+
if (!status) {
222+
throw new AppError(400, 'Status query parameter is required', 'INVALID_REQUEST');
223+
}
224+
225+
if (!allowedStatuses.includes(status as JobStatus)) {
226+
throw new AppError(400, `Invalid status: ${status}`, 'INVALID_REQUEST');
227+
}
228+
229+
const cleared = messageQueue.clearByStatus(status as JobStatus);
230+
231+
res.json({
232+
message: `Cleared ${cleared} jobs with status: ${status}`,
233+
cleared,
234+
timestamp: new Date(),
235+
});
236+
})
237+
);

0 commit comments

Comments
 (0)