Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,14 @@ DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=postgres
DB_PASSWORD=password
DB_DATABASE=tradeflow
DB_DATABASE=tradeflow

# Soroban Event Indexer & WebSocket Configuration
SOROBAN_RPC_URL="https://soroban-testnet.stellar.org"
POOL_ADDRESS="CC..." # Replace with your Pool Contract ID
INDEXER_POLL_INTERVAL=5000 # Polling interval in ms
WS_PORT=3001 # WebSocket server port

# Security Configuration
JWT_SECRET="YOUR_SUPER_SECRET_KEY_CHANGE_ME"
ADMIN_PASSWORD="admin_secure_password_123"
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
"prisma:generate": "prisma generate",
"prisma:push": "prisma db push",
"prisma:migrate": "prisma migrate dev",
"prisma:studio": "prisma studio"
"prisma:studio": "prisma studio",
"start:indexer": "node services/eventIndexer.js",
"start:all": "node server.js"
},
"dependencies": {
"@nestjs/common": "^10.4.22",
Expand Down
83 changes: 83 additions & 0 deletions server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* server.js
*
* Main Entry Point for combined Soroban Indexer and WebSocket streaming server.
* This file orchestrates the real-time data flow from the blockchain to the frontend.
*/

const WebSocket = require('ws');
const wsEvents = require('./services/wsEvents');
const { startIndexer } = require('./services/eventIndexer');

// Config and Port Setup
const WS_PORT = process.env.WS_PORT || 3001;
const wss = new WebSocket.Server({ port: WS_PORT });

console.log('--- 🌐 TradeFlow Real-Time Stream Server ---');
console.log(`📡 WebSocket server running on ws://localhost:${WS_PORT}`);

// Connection tracking
let activeConnections = 0;

/**
* Listen for incoming WebSocket connections.
*/
wss.on('connection', (ws) => {
activeConnections++;
console.log(`✅ New Web3 client connected. Active: ${activeConnections}`);

// Initial Connection ACK
ws.send(JSON.stringify({
event: 'INDEXER_CONNECTED',
status: 'ONLINE',
timestamp: new Date().toISOString()
}));

ws.on('close', () => {
activeConnections--;
console.log(`❌ Web3 client disconnected. Active: ${activeConnections}`);
});

ws.on('error', (err) => {
console.error('⚠️ WS Socket Error:', err.message);
});
});

/**
* BROADCASTER: Listens to the internal 'newTrade' event emitter.
* Broadcasts every new blockchain event caught by the Indexer daemon
* to all connected browser clients.
*/
wsEvents.on('newTrade', (tradeData) => {
console.log(`📣 BROADCASTING: New trade found in pool ${tradeData.poolId.slice(0, 8)}...`);

const payload = JSON.stringify({
event: 'NEW_TRADE_EVENT',
data: tradeData,
receivedAt: new Date().toISOString()
});

// Iterative broadcast to all active subscribers
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(payload);
}
});
});

/**
* 🚀 START DAEMON ORCHESTRATION
* We start the Soroban Indexer in the same Node.js process to bridge
* the blockchain data with the WebSocket emitter via internal memory.
*/
startIndexer().catch((err) => {
console.error('❌ CRITICAL ERROR: Event Indexer failed to start:', err.message);
process.exit(1);
});

// Process Management
process.on('SIGTERM', () => {
console.log('🛑 Closing WebSocket connections and shutting down.');
wss.close();
process.exit(0);
});
150 changes: 150 additions & 0 deletions services/eventIndexer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* services/eventIndexer.js
*
* Background Soroban Event Listener daemon for TradeFlow-API.
* This service polls the Soroban RPC for events emitted by the specified Pool contract.
* When a 'Swap' event is detected, it parses the data and saves it to the database with Prisma.
*/

const { rpc } = require('@stellar/stellar-sdk');
const { PrismaClient } = require('@prisma/client');
const { parseScVal } = require('./scValParser');
const wsEvents = require('./wsEvents');

// In case dotenv is not installed as a top-level dependency,
// we try to load it safely. Most Node.js environments for this project should have it.
try {
require('dotenv').config();
} catch (e) {
console.warn('⚠️ dotenv not loaded. Ensure environment variables are set manually.');
}

const prisma = new PrismaClient();

// Configuration
const RPC_URL = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org';
const POOL_ADDRESS = process.env.POOL_ADDRESS;
const POLL_INTERVAL = parseInt(process.env.INDEXER_POLL_INTERVAL || '5000');

if (!POOL_ADDRESS) {
console.error('❌ POOL_ADDRESS is not defined in environment variables.');
console.error('Please add POOL_ADDRESS="YOUR_CONTRACT_ID" to your .env file.');
process.exit(1);
}

const server = new rpc.Server(RPC_URL);

/**
* Main daemon loop to poll for Soroban events.
*/
async function startIndexer() {
console.log('--- 🚀 TradeFlow Soroban Event Indexer ---');
console.log(`📡 RPC Node: ${RPC_URL}`);
console.log(`🎯 Pool Contract: ${POOL_ADDRESS}`);
console.log('-------------------------------------------');

// Start from the latest ledger initially
let currentLedgerSequence;
try {
const latestLedger = await server.getLatestLedger();
currentLedgerSequence = latestLedger.sequence;
console.log(`Initial Start Ledger: ${currentLedgerSequence}`);
} catch (err) {
console.error('❌ Failed to connect to Soroban RPC. Verify your SOROBAN_RPC_URL.');
process.exit(1);
}

// Periodic polling
setInterval(async () => {
try {
const response = await server.getEvents({
startLedger: currentLedgerSequence,
filters: [
{
type: 'contract',
contractIds: [POOL_ADDRESS],
},
],
limit: 10,
});

if (response.events && response.events.length > 0) {
console.log(`Found ${response.events.length} new event(s). Processing...`);

for (const event of response.events) {
// Process event
await handleContractEvent(event);
}

// Advance ledger checkpoint
const latestProcessed = Math.max(...response.events.map(e => parseInt(e.ledger)));
currentLedgerSequence = latestProcessed + 1;
}
} catch (error) {
console.error('⚠️ Indexer Polling Error:', error.message);
}
}, POLL_INTERVAL);
}

/**
* Handles an individual contract event.
* Filters for 'Swap' events and indexes them.
*
* @param {rpc.Api.GetEventsResponse.Event} event - The Soroban event from RPC.
*/
async function handleContractEvent(event) {
try {
// Decode topics to identify the event
const topics = event.topic.map(t => parseScVal(t));

// Check if topics contain "Swap" (case-insensitive)
const isSwapEvent = topics.some(topic =>
typeof topic === 'string' && topic.toLowerCase() === 'swap'
);

if (isSwapEvent) {
console.log(`✅ Detected SwapEvent in ledger ${event.ledger}`);

const payload = parseScVal(event.value);
if (!payload) return;

console.log('Decoded Payload:', JSON.stringify(payload, null, 2));

// Map Soroban event data to our Prisma Trade model
// Expected structure from SwapEvent: { user, amount_in, amount_out }
const tradeData = {
poolId: event.contractId,
userAddress: payload.user || payload.address || 'Unknown',
amountIn: (payload.amount_in || payload.amountIn || '0').toString(),
amountOut: (payload.amount_out || payload.amountOut || '0').toString(),
timestamp: new Date(),
};

// Save to Database via Prisma
const savedTrade = await prisma.trade.create({
data: tradeData
});

console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`);

// Trigger WebSocket broadcast
wsEvents.emit('newTrade', savedTrade);
}
} catch (error) {
console.error('❌ Failed to process event:', error.message);
}
}

// Graceful Shut-off
process.on('SIGINT', async () => {
console.log('\n--- Indexer Shutting Down ---');
await prisma.$disconnect();
process.exit(0);
});

exports.startIndexer = startIndexer;

// In standalone mode, starting the indexer automatically
if (require.main === module) {
startIndexer();
}
58 changes: 58 additions & 0 deletions services/scValParser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* services/scValParser.js
*
* Utility to decode Soroban XDR ScVal types into native JavaScript objects/JSON.
*/

const { scValToNative } = require('@stellar/stellar-sdk');

/**
* Decodes a Soroban ScVal into its native JavaScript representation.
* Handles BigInt conversions to standard strings for JSON compatibility.
*
* @param {xdr.ScVal} scVal - The Soroban value to decode.
* @returns {any} - The native JavaScript value.
*/
function parseScVal(scVal) {
try {
const native = scValToNative(scVal);
return stringifyBigInts(native);
} catch (error) {
console.error('Error decoding Soroban XDR:', error.message);
return null;
}
}

/**
* Recursively converts BigInt values to strings in an object/array.
* This is useful for Prisma and JSON serialization.
*
* @param {any} obj - The object to process.
* @returns {any} - The object with BigInts converted to strings.
*/
function stringifyBigInts(obj) {
if (typeof obj === 'bigint') {
return obj.toString();
}

if (Array.isArray(obj)) {
return obj.map(stringifyBigInts);
}

if (obj !== null && typeof obj === 'object') {
const result = {};
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
result[key] = stringifyBigInts(obj[key]);
}
}
return result;
}

return obj;
}

module.exports = {
parseScVal,
stringifyBigInts
};
4 changes: 4 additions & 0 deletions services/wsEvents.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const EventEmitter = require('events');
const wsEvents = new EventEmitter();

module.exports = wsEvents;
14 changes: 12 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module } from '@nestjs/common';
import { Module, NestModule, MiddlewareConsumer, RequestMethod } from '@nestjs/common';
import { APP_GUARD, APP_FILTER } from '@nestjs/core';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { AppController } from './app.controller';
Expand All @@ -13,6 +13,7 @@ import { TokensModule } from './tokens/tokens.module';
import { ThrottlerExceptionFilter } from './common/filters/throttler-exception.filter';
import { AllExceptionsFilter } from './common/filters/all-exceptions.filter';
import { OgModule } from './og/og.module';
import { RequireJwtMiddleware } from './common/middleware/require-jwt.middleware';

@Module({
imports: [PrismaModule, HealthModule, RiskModule, AuthModule, AnalyticsModule, SwapModule, TokensModule, OgModule],
Expand All @@ -33,4 +34,13 @@ import { OgModule } from './og/og.module';
},
],
})
export class AppModule {}
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
consumer
.apply(RequireJwtMiddleware)
.forRoutes(
{ path: 'api/v1/webhook/soroban', method: RequestMethod.POST },
{ path: 'invoices', method: RequestMethod.POST }, // Database-mutating in AppController
);
}
}
32 changes: 32 additions & 0 deletions src/auth/admin.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Controller, Post, Body, HttpException, HttpStatus } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { AuthService } from './auth.service';

@ApiTags('admin')
@Controller('api/v1/admin')
export class AdminController {
constructor(private readonly authService: AuthService) {}

@Post('login')
@ApiOperation({ summary: 'Admin login for backend dashboard' })
@ApiResponse({
status: 200,
description: 'Admin login successful',
schema: { type: 'object', properties: { token: { type: 'string' } } }
})
@ApiResponse({ status: 401, description: 'Unauthorized' })
async login(@Body() body: { password: string }) {
if (!body.password) {
throw new HttpException('Password is required', HttpStatus.BAD_REQUEST);
}

const isValid = await this.authService.verifyAdminPassword(body.password);

if (!isValid) {
throw new HttpException('Invalid admin password', HttpStatus.UNAUTHORIZED);
}

const token = this.authService.generateAdminJWT();
return { token };
}
}
Loading
Loading