Skip to content

Commit 1bb07ff

Browse files
committed
arch: implement real-time trade broadcasting via WebSockets and EventEmitter (#159)
1 parent 18dfa9c commit 1bb07ff

File tree

5 files changed

+197
-17
lines changed

5 files changed

+197
-17
lines changed

package-lock.json

Lines changed: 121 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"prisma:push": "prisma db push",
2828
"prisma:migrate": "prisma migrate dev",
2929
"prisma:studio": "prisma studio",
30-
"start:indexer": "node services/eventIndexer.js"
30+
"start:indexer": "node services/eventIndexer.js",
31+
"start:ws": "node server.js"
3132
},
3233
"dependencies": {
3334
"@nestjs/common": "^10.4.22",
@@ -48,7 +49,8 @@
4849
"reflect-metadata": "^0.2.2",
4950
"rxjs": "^7.8.2",
5051
"swagger-ui-express": "^5.0.0",
51-
"typeorm": "^0.3.28"
52+
"typeorm": "^0.3.28",
53+
"ws": "^8.20.0"
5254
},
5355
"devDependencies": {
5456
"@nestjs/cli": "^11.0.16",

server.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
const http = require('http');
2+
const WebSocket = require('ws');
3+
const eventEmitter = require('./services/eventEmitter');
4+
5+
/**
6+
* TradeFlow WebSocket Server
7+
* Broadcasts real-time Soroban ledger updates to connected clients
8+
*/
9+
const server = http.createServer();
10+
const wss = new WebSocket.Server({ server });
11+
12+
// Track connected clients
13+
let clients = 0;
14+
15+
wss.on('connection', (ws) => {
16+
clients++;
17+
console.log(`[WS] New client connected. Active: ${clients}`);
18+
19+
// Welcome message
20+
ws.send(JSON.stringify({
21+
type: 'SUBSCRIPTION_SUCCESS',
22+
message: 'Connected to TradeFlow Real-time Data Pipeline'
23+
}));
24+
25+
ws.on('close', () => {
26+
clients--;
27+
console.log(`[WS] Client disconnected. Active: ${clients}`);
28+
});
29+
});
30+
31+
/**
32+
* Handle new trade events from the Soroban Indexer
33+
*/
34+
eventEmitter.on('newTrade', (trade) => {
35+
console.log(`[WS] Incoming trade from Indexer. Broadcasting to ${wss.clients.size} clients...`);
36+
37+
const payload = JSON.stringify({
38+
topic: 'TRADE_UPDATE',
39+
data: trade,
40+
timestamp: new Date().toISOString()
41+
});
42+
43+
wss.clients.forEach((client) => {
44+
if (client.readyState === WebSocket.OPEN) {
45+
client.send(payload);
46+
}
47+
});
48+
});
49+
50+
const WS_PORT = process.env.WS_PORT || 8080;
51+
52+
server.listen(WS_PORT, () => {
53+
console.log('==============================================');
54+
console.log(`🚀 TradeFlow WebSocket Server active on port ${WS_PORT}`);
55+
console.log('==============================================');
56+
});

services/eventEmitter.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
const EventEmitter = require('events');
2+
3+
/**
4+
* Global Event Emitter for internal service communication
5+
* This allows the eventIndexer to communicate with the WebSocket server
6+
*/
7+
const tradeFlowEvents = new EventEmitter();
8+
9+
module.exports = tradeFlowEvents;

services/eventIndexer.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const { rpc, xdr, scValToNative } = require('@stellar/stellar-sdk');
22
const { PrismaClient } = require('@prisma/client');
3+
const eventEmitter = require('./eventEmitter');
34

45
// Initialize Prisma Client
56
const prisma = new PrismaClient();
@@ -164,6 +165,12 @@ async function saveTradeToDb(tradeData) {
164165
});
165166

166167
console.log(`✅ Trade indexed successfully in DB: ${tradeData.userAddress.substring(0, 8)}...`);
168+
169+
// Emit trade event for WebSocket broadcasting
170+
eventEmitter.emit('newTrade', {
171+
...tradeData,
172+
poolAddress: POOL_ADDRESS
173+
});
167174
} catch (error) {
168175
console.error('Database insertion error:', error);
169176
}

0 commit comments

Comments
 (0)