Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/directors/anomaly-detector.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { MemoryStore } from '../stores/memory.store';
import { RawDatum, Anomaly } from '../types';
import { genId, nowIso, safeNumber } from '../utils';

/**
* Real-time anomaly detector:
* - Maintains short-window statistics per source+metric (mean, std, count)
* - Uses z-score detection for spikes
* - Uses EWMA for drift detection (configurable alpha)
*
* This example treats numeric values extracted via a metric extractor function.
*/

type MetricExtractor = (d: RawDatum) => { metric: string; value: number } | null;

export class AnomalyDetectorService {
private store: MemoryStore;
private stats: Map<string, { mean: number; m2: number; n: number; ewma?: number }>;
private alpha: number;
private zThreshold: number;

constructor(store: MemoryStore, opts?: { alpha?: number; zThreshold?: number }) {
this.store = store;
this.stats = new Map();
this.alpha = opts?.alpha ?? 0.3; // EWMA smoothing
this.zThreshold = opts?.zThreshold ?? 4; // conservative default
}

private keyFor(source: string, metric: string) {
return `${source}::${metric}`;
}

ingest(datum: RawDatum, extractor: MetricExtractor) {
const m = extractor(datum);
if (!m) return;
const key = this.keyFor(datum.source, m.metric);
const v = safeNumber(m.value);
if (v === null) return;

// update Welford online mean/std
let s = this.stats.get(key);
if (!s) {
s = { mean: v, m2: 0, n: 1, ewma: v };
this.stats.set(key, s);
return;
}
s.n++;
const delta = v - s.mean;
s.mean += delta / s.n;
s.m2 += delta * (v - s.mean);

// EWMA
s.ewma = (s.ewma ?? v) * (1 - this.alpha) + v * this.alpha;

// check z-score if n>2
const variance = s.n > 1 ? s.m2 / (s.n - 1) : 0;
const std = Math.sqrt(variance);
const z = std === 0 ? 0 : Math.abs((v - s.mean) / std);
const reasons: string[] = [];

if (z > this.zThreshold) {
reasons.push(`z-score=${z.toFixed(2)} > ${this.zThreshold}`);
}
// ewma-based relative jump
if (s.ewma !== undefined && Math.abs(v - s.ewma) / (Math.abs(s.ewma) + 1e-9) > 0.2) {
reasons.push(`ewma jump ${(Math.abs(v - s.ewma) / (Math.abs(s.ewma) + 1e-9) * 100).toFixed(1)}%`);
}

if (reasons.length) {
const anomaly: Anomaly = {
id: genId('anom-'),
source: datum.source,
timestamp: datum.timestamp,
metric: m.metric,
value: v,
reason: reasons.join('; '),
severity: z > this.zThreshold ? 'high' : 'medium',
};
this.store.pushAnomaly(anomaly);
this.store.incAnomalyCount(datum.source);
}
}
}
44 changes: 44 additions & 0 deletions src/directors/completeness.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { RawDatum } from '../types';
import dayjs from 'dayjs';

/**
* Completeness / gap detection:
* - for feeds expected at a cadence (e.g., price ticks per symbol), detect missing windows
* - For simplicity, track last timestamp per (source, key) and if gap > expectedInterval*2, flag
*/

export class CompletenessService {
private lastPerKey: Map<string, string>;
private expectedIntervalSec: number;

constructor(opts?: { expectedIntervalSec?: number }) {
this.lastPerKey = new Map();
this.expectedIntervalSec = opts?.expectedIntervalSec ?? 30; // default expected cadence 30s
}

keyFor(source: string, key: string) {
return `${source}::${key}`;
}

mark(datum: RawDatum, key: string) {
const k = this.keyFor(datum.source, key);
this.lastPerKey.set(k, datum.timestamp);
}

detectGap(source: string, key: string) {
const k = this.keyFor(source, key);
const last = this.lastPerKey.get(k);
if (!last) return null;
const now = dayjs();
const diff = now.diff(dayjs(last), 'second');
if (diff > this.expectedIntervalSec * 2) {
return {
source,
key,
last,
gapSec: diff,
};
}
return null;
}
}
71 changes: 71 additions & 0 deletions src/directors/correction.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { MemoryStore } from '../stores/memory.store';
import { RawDatum } from '../types';
import { safeNumber, nowIso } from '../utils';

/**
* Automated correction:
* - type coercion, missing value imputation (last-known, mean), clipping to valid ranges
* - For safety, apply only for common, low-risk patterns; otherwise flag for manual review.
*
* Returns { correctedDatum, actionTaken: string | null }
*/

export class CorrectionService {
private store: MemoryStore;
private lastValues: Map<string, any>;

constructor(store: MemoryStore) {
this.store = store;
this.lastValues = new Map();
}

registerLast(source: string, metric: string, value: any) {
this.lastValues.set(`${source}::${metric}`, value);
}

getLast(source: string, metric: string) {
return this.lastValues.get(`${source}::${metric}`);
}

async attemptCorrection(datum: RawDatum): Promise<{ corrected: RawDatum; action: string | null }> {
// Basic example: if payload.price is string, cast; if missing price, impute last known
const corrected = { ...datum, payload: { ...datum.payload } };
let action: string | null = null;

if (corrected.payload && 'price' in corrected.payload) {
let p = corrected.payload.price;
const n = safeNumber(p);
if (n === null) {
// try to impute from last value
const last = this.getLast(corrected.source, 'price');
if (last !== undefined) {
corrected.payload.price = last;
action = 'imputed_price_from_last';
} else {
action = 'could_not_impute_price';
}
} else {
// valid number -> store as last known
corrected.payload.price = n;
this.registerLast(corrected.source, 'price', n);
action = 'type_cast_price';
}
}

// timestamp sanity: if timestamp in future or missing, replace with now
try {
const ts = new Date(corrected.timestamp);
if (!ts || isNaN(ts.getTime()) || ts.getTime() - Date.now() > 1000 * 60 * 5) {
corrected.timestamp = nowIso();
action = action ? `${action}; fixed_timestamp` : 'fixed_timestamp';
}
} catch (e) {
corrected.timestamp = nowIso();
action = action ? `${action}; fixed_timestamp` : 'fixed_timestamp';
}

// push to store as corrected
await this.store.pushDatum(corrected);
return { corrected, action };
}
}
38 changes: 38 additions & 0 deletions src/directors/dashboard.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import express from 'express';
import bodyParser from 'body-parser';
import cors from 'cors';
import { MemoryStore } from '../stores/memory.store';
import { ReportService } from '../reports/report.service';
import { FreshnessService } from '../detectors/freshness.service';

export function createDashboardApp(store: MemoryStore, reportService: ReportService, freshness: FreshnessService) {
const app = express();
app.use(cors());
app.use(bodyParser.json());

app.get('/health', (req, res) => res.json({ ok: true }));

app.get('/anomalies', async (req, res) => {
const source = typeof req.query.source === 'string' ? req.query.source : undefined;
const list = await store.getAnomalies(source, 1000);
res.json(list);
});

app.get('/sources', async (req, res) => {
const s = await store.getSourceScores();
res.json(s);
});

app.get('/report', async (req, res) => {
const r = await reportService.generateReport();
res.json(r);
});

app.get('/freshness/:source', (req, res) => {
const source = req.params.source;
const last = freshness.getLastSeen(source);
res.json({ source, last });
});

return app;
}
52 changes: 52 additions & 0 deletions src/directors/freshness.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { MemoryStore } from '../stores/memory.store';
import { RawDatum } from '../types';
import dayjs from 'dayjs';

/**
* Freshness monitoring:
* - track last timestamp per source
* - if last received timestamp is older than threshold, raise stale event
* - acceptance criteria: alerts on delays >15 minutes (configurable)
*/

export class FreshnessService {
private store: MemoryStore;
private lastSeenMap: Map<string, string>;
private thresholdMin: number;

constructor(store: MemoryStore, opts?: { thresholdMin?: number }) {
this.store = store;
this.lastSeenMap = new Map();
this.thresholdMin = opts?.thresholdMin ?? 15; // acceptance criteria default
}

markReceived(datum: RawDatum) {
// datum.timestamp is the data's own timestamp; we store it as last seen
this.lastSeenMap.set(datum.source, datum.timestamp);
}

async checkFreshness() {
const now = dayjs();
for (const [source, lastTs] of this.lastSeenMap) {
const diff = now.diff(dayjs(lastTs), 'minute');
if (diff > this.thresholdMin) {
// stale
await this.store.incStaleCount(source);
// write an anomaly-like record
await this.store.pushAnomaly({
id: `stale-${source}-${now.toISOString()}`,
source,
timestamp: now.toISOString(),
metric: 'freshness',
value: diff,
reason: `stale by ${diff} minutes (> ${this.thresholdMin}m)`,
severity: 'high',
});
}
}
}

getLastSeen(source: string) {
return this.lastSeenMap.get(source) ?? null;
}
}
27 changes: 27 additions & 0 deletions src/directors/lineage.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { RawDatum, DataLineage } from '../types';
import { nowIso } from '../utils';

/**
* Data lineage tracker: attach simple lineage metadata to each datum.
* For production, record transforms in DB and provide lineage graph.
*/

export class LineageService {
attach(datum: RawDatum, transformation?: string): RawDatum {
const lineage: DataLineage = {
source: datum.source,
receivedAt: nowIso(),
originalId: datum.id,
transformations: transformation ? [transformation] : [],
};
return { ...datum, lineage };
}

addTransform(datum: RawDatum, transformation: string): RawDatum {
const copy = { ...datum };
copy.lineage = copy.lineage ?? { source: datum.source, receivedAt: nowIso(), transformations: [] };
copy.lineage.transformations = copy.lineage.transformations ?? [];
copy.lineage.transformations.push(transformation);
return copy;
}
}
Loading