diff --git a/.env.example b/.env.example index c676c99..1095888 100644 --- a/.env.example +++ b/.env.example @@ -5,6 +5,8 @@ PORT=3000 HORIZON_URL=https://horizon-testnet.stellar.org NETWORK_PASSPHRASE=Test SDF Network ; September 2015 -# Future additions (not yet required) -# DATABASE_URL=postgresql://user:password@localhost:5432/fluxora +# Database configuration +DATABASE_URL=postgresql://user:password@localhost:5432/fluxora + +# Auth (Future additions) # JWT_SECRET=your-secret-here diff --git a/migrations/1774715131962_streams-table.ts b/migrations/1774715131962_streams-table.ts new file mode 100644 index 0000000..c373125 --- /dev/null +++ b/migrations/1774715131962_streams-table.ts @@ -0,0 +1,71 @@ +import { ColumnDefinitions, MigrationBuilder } from 'node-pg-migrate'; + +export const shorthands: ColumnDefinitions | undefined = undefined; + +export async function up(pgm: MigrationBuilder): Promise { + pgm.createTable('streams', { + id: { type: 'text', primaryKey: true }, + sender_address: { type: 'text', notNull: true }, + recipient_address: { type: 'text', notNull: true }, + amount: { type: 'text', notNull: true }, + streamed_amount: { type: 'text', notNull: true, default: '0' }, + remaining_amount: { type: 'text', notNull: true }, + rate_per_second: { type: 'text', notNull: true }, + start_time: { type: 'bigint', notNull: true }, + end_time: { type: 'bigint', notNull: true, default: 0 }, + status: { type: 'text', notNull: true, default: 'active' }, + contract_id: { type: 'text', notNull: true }, + transaction_hash: { type: 'text', notNull: true }, + event_index: { type: 'integer', notNull: true }, + created_at: { + type: 'timestamp with time zone', + notNull: true, + default: pgm.func('current_timestamp'), + }, + updated_at: { + type: 'timestamp with time zone', + notNull: true, + default: pgm.func('current_timestamp'), + }, + }); + + // Unique constraint for idempotency + pgm.addConstraint('streams', 'idx_streams_unique_event', { + unique: ['transaction_hash', 'event_index'], + }); + + // Indexes for common query patterns + pgm.createIndex('streams', 'status'); + pgm.createIndex('streams', 'sender_address'); + pgm.createIndex('streams', 'recipient_address'); + pgm.createIndex('streams', 'contract_id'); + pgm.createIndex('streams', 'created_at'); + + // Contract events table for the indexer service + pgm.createTable('contract_events', { + event_id: { type: 'text', primaryKey: true }, + ledger: { type: 'integer', notNull: true }, + contract_id: { type: 'text', notNull: true }, + topic: { type: 'text', notNull: true }, + tx_hash: { type: 'text', notNull: true }, + tx_index: { type: 'integer', notNull: true }, + operation_index: { type: 'integer', notNull: true }, + event_index: { type: 'integer', notNull: true }, + payload: { type: 'jsonb', notNull: true }, + happened_at: { type: 'timestamp with time zone', notNull: true }, + ingested_at: { + type: 'timestamp with time zone', + notNull: true, + default: pgm.func('current_timestamp'), + }, + }); + + pgm.createIndex('contract_events', 'contract_id'); + pgm.createIndex('contract_events', 'tx_hash'); + pgm.createIndex('contract_events', 'happened_at'); +} + +export async function down(pgm: MigrationBuilder): Promise { + pgm.dropTable('contract_events'); + pgm.dropTable('streams'); +} diff --git a/package.json b/package.json index 4ee0ca7..a5f2f4b 100644 --- a/package.json +++ b/package.json @@ -8,8 +8,9 @@ "start": "node dist/index.js", "dev": "tsx watch src/index.ts", "test": "vitest run", - "test:watch": "vitest", - "test:coverage": "vitest run --coverage" + "test:coverage": "vitest run --coverage", + "db:migrate": "node-pg-migrate -j ts", + "db:migrate:make": "node-pg-migrate create -j ts" }, "dependencies": { "better-sqlite3": "^12.8.0", @@ -20,8 +21,8 @@ "zod": "^4.3.6" }, "devDependencies": { - "@types/better-sqlite3": "^7.6.13", "@jest/globals": "^29.7.0", + "@types/better-sqlite3": "^7.6.13", "@types/express": "^4.17.21", "@types/jest": "^30.0.0", "@types/jsonwebtoken": "^9.0.10", @@ -38,4 +39,4 @@ "typescript": "^5.3.3", "vitest": "^1.6.1" } -} +} \ No newline at end of file diff --git a/src/db/migrate.ts b/src/db/migrate.ts index c578396..454d937 100644 --- a/src/db/migrate.ts +++ b/src/db/migrate.ts @@ -1,158 +1,56 @@ /** * Database migration runner * - * Applies migrations in order, tracking which have been applied. - * Uses SQLite's built-in journaling for atomic commits. + * Uses node-pg-migrate to apply migrations to PostgreSQL. * * @module db/migrate */ -import Database from "better-sqlite3"; -import { info, warn, error as logError } from "../utils/logger.js"; -import path from "path"; -import { fileURLToPath } from "url"; +import { runner } from 'node-pg-migrate'; +import { info, error as logError } from '../utils/logger.js'; +import path from 'path'; +import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); -// Import migrations - they will be executed in order by filename -const migrations: { up: string; down: string }[] = []; - -async function loadMigrations(): Promise { - const migrationFiles = await Promise.all([ - import("./migrations/001_create_streams_table.js"), - ]); - - for (const mod of migrationFiles) { - migrations.push({ up: mod.up, down: mod.down }); - } -} - -export interface MigrationResult { - applied: string[]; - failed: string[]; -} - /** * Run all pending migrations */ -export function migrate( - db: Database.Database, - direction: "up" | "down" = "up", -): MigrationResult { - const applied: string[] = []; - const failed: string[] = []; - - // Ensure migrations table exists - db.exec(` - CREATE TABLE IF NOT EXISTS _migrations ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL UNIQUE, - applied_at TEXT NOT NULL DEFAULT (datetime('now')) - ); - `); - - // Get already applied migrations - const appliedMigrations = db - .prepare("SELECT name FROM _migrations ORDER BY id") - .all() as { name: string }[]; - const appliedNames = new Set(appliedMigrations.map((m) => m.name)); - - const migrationNames = migrations.map( - (_, i) => `00${i + 1}_create_streams_table`, - ); - - if (direction === "up") { - // Apply pending migrations - for (let i = 0; i < migrations.length; i++) { - const name = migrationNames[i]; - - if (appliedNames.has(name)) { - info(`Migration ${name} already applied, skipping`); - continue; - } +export async function migrate(): Promise { + const databaseUrl = process.env.DATABASE_URL; - try { - info(`Applying migration: ${name}`); - db.exec(migrations[i].up); - - // Record successful migration - db.prepare("INSERT INTO _migrations (name) VALUES (?)").run(name); - applied.push(name); - info(`Successfully applied: ${name}`); - } catch (err) { - const message = err instanceof Error ? err.message : "Unknown error"; - logError(`Failed to apply migration ${name}: ${message}`); - failed.push(name); - break; - } - } - } else { - // Rollback in reverse order - for (let i = migrations.length - 1; i >= 0; i--) { - const name = migrationNames[i]; - - if (!appliedNames.has(name)) { - warn(`Migration ${name} not applied, skipping rollback`); - continue; - } - - try { - info(`Rolling back migration: ${name}`); - db.exec(migrations[i].down); - db.prepare("DELETE FROM _migrations WHERE name = ?").run(name); - applied.push(name); - info(`Successfully rolled back: ${name}`); - } catch (err) { - const message = err instanceof Error ? err.message : "Unknown error"; - logError(`Failed to rollback migration ${name}: ${message}`); - failed.push(name); - break; - } - } + if (!databaseUrl) { + throw new Error('DATABASE_URL environment variable is required for migrations'); } - return { applied, failed }; + try { + info('Running database migrations...'); + + await runner({ + databaseUrl, + dir: path.join(__dirname, '../../migrations'), + direction: 'up', + migrationsTable: 'pgmigrations', + count: Infinity, + logger: { + info: (msg: string) => info(msg), + warn: (msg: string) => info(msg), // Mapping warn to info for cleaner logs + error: (msg: string) => logError(msg), + }, + }); + + info('Migrations completed successfully'); + } catch (err) { + const message = err instanceof Error ? err.message : 'Unknown error'; + logError(`Migration failure: ${message}`); + throw err; + } } /** - * Get current migration status + * Initialize migrations as part of setup */ -export function getMigrationStatus(db: Database.Database): { - total: number; - applied: number; - pending: number; -} { - const total = migrations.length; - - const appliedCount = db - .prepare("SELECT COUNT(*) as count FROM _migrations") - .get() as { count: number }; - - return { - total, - applied: appliedCount.count, - pending: total - appliedCount.count, - }; -} - -// Auto-run migrations when module is imported -export async function initializeMigrations( - db: Database.Database, -): Promise { - await loadMigrations(); - - const status = getMigrationStatus(db); - info("Migration status", { - total: status.total, - applied: status.applied, - pending: status.pending, - }); - - if (status.pending > 0) { - const result = migrate(db, "up"); - if (result.failed.length > 0) { - throw new Error(`Migration failed: ${result.failed.join(", ")}`); - } - } +export async function initializeMigrations(): Promise { + await migrate(); } diff --git a/src/index.ts b/src/index.ts index d130c20..19bdc24 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,19 @@ -import { app } from './app.js'; -import { initializeConfig, getConfig, resetConfig } from './config/env.js'; -import { info, error } from './utils/logger.js'; +/** + * Fluxora Backend — server entry point. + * + * Responsibilities: + * - Bind the Express app to a TCP port. + * - Register OS signal handlers for graceful shutdown. + * + * Everything else (routes, middleware, app config) lives in app.ts. + * Shutdown logic (drain + hooks) lives in shutdown.ts. + */ + +import http from 'node:http'; +import { createApp } from './app.js'; import { gracefulShutdown } from './shutdown.js'; +import { logger } from './lib/logger.js'; +import { initializeMigrations } from './db/migrate.js'; async function start() { try { @@ -9,16 +21,26 @@ async function start() { const config = initializeConfig(); const { port, nodeEnv, apiVersion } = config; - const server = app.listen(port, () => { - info(`Fluxora API v${apiVersion} started`, { - port, - env: nodeEnv, - pid: process.pid, - }); - }); +const app = createApp(); +const server = http.createServer(app); + +async function startServer() { + try { + // Run migrations before starting the server + await initializeMigrations(); + + server.listen(PORT, () => { + logger.info('Fluxora API listening', undefined, { port: PORT }); + }); + } catch (err) { + logger.error('Failed to start Fluxora API', undefined, { + error: err instanceof Error ? err.message : 'Unknown error', + }); + process.exit(1); + } +} - // Initialize graceful shutdown handler - gracefulShutdown(server); +void startServer(); } catch (err) { error('Failed to start application', {}, err as Error); diff --git a/src/lib/stellar.ts b/src/lib/stellar.ts index e91b343..de64ac7 100644 --- a/src/lib/stellar.ts +++ b/src/lib/stellar.ts @@ -12,6 +12,7 @@ export interface VerifiedStream { ratePerSecond: bigint; startTime: number; endTime: number; + contractId?: string; } /** diff --git a/src/routes/streams.ts b/src/routes/streams.ts index 9b57467..a8c2c47 100644 --- a/src/routes/streams.ts +++ b/src/routes/streams.ts @@ -1,15 +1,10 @@ import { Router, Request, Response } from 'express'; import { getStreamById } from '../db/client.js'; import { - validateDecimalString, - validateAmountFields, formatFromStroops, - parseToStroops, } from '../serialization/decimal.js'; import { - ApiError, - ApiErrorCode, notFound, validationError, conflictError, @@ -193,7 +188,7 @@ streamsRouter.get( streamsRouter.post( '/', requireAuth, - asyncHandler(async (req: Request, res: Response) => { + asyncHandler(async (req: Request, res: Response): Promise => { const { transactionHash } = req.body ?? {}; const idempotencyKey = parseIdempotencyKey(req.header('Idempotency-Key')); @@ -218,9 +213,9 @@ streamsRouter.post( info('Verifying on-chain stream', { transactionHash }); - // Trust boundary: Verify the transaction on Stellar const verified = await verifyStreamOnChain(transactionHash); + const pool = getPool(); const id = `stream-${transactionHash.slice(0, 8)}`; const stream: Stream = { id, @@ -269,7 +264,7 @@ streamsRouter.delete( throw conflictError('Cannot cancel a completed stream'); } - streams[index] = { ...stream, status: 'cancelled' }; + await query(pool, "UPDATE streams SET status = 'cancelled', updated_at = CURRENT_TIMESTAMP WHERE id = $1", [id]); recordAuditEvent('STREAM_CANCELLED', 'stream', id, (req as any).correlationId); info('Stream cancelled', { id });