Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ PORT=3000
HORIZON_URL=https://horizon-testnet.stellar.org
NETWORK_PASSPHRASE=Test SDF Network ; September 2015

# Future additions (not yet required)
# DATABASE_URL=postgresql://user:password@localhost:5432/fluxora
# Database configuration
DATABASE_URL=postgresql://user:password@localhost:5432/fluxora

# Auth (Future additions)
# JWT_SECRET=your-secret-here
71 changes: 71 additions & 0 deletions migrations/1774715131962_streams-table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { ColumnDefinitions, MigrationBuilder } from 'node-pg-migrate';

export const shorthands: ColumnDefinitions | undefined = undefined;

export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createTable('streams', {
id: { type: 'text', primaryKey: true },
sender_address: { type: 'text', notNull: true },
recipient_address: { type: 'text', notNull: true },
amount: { type: 'text', notNull: true },
streamed_amount: { type: 'text', notNull: true, default: '0' },
remaining_amount: { type: 'text', notNull: true },
rate_per_second: { type: 'text', notNull: true },
start_time: { type: 'bigint', notNull: true },
end_time: { type: 'bigint', notNull: true, default: 0 },
status: { type: 'text', notNull: true, default: 'active' },
contract_id: { type: 'text', notNull: true },
transaction_hash: { type: 'text', notNull: true },
event_index: { type: 'integer', notNull: true },
created_at: {
type: 'timestamp with time zone',
notNull: true,
default: pgm.func('current_timestamp'),
},
updated_at: {
type: 'timestamp with time zone',
notNull: true,
default: pgm.func('current_timestamp'),
},
});

// Unique constraint for idempotency
pgm.addConstraint('streams', 'idx_streams_unique_event', {
unique: ['transaction_hash', 'event_index'],
});

// Indexes for common query patterns
pgm.createIndex('streams', 'status');
pgm.createIndex('streams', 'sender_address');
pgm.createIndex('streams', 'recipient_address');
pgm.createIndex('streams', 'contract_id');
pgm.createIndex('streams', 'created_at');

// Contract events table for the indexer service
pgm.createTable('contract_events', {
event_id: { type: 'text', primaryKey: true },
ledger: { type: 'integer', notNull: true },
contract_id: { type: 'text', notNull: true },
topic: { type: 'text', notNull: true },
tx_hash: { type: 'text', notNull: true },
tx_index: { type: 'integer', notNull: true },
operation_index: { type: 'integer', notNull: true },
event_index: { type: 'integer', notNull: true },
payload: { type: 'jsonb', notNull: true },
happened_at: { type: 'timestamp with time zone', notNull: true },
ingested_at: {
type: 'timestamp with time zone',
notNull: true,
default: pgm.func('current_timestamp'),
},
});

pgm.createIndex('contract_events', 'contract_id');
pgm.createIndex('contract_events', 'tx_hash');
pgm.createIndex('contract_events', 'happened_at');
}

export async function down(pgm: MigrationBuilder): Promise<void> {
pgm.dropTable('contract_events');
pgm.dropTable('streams');
}
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
"start": "node dist/index.js",
"dev": "tsx watch src/index.ts",
"test": "vitest run",
"test:watch": "vitest",
"test:coverage": "vitest run --coverage"
"test:coverage": "vitest run --coverage",
"db:migrate": "node-pg-migrate -j ts",
"db:migrate:make": "node-pg-migrate create -j ts"
},
"dependencies": {
"better-sqlite3": "^12.8.0",
Expand All @@ -20,8 +21,8 @@
"zod": "^4.3.6"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@jest/globals": "^29.7.0",
"@types/better-sqlite3": "^7.6.13",
"@types/express": "^4.17.21",
"@types/jest": "^30.0.0",
"@types/jsonwebtoken": "^9.0.10",
Expand All @@ -38,4 +39,4 @@
"typescript": "^5.3.3",
"vitest": "^1.6.1"
}
}
}
170 changes: 34 additions & 136 deletions src/db/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,158 +1,56 @@
/**
* Database migration runner
*
* Applies migrations in order, tracking which have been applied.
* Uses SQLite's built-in journaling for atomic commits.
* Uses node-pg-migrate to apply migrations to PostgreSQL.
*
* @module db/migrate
*/

import Database from "better-sqlite3";
import { info, warn, error as logError } from "../utils/logger.js";
import path from "path";
import { fileURLToPath } from "url";
import { runner } from 'node-pg-migrate';
import { info, error as logError } from '../utils/logger.js';
import path from 'path';
import { fileURLToPath } from 'url';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

// Import migrations - they will be executed in order by filename
const migrations: { up: string; down: string }[] = [];

async function loadMigrations(): Promise<void> {
const migrationFiles = await Promise.all([
import("./migrations/001_create_streams_table.js"),
]);

for (const mod of migrationFiles) {
migrations.push({ up: mod.up, down: mod.down });
}
}

export interface MigrationResult {
applied: string[];
failed: string[];
}

/**
* Run all pending migrations
*/
export function migrate(
db: Database.Database,
direction: "up" | "down" = "up",
): MigrationResult {
const applied: string[] = [];
const failed: string[] = [];

// Ensure migrations table exists
db.exec(`
CREATE TABLE IF NOT EXISTS _migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);

// Get already applied migrations
const appliedMigrations = db
.prepare("SELECT name FROM _migrations ORDER BY id")
.all() as { name: string }[];
const appliedNames = new Set(appliedMigrations.map((m) => m.name));

const migrationNames = migrations.map(
(_, i) => `00${i + 1}_create_streams_table`,
);

if (direction === "up") {
// Apply pending migrations
for (let i = 0; i < migrations.length; i++) {
const name = migrationNames[i];

if (appliedNames.has(name)) {
info(`Migration ${name} already applied, skipping`);
continue;
}
export async function migrate(): Promise<void> {
const databaseUrl = process.env.DATABASE_URL;

try {
info(`Applying migration: ${name}`);
db.exec(migrations[i].up);

// Record successful migration
db.prepare("INSERT INTO _migrations (name) VALUES (?)").run(name);
applied.push(name);
info(`Successfully applied: ${name}`);
} catch (err) {
const message = err instanceof Error ? err.message : "Unknown error";
logError(`Failed to apply migration ${name}: ${message}`);
failed.push(name);
break;
}
}
} else {
// Rollback in reverse order
for (let i = migrations.length - 1; i >= 0; i--) {
const name = migrationNames[i];

if (!appliedNames.has(name)) {
warn(`Migration ${name} not applied, skipping rollback`);
continue;
}

try {
info(`Rolling back migration: ${name}`);
db.exec(migrations[i].down);
db.prepare("DELETE FROM _migrations WHERE name = ?").run(name);
applied.push(name);
info(`Successfully rolled back: ${name}`);
} catch (err) {
const message = err instanceof Error ? err.message : "Unknown error";
logError(`Failed to rollback migration ${name}: ${message}`);
failed.push(name);
break;
}
}
if (!databaseUrl) {
throw new Error('DATABASE_URL environment variable is required for migrations');
}

return { applied, failed };
try {
info('Running database migrations...');

await runner({
databaseUrl,
dir: path.join(__dirname, '../../migrations'),
direction: 'up',
migrationsTable: 'pgmigrations',
count: Infinity,
logger: {
info: (msg: string) => info(msg),
warn: (msg: string) => info(msg), // Mapping warn to info for cleaner logs
error: (msg: string) => logError(msg),
},
});

info('Migrations completed successfully');
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
logError(`Migration failure: ${message}`);
throw err;
}
}

/**
* Get current migration status
* Initialize migrations as part of setup
*/
export function getMigrationStatus(db: Database.Database): {
total: number;
applied: number;
pending: number;
} {
const total = migrations.length;

const appliedCount = db
.prepare("SELECT COUNT(*) as count FROM _migrations")
.get() as { count: number };

return {
total,
applied: appliedCount.count,
pending: total - appliedCount.count,
};
}

// Auto-run migrations when module is imported
export async function initializeMigrations(
db: Database.Database,
): Promise<void> {
await loadMigrations();

const status = getMigrationStatus(db);
info("Migration status", {
total: status.total,
applied: status.applied,
pending: status.pending,
});

if (status.pending > 0) {
const result = migrate(db, "up");
if (result.failed.length > 0) {
throw new Error(`Migration failed: ${result.failed.join(", ")}`);
}
}
export async function initializeMigrations(): Promise<void> {
await migrate();
}
46 changes: 34 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,46 @@
import { app } from './app.js';
import { initializeConfig, getConfig, resetConfig } from './config/env.js';
import { info, error } from './utils/logger.js';
/**
* Fluxora Backend — server entry point.
*
* Responsibilities:
* - Bind the Express app to a TCP port.
* - Register OS signal handlers for graceful shutdown.
*
* Everything else (routes, middleware, app config) lives in app.ts.
* Shutdown logic (drain + hooks) lives in shutdown.ts.
*/

import http from 'node:http';
import { createApp } from './app.js';
import { gracefulShutdown } from './shutdown.js';
import { logger } from './lib/logger.js';
import { initializeMigrations } from './db/migrate.js';

async function start() {
try {
// Load and validate environment configuration
const config = initializeConfig();
const { port, nodeEnv, apiVersion } = config;

const server = app.listen(port, () => {
info(`Fluxora API v${apiVersion} started`, {
port,
env: nodeEnv,
pid: process.pid,
});
});
const app = createApp();
const server = http.createServer(app);

async function startServer() {
try {
// Run migrations before starting the server
await initializeMigrations();

server.listen(PORT, () => {
logger.info('Fluxora API listening', undefined, { port: PORT });
});
} catch (err) {
logger.error('Failed to start Fluxora API', undefined, {
error: err instanceof Error ? err.message : 'Unknown error',
});
process.exit(1);
}
}

// Initialize graceful shutdown handler
gracefulShutdown(server);
void startServer();

} catch (err) {
error('Failed to start application', {}, err as Error);
Expand Down
1 change: 1 addition & 0 deletions src/lib/stellar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface VerifiedStream {
ratePerSecond: bigint;
startTime: number;
endTime: number;
contractId?: string;
}

/**
Expand Down
Loading
Loading