Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions apps/daemon/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
// Web-side captures (apps/web/src/analytics) carry the matching identity in
// HTTP headers (see x-od-analytics-* constants in @open-design/contracts);
// daemon reads those headers off the request and reuses the same
// device_id as the PostHog distinct_id so events from both sides land on
// the same person. (v2: renamed from `anonymous_id`.)

import crypto from 'node:crypto';
import { PostHog } from 'posthog-node';
import type { Request } from 'express';
import {
ANALYTICS_HEADER_ANONYMOUS_ID,
ANALYTICS_HEADER_DEVICE_ID,
ANALYTICS_HEADER_CLIENT_TYPE,
ANALYTICS_HEADER_LOCALE,
ANALYTICS_HEADER_REQUEST_ID,
Expand All @@ -27,7 +27,7 @@ import { readAppConfig } from './app-config.js';
const DEFAULT_HOST = 'https://us.i.posthog.com';

export interface AnalyticsContext {
anonymousId: string;
deviceId: string;
sessionId: string;
clientType: AnalyticsClientType;
locale: string;
Expand All @@ -39,15 +39,15 @@ export interface AnalyticsContext {
// web side too). Daemon-internal capture sites (e.g. background sweeps with
// no request) should not invoke this path.
export function readAnalyticsContext(req: Request): AnalyticsContext | null {
  // The device id is the one mandatory piece of identity: when its header is
  // missing or empty there is nothing to attribute events to, so the caller
  // gets null instead of a partially filled context.
  const deviceId = headerString(req, ANALYTICS_HEADER_DEVICE_ID);
  if (!deviceId) return null;

  // A request without a session header reuses the device id as its session id.
  const sessionId = headerString(req, ANALYTICS_HEADER_SESSION_ID) ?? deviceId;

  // Only the exact value 'desktop' selects the desktop client; any other
  // value (or no header at all) is reported as 'web'.
  const clientHeader = headerString(req, ANALYTICS_HEADER_CLIENT_TYPE);
  const clientType: AnalyticsClientType =
    clientHeader === 'desktop' ? 'desktop' : 'web';

  // Locale defaults to 'en'; requestId is passed through as-is and may be null.
  const locale = headerString(req, ANALYTICS_HEADER_LOCALE) ?? 'en';
  const requestId = headerString(req, ANALYTICS_HEADER_REQUEST_ID);

  return { deviceId, sessionId, clientType, locale, requestId };
}

function headerString(req: Request, name: string): string | null {
Expand Down Expand Up @@ -140,7 +140,7 @@ export function createAnalyticsService(args: {
const appCfg = await readAppConfig(args.dataDir);
if (appCfg.telemetry?.metrics !== true) return;
client.capture({
distinctId: context.anonymousId,
distinctId: context.deviceId,
event: eventName,
properties: {
...properties,
Expand All @@ -149,7 +149,8 @@ export function createAnalyticsService(args: {
ui_version: appVersion,
app_version: appVersion,
session_id: context.sessionId,
anonymous_id: context.anonymousId,
// v2 rename: was `anonymous_id`. Value unchanged.
device_id: context.deviceId,
client_type: context.clientType,
locale: context.locale,
...(context.requestId ? { request_id: context.requestId } : {}),
Expand Down
178 changes: 6 additions & 172 deletions apps/daemon/src/chat-routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Express } from 'express';
import type { RouteDeps } from './server-context.js';
import { newInsertId } from './analytics.js';
import { seedProviderIfMissing } from './media-config.js';
import {
BYOK_SENSEAUDIO_TOOLS,
Expand All @@ -10,40 +9,11 @@ import {
type BYOKToolContext,
} from './byok-tools.js';
import { isSafeId as isSafeProjectId } from './projects.js';
import {
agentIdToTracking,
projectKindToTracking,
} from '@open-design/contracts/analytics';
import { projectKindToTracking } from '@open-design/contracts/analytics';
import { validateBaseUrlResolved } from './connectionTest.js';

export interface RegisterChatRoutesDeps extends RouteDeps<'db' | 'design' | 'http' | 'chat' | 'agents' | 'critique' | 'validation' | 'lifecycle' | 'paths'> {}

// Invariant: the assistant message row for a chat run ends up carrying the
// run's terminal state, even if the web client never persisted the
// cancel/finish itself (page refresh, dropped PUT). A row left at
// run_status='running' with ended_at=NULL would make the elapsed-time
// renderer fall back to now - startedAt after a reload.
//
// The UPDATE is deliberately conservative: COALESCE keeps an ended_at the web
// client already wrote, and the run_status filter leaves rows alone once the
// web client has finalized them. Failures are logged, never thrown.
function reconcileAssistantMessageOnRunEnd(
  db: RegisterChatRoutesDeps['db'],
  runs: RegisterChatRoutesDeps['design']['runs'],
  run: { assistantMessageId: string | null },
): void {
  // Runs that are not tied to an assistant message have nothing to reconcile.
  const hasAssistantMessage = Boolean(run.assistantMessageId);
  if (!hasAssistantMessage) return;

  // Writes the terminal status once the run settles. The message id is read
  // from `run` at that moment rather than captured up front.
  const writeTerminalState = (finalStatus: { status: string }): void => {
    const statement = db.prepare(
`UPDATE messages
SET run_status = ?, ended_at = COALESCE(ended_at, ?)
WHERE id = ? AND run_status IN ('queued', 'running')`,
    );
    statement.run(finalStatus.status, Date.now(), run.assistantMessageId);
  };

  // Reconciliation is best-effort: report the failure and move on.
  const reportFailure = (err: unknown): void => {
    console.warn('[runs] message reconciliation failed', err);
  };

  // Fire-and-forget: the caller does not wait for the run to finish.
  void runs.wait(run).then(writeTerminalState).catch(reportFailure);
}

export function registerChatRoutes(app: Express, ctx: RegisterChatRoutesDeps) {
const { db, design } = ctx;
const { sendApiError, createSseResponse } = ctx.http;
Expand Down Expand Up @@ -76,147 +46,11 @@ export function registerChatRoutes(app: Express, ctx: RegisterChatRoutesDeps) {
return false;
};

// POST /api/runs — creates a chat run, replies 202 with its id, starts it,
// and (when the request carries an analytics context) emits the daemon-side
// run_created / run_finished events. Responds 503 while the daemon is
// shutting down.
app.post('/api/runs', (req, res) => {
  if (isDaemonShuttingDown()) {
    return sendApiError(res, 503, 'UPSTREAM_UNAVAILABLE', 'daemon is shutting down');
  }
  const run = design.runs.create(req.body || {});
  // Client type: trust an explicit x-od-client header when it is one of the
  // two known values; otherwise fall back to sniffing the user agent for an
  // Electron marker.
  const declared = String(req.get('x-od-client') ?? '').toLowerCase();
  if (declared === 'desktop' || declared === 'web') {
    run.clientType = declared;
  } else {
    const ua = String(req.get('user-agent') ?? '');
    run.clientType = ua.includes('Electron/') ? 'desktop' : 'web';
  }
  /** @type {import('@open-design/contracts').ChatRunCreateResponse} */
  const body = { runId: run.id };
  // The response is sent before the run starts, so the client is never
  // blocked on run execution.
  res.status(202).json(body);
  design.runs.start(run, () => startChatRun(req.body || {}, run));
  reconcileAssistantMessageOnRunEnd(db, design.runs, run);

  // Analytics: emit run_created (daemon-side, authoritative) and
  // schedule a run_finished emission on wait() resolution. Both events
  // use the same insert_id so PostHog dedupes against the web mirror
  // that fires on SSE start/end. No-op when POSTHOG_KEY is unset.
  const context = design.readAnalyticsContext?.(req);
  if (context) {
    const reqBody = (req.body || {}) as Record<string, unknown>;
    const runInsertId = newInsertId();
    const runStartedAt = Date.now();
    // Estimate user_query_tokens from the request prompt — we never
    // transmit the prompt text itself, just the integer count. The
    // canonical extraction (currentPrompt fallback to message) lives
    // in telemetryPromptFromRunRequest; mirroring it inline keeps the
    // analytics emit self-contained and out of the startChatRun
    // critical path.
    const promptText =
      typeof reqBody.currentPrompt === 'string'
        ? reqBody.currentPrompt
        : typeof reqBody.message === 'string'
          ? reqBody.message
          : '';
    // ~4 chars per token is the common rough heuristic for English /
    // Latin text; CJK skews token-per-char higher but this is still the
    // industry-standard estimate when no tokenizer is available. The
    // accompanying token_count_source field marks this as 'estimated'
    // so dashboards can tell estimate from real provider counts.
    const userQueryTokens = promptText.length > 0
      ? Math.ceil(promptText.length / 4)
      : 0;
    const baseProps: Record<string, unknown> = {
      page: 'studio',
      area: 'chat_composer',
      project_id: typeof reqBody.projectId === 'string' ? reqBody.projectId : null,
      conversation_id:
        typeof reqBody.conversationId === 'string' ? reqBody.conversationId : null,
      run_id: run.id,
      project_kind: null,
      design_system_id:
        typeof reqBody.designSystemId === 'string'
          ? reqBody.designSystemId
          : undefined,
      design_system_source: 'unknown',
      has_attachment: Array.isArray(reqBody.attachments)
        ? (reqBody.attachments as unknown[]).length > 0
        : false,
      user_query_tokens: userQueryTokens,
      model_id: typeof reqBody.model === 'string' ? reqBody.model : null,
      agent_provider_id:
        typeof reqBody.agentId === 'string'
          ? agentIdToTracking(reqBody.agentId)
          : null,
      skill_id: typeof reqBody.skillId === 'string' ? reqBody.skillId : null,
      mcp_id: null,
      token_count_source: userQueryTokens > 0 ? 'estimated' : 'unknown',
    };
    design.analytics.capture({
      eventName: 'run_created',
      context,
      appVersion: design.getAppVersion?.() ?? '0.0.0',
      properties: baseProps,
      insertId: runInsertId,
    });
    // Run lifecycle hook: emit run_finished when the run reaches a
    // terminal state. The same context is reused, and the chain is
    // fire-and-forget so it never blocks the run.
    void design.runs
      .wait(run)
      .then((status: { status: string }) => {
        // Map the run service's status onto the analytics result vocabulary;
        // anything that is not succeeded/canceled is reported as failed.
        const result =
          status.status === 'succeeded'
            ? 'success'
            : status.status === 'canceled'
              ? 'cancelled'
              : 'failed';
        // Pull input/output token totals from the agent's usage event,
        // which claude-stream.ts emits as `{ type: 'usage', usage: {...} }`
        // and the run service stores in run.events. Provider only gives
        // totals (no 7-subfield breakdown), so token_count_source flips
        // to 'provider_usage' here only when at least one number landed;
        // otherwise stays 'unknown'.
        let inputTokens: number | undefined;
        let outputTokens: number | undefined;
        // Walk backwards so the most recent usage event with numbers wins.
        for (let i = run.events.length - 1; i >= 0; i -= 1) {
          const ev = run.events[i];
          const data = ev?.data as
            | { type?: string; usage?: Record<string, unknown> | null }
            | null
            | undefined;
          if (ev?.event === 'agent' && data?.type === 'usage' && data.usage) {
            const u = data.usage;
            if (typeof u.input_tokens === 'number') inputTokens = u.input_tokens;
            if (typeof u.output_tokens === 'number') outputTokens = u.output_tokens;
            if (inputTokens !== undefined || outputTokens !== undefined) break;
          }
        }
        const haveUsage = inputTokens !== undefined || outputTokens !== undefined;
        // A total is only reported when both halves are known.
        const totalTokens =
          inputTokens !== undefined && outputTokens !== undefined
            ? inputTokens + outputTokens
            : undefined;
        design.analytics.capture({
          eventName: 'run_finished',
          context,
          appVersion: design.getAppVersion?.() ?? '0.0.0',
          properties: {
            ...baseProps,
            area: 'chat_panel',
            result,
            artifact_count: 0,
            total_duration_ms: Date.now() - runStartedAt,
            ...(inputTokens !== undefined ? { input_tokens: inputTokens } : {}),
            ...(outputTokens !== undefined ? { output_tokens: outputTokens } : {}),
            ...(totalTokens !== undefined ? { total_tokens: totalTokens } : {}),
            // Upgrade source to 'provider_usage' when the agent reported
            // input/output totals; otherwise inherit baseProps' value
            // ('estimated' when user_query_tokens > 0, else 'unknown').
            ...(haveUsage ? { token_count_source: 'provider_usage' } : {}),
          },
          insertId: `${runInsertId}-finish`,
        });
      })
      .catch((err: unknown) => {
        // wait() can't reject in the current runs.ts impl, but this handler
        // also receives anything thrown while building or capturing the
        // run_finished event above. Analytics stays best-effort, so the
        // failure is logged rather than silently dropped or rethrown.
        console.warn('[analytics] run_finished capture failed', err);
      });
  }
});
// The canonical POST /api/runs handler lives in `server.ts` — it ran
// first in Express's registration order long before this file existed,
// so any handler we wired here was shadowed and never executed. Plugin
// snapshot resolution, clientType inference, and the daemon-side
// run_created/finished analytics all live in `server.ts` now.

app.get('/api/runs', (req, res) => {
const { projectId, conversationId, status } = req.query;
Expand Down
Loading
Loading