diff --git a/.env.example b/.env.example index 4c046a2..4e9ed0e 100644 --- a/.env.example +++ b/.env.example @@ -6,4 +6,14 @@ DB_HOST=localhost DB_PORT=5432 DB_USERNAME=postgres DB_PASSWORD=password -DB_DATABASE=tradeflow \ No newline at end of file +DB_DATABASE=tradeflow + +# Soroban Event Indexer & WebSocket Configuration +SOROBAN_RPC_URL="https://soroban-testnet.stellar.org" +POOL_ADDRESS="CC..." # Replace with your Pool Contract ID +INDEXER_POLL_INTERVAL=5000 # Polling interval in ms +WS_PORT=3001 # WebSocket server port + +# Security Configuration +JWT_SECRET="YOUR_SUPER_SECRET_KEY_CHANGE_ME" +ADMIN_PASSWORD="admin_secure_password_123" \ No newline at end of file diff --git a/package.json b/package.json index af8e4d6..53e55d8 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,9 @@ "prisma:generate": "prisma generate", "prisma:push": "prisma db push", "prisma:migrate": "prisma migrate dev", - "prisma:studio": "prisma studio" + "prisma:studio": "prisma studio", + "start:indexer": "node services/eventIndexer.js", + "start:all": "node server.js" }, "dependencies": { "@nestjs/common": "^10.4.22", diff --git a/server.js b/server.js new file mode 100644 index 0000000..9b8883a --- /dev/null +++ b/server.js @@ -0,0 +1,83 @@ +/** + * server.js + * + * Main Entry Point for combined Soroban Indexer and WebSocket streaming server. + * This file orchestrates the real-time data flow from the blockchain to the frontend. + */ + +const WebSocket = require('ws'); +const wsEvents = require('./services/wsEvents'); +const { startIndexer } = require('./services/eventIndexer'); + +// Config and Port Setup +const WS_PORT = process.env.WS_PORT || 3001; +const wss = new WebSocket.Server({ port: WS_PORT }); + +console.log('--- 🌐 TradeFlow Real-Time Stream Server ---'); +console.log(`📡 WebSocket server running on ws://localhost:${WS_PORT}`); + +// Connection tracking +let activeConnections = 0; + +/** + * Listen for incoming WebSocket connections. + */ +wss.on('connection', (ws) => { + activeConnections++; + console.log(`✅ New Web3 client connected. Active: ${activeConnections}`); + + // Initial Connection ACK + ws.send(JSON.stringify({ + event: 'INDEXER_CONNECTED', + status: 'ONLINE', + timestamp: new Date().toISOString() + })); + + ws.on('close', () => { + activeConnections--; + console.log(`❌ Web3 client disconnected. Active: ${activeConnections}`); + }); + + ws.on('error', (err) => { + console.error('⚠️ WS Socket Error:', err.message); + }); +}); + +/** + * BROADCASTER: Listens to the internal 'newTrade' event emitter. + * Broadcasts every new blockchain event caught by the Indexer daemon + * to all connected browser clients. + */ +wsEvents.on('newTrade', (tradeData) => { + console.log(`📣 BROADCASTING: New trade found in pool ${tradeData.poolId.slice(0, 8)}...`); + + const payload = JSON.stringify({ + event: 'NEW_TRADE_EVENT', + data: tradeData, + receivedAt: new Date().toISOString() + }); + + // Iterative broadcast to all active subscribers + wss.clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(payload); + } + }); +}); + +/** + * 🚀 START DAEMON ORCHESTRATION + * We start the Soroban Indexer in the same Node.js process to bridge + * the blockchain data with the WebSocket emitter via internal memory. + */ +startIndexer().catch((err) => { + console.error('❌ CRITICAL ERROR: Event Indexer failed to start:', err.message); + process.exit(1); +}); + +// Process Management +process.on('SIGTERM', () => { + console.log('🛑 Closing WebSocket connections and shutting down.'); + wss.close(); + process.exit(0); +}); diff --git a/services/eventIndexer.js b/services/eventIndexer.js new file mode 100644 index 0000000..f0882c0 --- /dev/null +++ b/services/eventIndexer.js @@ -0,0 +1,150 @@ +/** + * services/eventIndexer.js + * + * Background Soroban Event Listener daemon for TradeFlow-API. + * This service polls the Soroban RPC for events emitted by the specified Pool contract. + * When a 'Swap' event is detected, it parses the data and saves it to the database with Prisma. + */ + +const { rpc } = require('@stellar/stellar-sdk'); +const { PrismaClient } = require('@prisma/client'); +const { parseScVal } = require('./scValParser'); +const wsEvents = require('./wsEvents'); + +// In case dotenv is not installed as a top-level dependency, +// we try to load it safely. Most Node.js environments for this project should have it. +try { + require('dotenv').config(); +} catch (e) { + console.warn('⚠️ dotenv not loaded. Ensure environment variables are set manually.'); +} + +const prisma = new PrismaClient(); + +// Configuration +const RPC_URL = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org'; +const POOL_ADDRESS = process.env.POOL_ADDRESS; +const POLL_INTERVAL = parseInt(process.env.INDEXER_POLL_INTERVAL || '5000'); + +if (!POOL_ADDRESS) { + console.error('❌ POOL_ADDRESS is not defined in environment variables.'); + console.error('Please add POOL_ADDRESS="YOUR_CONTRACT_ID" to your .env file.'); + process.exit(1); +} + +const server = new rpc.Server(RPC_URL); + +/** + * Main daemon loop to poll for Soroban events. + */ +async function startIndexer() { + console.log('--- 🚀 TradeFlow Soroban Event Indexer ---'); + console.log(`📡 RPC Node: ${RPC_URL}`); + console.log(`🎯 Pool Contract: ${POOL_ADDRESS}`); + console.log('-------------------------------------------'); + + // Start from the latest ledger initially + let currentLedgerSequence; + try { + const latestLedger = await server.getLatestLedger(); + currentLedgerSequence = latestLedger.sequence; + console.log(`Initial Start Ledger: ${currentLedgerSequence}`); + } catch (err) { + console.error('❌ Failed to connect to Soroban RPC. Verify your SOROBAN_RPC_URL.'); + process.exit(1); + } + + // Periodic polling + setInterval(async () => { + try { + const response = await server.getEvents({ + startLedger: currentLedgerSequence, + filters: [ + { + type: 'contract', + contractIds: [POOL_ADDRESS], + }, + ], + limit: 10, + }); + + if (response.events && response.events.length > 0) { + console.log(`Found ${response.events.length} new event(s). Processing...`); + + for (const event of response.events) { + // Process event + await handleContractEvent(event); + } + + // Advance ledger checkpoint + const latestProcessed = Math.max(...response.events.map(e => parseInt(e.ledger))); + currentLedgerSequence = latestProcessed + 1; + } + } catch (error) { + console.error('⚠️ Indexer Polling Error:', error.message); + } + }, POLL_INTERVAL); +} + +/** + * Handles an individual contract event. + * Filters for 'Swap' events and indexes them. + * + * @param {rpc.Api.GetEventsResponse.Event} event - The Soroban event from RPC. + */ +async function handleContractEvent(event) { + try { + // Decode topics to identify the event + const topics = event.topic.map(t => parseScVal(t)); + + // Check if topics contain "Swap" (case-insensitive) + const isSwapEvent = topics.some(topic => + typeof topic === 'string' && topic.toLowerCase() === 'swap' + ); + + if (isSwapEvent) { + console.log(`✅ Detected SwapEvent in ledger ${event.ledger}`); + + const payload = parseScVal(event.value); + if (!payload) return; + + console.log('Decoded Payload:', JSON.stringify(payload, null, 2)); + + // Map Soroban event data to our Prisma Trade model + // Expected structure from SwapEvent: { user, amount_in, amount_out } + const tradeData = { + poolId: event.contractId, + userAddress: payload.user || payload.address || 'Unknown', + amountIn: (payload.amount_in || payload.amountIn || '0').toString(), + amountOut: (payload.amount_out || payload.amountOut || '0').toString(), + timestamp: new Date(), + }; + + // Save to Database via Prisma + const savedTrade = await prisma.trade.create({ + data: tradeData + }); + + console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`); + + // Trigger WebSocket broadcast + wsEvents.emit('newTrade', savedTrade); + } + } catch (error) { + console.error('❌ Failed to process event:', error.message); + } +} + +// Graceful Shut-off +process.on('SIGINT', async () => { + console.log('\n--- Indexer Shutting Down ---'); + await prisma.$disconnect(); + process.exit(0); +}); + +exports.startIndexer = startIndexer; + +// In standalone mode, starting the indexer automatically +if (require.main === module) { + startIndexer(); +} diff --git a/services/scValParser.js b/services/scValParser.js new file mode 100644 index 0000000..cc9d01e --- /dev/null +++ b/services/scValParser.js @@ -0,0 +1,58 @@ +/** + * services/scValParser.js + * + * Utility to decode Soroban XDR ScVal types into native JavaScript objects/JSON. + */ + +const { scValToNative } = require('@stellar/stellar-sdk'); + +/** + * Decodes a Soroban ScVal into its native JavaScript representation. + * Handles BigInt conversions to standard strings for JSON compatibility. + * + * @param {xdr.ScVal} scVal - The Soroban value to decode. + * @returns {any} - The native JavaScript value. + */ +function parseScVal(scVal) { + try { + const native = scValToNative(scVal); + return stringifyBigInts(native); + } catch (error) { + console.error('Error decoding Soroban XDR:', error.message); + return null; + } +} + +/** + * Recursively converts BigInt values to strings in an object/array. + * This is useful for Prisma and JSON serialization. + * + * @param {any} obj - The object to process. + * @returns {any} - The object with BigInts converted to strings. + */ +function stringifyBigInts(obj) { + if (typeof obj === 'bigint') { + return obj.toString(); + } + + if (Array.isArray(obj)) { + return obj.map(stringifyBigInts); + } + + if (obj !== null && typeof obj === 'object') { + const result = {}; + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + result[key] = stringifyBigInts(obj[key]); + } + } + return result; + } + + return obj; +} + +module.exports = { + parseScVal, + stringifyBigInts +}; diff --git a/services/wsEvents.js b/services/wsEvents.js new file mode 100644 index 0000000..29b36d4 --- /dev/null +++ b/services/wsEvents.js @@ -0,0 +1,4 @@ +const EventEmitter = require('events'); +const wsEvents = new EventEmitter(); + +module.exports = wsEvents; diff --git a/src/app.module.ts b/src/app.module.ts index 41468fe..6bc9c67 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { Module, NestModule, MiddlewareConsumer, RequestMethod } from '@nestjs/common'; import { APP_GUARD, APP_FILTER } from '@nestjs/core'; import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler'; import { AppController } from './app.controller'; @@ -13,6 +13,7 @@ import { TokensModule } from './tokens/tokens.module'; import { ThrottlerExceptionFilter } from './common/filters/throttler-exception.filter'; import { AllExceptionsFilter } from './common/filters/all-exceptions.filter'; import { OgModule } from './og/og.module'; +import { RequireJwtMiddleware } from './common/middleware/require-jwt.middleware'; @Module({ imports: [PrismaModule, HealthModule, RiskModule, AuthModule, AnalyticsModule, SwapModule, TokensModule, OgModule], @@ -33,4 +34,13 @@ import { OgModule } from './og/og.module'; }, ], }) -export class AppModule {} +export class AppModule implements NestModule { + configure(consumer: MiddlewareConsumer) { + consumer + .apply(RequireJwtMiddleware) + .forRoutes( + { path: 'api/v1/webhook/soroban', method: RequestMethod.POST }, + { path: 'invoices', method: RequestMethod.POST }, // Database-mutating in AppController + ); + } +} diff --git a/src/auth/admin.controller.ts b/src/auth/admin.controller.ts new file mode 100644 index 0000000..f703d6f --- /dev/null +++ b/src/auth/admin.controller.ts @@ -0,0 +1,32 @@ +import { Controller, Post, Body, HttpException, HttpStatus } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { AuthService } from './auth.service'; + +@ApiTags('admin') +@Controller('api/v1/admin') +export class AdminController { + constructor(private readonly authService: AuthService) {} + + @Post('login') + @ApiOperation({ summary: 'Admin login for backend dashboard' }) + @ApiResponse({ + status: 200, + description: 'Admin login successful', + schema: { type: 'object', properties: { token: { type: 'string' } } } + }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + async login(@Body() body: { password: string }) { + if (!body.password) { + throw new HttpException('Password is required', HttpStatus.BAD_REQUEST); + } + + const isValid = await this.authService.verifyAdminPassword(body.password); + + if (!isValid) { + throw new HttpException('Invalid admin password', HttpStatus.UNAUTHORIZED); + } + + const token = this.authService.generateAdminJWT(); + return { token }; + } +} diff --git a/src/auth/auth.module.ts b/src/auth/auth.module.ts index a7d9fbc..6448d1a 100644 --- a/src/auth/auth.module.ts +++ b/src/auth/auth.module.ts @@ -1,9 +1,12 @@ import { Module } from '@nestjs/common'; import { AuthController } from './auth.controller'; +import { AdminController } from './admin.controller'; +import { WebhookController } from './webhook.controller'; import { AuthService } from './auth.service'; @Module({ - controllers: [AuthController], + controllers: [AuthController, AdminController, WebhookController], providers: [AuthService], + exports: [AuthService], // Export for middleware use }) export class AuthModule {} diff --git a/src/auth/auth.service.ts b/src/auth/auth.service.ts index bcfce2d..7c98707 100644 --- a/src/auth/auth.service.ts +++ b/src/auth/auth.service.ts @@ -7,11 +7,17 @@ import { Keypair } from '@stellar/stellar-sdk'; export class AuthService { private readonly jwtSecret = process.env.JWT_SECRET || 'fallback-secret-key'; private readonly jwtExpiration = '1h'; + private readonly adminExpiration = '24h'; + private readonly adminPassword = process.env.ADMIN_PASSWORD || 'admin123'; generateNonce(): string { return crypto.randomBytes(16).toString('hex'); } + async verifyAdminPassword(password: string): Promise { + return password === this.adminPassword; + } + async verifySignature(publicKey: string, signature: string, nonce: string): Promise { try { const message = `Sign in to TradeFlow with nonce: ${nonce}`; @@ -40,6 +46,17 @@ export class AuthService { }); } + generateAdminJWT(): string { + const payload = { + role: 'admin', + iat: Math.floor(Date.now() / 1000), + }; + + return jwt.sign(payload, this.jwtSecret, { + expiresIn: this.adminExpiration, + }); + } + verifyJWT(token: string): any { try { return jwt.verify(token, this.jwtSecret); diff --git a/src/auth/webhook.controller.ts b/src/auth/webhook.controller.ts new file mode 100644 index 0000000..40c136f --- /dev/null +++ b/src/auth/webhook.controller.ts @@ -0,0 +1,33 @@ +import { Controller, Post, Body, HttpCode, HttpStatus } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse, ApiHeader } from '@nestjs/swagger'; + +@ApiTags('webhooks') +@Controller('api/v1/webhook') +export class WebhookController { + + @Post('soroban') + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: 'Stellar Soroban event webhook receiver', + description: 'Receives and processes incoming smart contract events. Requires JWT authentication.' + }) + @ApiHeader({ + name: 'Authorization', + description: 'Bearer ', + required: true + }) + @ApiResponse({ status: 200, description: 'Event processed successfully' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + async handleSorobanEvent(@Body() eventData: any) { + console.log('--- Incoming Soroban Event Webhook ---'); + console.log('Payload:', JSON.stringify(eventData, null, 2)); + + // In a production scenario, logic to respond to specific events + // would go here (e.g. updating internal state). + + return { + status: 'success', + receivedAt: new Date().toISOString() + }; + } +} diff --git a/src/common/middleware/require-jwt.middleware.ts b/src/common/middleware/require-jwt.middleware.ts new file mode 100644 index 0000000..97d94d1 --- /dev/null +++ b/src/common/middleware/require-jwt.middleware.ts @@ -0,0 +1,26 @@ +import { Injectable, NestMiddleware, UnauthorizedException } from '@nestjs/common'; +import { Request, Response, NextFunction } from 'express'; +import { AuthService } from '../../auth/auth.service'; + +@Injectable() +export class RequireJwtMiddleware implements NestMiddleware { + constructor(private readonly authService: AuthService) {} + + use(req: Request, res: Response, next: NextFunction) { + const authHeader = req.headers.authorization; + + if (!authHeader || !authHeader.startsWith('Bearer ')) { + throw new UnauthorizedException('Missing or invalid Authorization header'); + } + + const token = authHeader.split(' ')[1]; + + try { + const decoded = this.authService.verifyJWT(token); + req['user'] = decoded; + next(); + } catch (error) { + throw new UnauthorizedException('Invalid or expired token'); + } + } +}