Skip to content
Merged
1 change: 0 additions & 1 deletion src/mirror-service/mirror_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ export async function startMirrorService(
getMirrordaemonDebugState(daemon, {
port: boundPort,
baseUrl: syncManager.getLocalBaseUrl(),
peersKnown: observability.getMetrics().gauges.peers_known || syncManager.listPeers().length,
}),
);
});
Expand Down
21 changes: 1 addition & 20 deletions src/mirror-service/runtime_host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,7 @@ export async function createMirrorRuntimeHost(
baseUrl: config.baseUrl,
fetchImpl: deps.fetchImpl,
onRuntimeEvent: daemon.publishRuntimeEvent,
observability: {
onConflictWarning: () => {
daemon.getObservability().incrementMetric("conflict_warnings");
},
onUpdatesPulled: (count) => {
daemon.getObservability().incrementMetric("updates_pulled", count);
},
onSyncFailure: () => {
daemon.getObservability().incrementMetric("sync_failures");
},
onPeerAnnounced: (payload) => {
daemon.getObservability().logEvent("sync.peer.announced", payload);
},
onPullCompleted: (payload) => {
daemon.getObservability().logEvent("sync.pull.completed", payload);
},
onPullFailed: (payload) => {
daemon.getObservability().logEvent("sync.pull.failed", payload);
},
},
observability: daemon.getObservability(),
});

return {
Expand Down
91 changes: 72 additions & 19 deletions src/mirror-sync/sync_manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import express from "express";
import type { MirrorObservabilityContext } from "../mirror-observability/index.js";
import type { FetchLike } from "../mirror-provider/index.js";
import {
applyRemoteCanonUpdates,
Expand Down Expand Up @@ -39,28 +40,79 @@ export type MirrorSyncHandlers = {
pull: (req: express.Request, res: express.Response) => Promise<unknown>;
};

export type MirrorSyncObservabilityHooks = {
onConflictWarning?: () => void;
onUpdatesPulled?: (count: number) => void;
onSyncFailure?: () => void;
onPeerAnnounced?: (payload: { peer_id: string; base_url: string }) => void;
onPullCompleted?: (payload: {
peer_id: string;
pulled_files: number;
conflicts: number;
graph_rebuilt: boolean;
}) => void;
onPullFailed?: (payload: { peer_id: string; error: string }) => void;
};

type MirrorSyncObservabilityContextLike = Pick<
MirrorObservabilityContext,
"incrementMetric" | "logEvent"
>;

type MirrorSyncManagerOptions = {
nodeId: string;
loreDir: string;
baseUrl?: string | null;
fetchImpl?: FetchLike;
registry?: MirrorPeerRegistry;
onRuntimeEvent?: (type: string, payload?: Record<string, unknown>) => void;
observability?: {
onConflictWarning?: () => void;
onUpdatesPulled?: (count: number) => void;
onSyncFailure?: () => void;
onPeerAnnounced?: (payload: { peer_id: string; base_url: string }) => void;
onPullCompleted?: (payload: {
peer_id: string;
pulled_files: number;
conflicts: number;
graph_rebuilt: boolean;
}) => void;
onPullFailed?: (payload: { peer_id: string; error: string }) => void;
};
observability?: MirrorSyncObservabilityHooks | MirrorSyncObservabilityContextLike;
};

function hasMirrorSyncObservabilityContext(
observability: MirrorSyncManagerOptions["observability"],
): observability is MirrorSyncObservabilityContextLike {
return (
!!observability &&
typeof observability === "object" &&
"incrementMetric" in observability &&
typeof observability.incrementMetric === "function" &&
"logEvent" in observability &&
typeof observability.logEvent === "function"
);
}

function normalizeMirrorSyncObservabilityHooks(
observability: MirrorSyncManagerOptions["observability"],
): MirrorSyncObservabilityHooks {
if (!observability) {
return {};
}
if (!hasMirrorSyncObservabilityContext(observability)) {
return observability;
}
return {
onConflictWarning: () => {
observability.incrementMetric("conflict_warnings");
},
onUpdatesPulled: (count) => {
observability.incrementMetric("updates_pulled", count);
},
onSyncFailure: () => {
observability.incrementMetric("sync_failures");
},
onPeerAnnounced: (payload) => {
observability.logEvent("sync.peer.announced", payload);
},
onPullCompleted: (payload) => {
observability.logEvent("sync.pull.completed", payload);
},
onPullFailed: (payload) => {
observability.logEvent("sync.pull.failed", payload);
},
};
}

async function parseJsonResponse<T>(response: Response): Promise<T> {
if (!response.ok) {
throw new Error(`sync request failed: ${response.status} ${await response.text()}`);
Expand Down Expand Up @@ -237,6 +289,7 @@ function collectMirrorSyncPullNeededPaths(
export function createMirrorSyncManager(options: MirrorSyncManagerOptions): MirrorSyncManager {
const fetchImpl = options.fetchImpl ?? fetch;
const registry = options.registry ?? createMirrorPeerRegistry();
const observability = normalizeMirrorSyncObservabilityHooks(options.observability);
let localBaseUrl = options.baseUrl ? normalizeMirrorPeerBaseUrl(options.baseUrl) : null;

return {
Expand All @@ -253,7 +306,7 @@ export function createMirrorSyncManager(options: MirrorSyncManagerOptions): Mirr
peer_id: peer.peer_id,
base_url: peer.base_url,
});
options.observability?.onPeerAnnounced?.({
observability.onPeerAnnounced?.({
peer_id: peer.peer_id,
base_url: peer.base_url,
});
Expand Down Expand Up @@ -318,10 +371,10 @@ export function createMirrorSyncManager(options: MirrorSyncManagerOptions): Mirr
remoteContents,
metrics: {
onConflictWarning: () => {
options.observability?.onConflictWarning?.();
observability.onConflictWarning?.();
},
onUpdatesPulled: (count) => {
options.observability?.onUpdatesPulled?.(count);
observability.onUpdatesPulled?.(count);
},
},
});
Expand All @@ -338,7 +391,7 @@ export function createMirrorSyncManager(options: MirrorSyncManagerOptions): Mirr
conflicts: canonResult.conflicts.length,
graph_rebuilt: graphResult.rebuilt,
});
options.observability?.onPullCompleted?.({
observability.onPullCompleted?.({
peer_id: peer.peer_id,
pulled_files: canonResult.pulledFiles.length,
conflicts: canonResult.conflicts.length,
Expand All @@ -352,13 +405,13 @@ export function createMirrorSyncManager(options: MirrorSyncManagerOptions): Mirr
graphResult,
});
} catch (error) {
options.observability?.onSyncFailure?.();
observability.onSyncFailure?.();
registry.markStatus(peer.peer_id, "error", String(error));
options.onRuntimeEvent?.("sync.pull.failed", {
peer_id: peer.peer_id,
error: String(error),
});
options.observability?.onPullFailed?.({
observability.onPullFailed?.({
peer_id: peer.peer_id,
error: String(error),
});
Expand Down
1 change: 0 additions & 1 deletion src/mirrordaemon/mirrordaemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ describe("mirrordaemon", () => {
const debug = getMirrordaemonDebugState(daemon, {
port: 7788,
baseUrl: "http://127.0.0.1:7788",
peersKnown: 2,
});

expect(runtime.node_id).toBe("daemon-node");
Expand Down
Loading