diff --git a/src/collector.ts b/src/collector.ts index 1cfc447..e4241c0 100644 --- a/src/collector.ts +++ b/src/collector.ts @@ -1,7 +1,7 @@ // Data collector - reads Codex CLI storage and returns raw data import { createReadStream } from "node:fs"; -import { readdir, readFile, stat } from "node:fs/promises"; +import { readdir, stat } from "node:fs/promises"; import { join } from "node:path"; import os from "node:os"; import { createInterface } from "node:readline"; @@ -80,18 +80,26 @@ export async function listCodexSessionFiles(year: number): Promise { export async function getCodexFirstPromptTimestamp(): Promise { try { - const raw = await readFile(CODEX_HISTORY_PATH, "utf8"); let minTs: number | null = null; - for (const line of raw.split("\n")) { - if (!line.trim()) continue; - try { - const entry = JSON.parse(line) as { ts?: number }; - if (!entry.ts) continue; - if (minTs === null || entry.ts < minTs) { - minTs = entry.ts; - } - } catch { - // Skip malformed lines + + // Stream rather than `readFile` to avoid loading large history files into memory. + const rl = createInterface({ + input: createReadStream(CODEX_HISTORY_PATH), + crlfDelay: Infinity, + }); + + for await (const line of rl) { + const trimmed = line.trim(); + if (!trimmed) continue; + + // Avoid JSON.parse in the hot path; we only need `ts`. + const match = trimmed.match(/"ts"\s*:\s*(\d+)/); + if (!match) continue; + const ts = Number(match[1]); + if (!Number.isFinite(ts) || ts <= 0) continue; + + if (minTs === null || ts < minTs) { + minTs = ts; } } return minTs; @@ -102,140 +110,23 @@ export async function getCodexFirstPromptTimestamp(): Promise { export async function collectCodexUsageData(year: number): Promise { const files = await listCodexSessionFiles(year); + const concurrency = Math.max(1, Math.min(os.cpus()?.length ?? 4, 8)); + const results = await asyncPool(concurrency, files, processSessionFile); + const events: CodexUsageEvent[] = []; const dailyActivity = new Map(); const projects = new Set(); let totalMessages = 0; let earliestSessionDate: Date | null = null; - for (const filePath of files) { - let previousTotals: RawUsage | null = null; - let currentModel: string | undefined; - let currentModelIsFallback = false; - let legacyFallbackUsed = false; - - const rl = createInterface({ - input: createReadStream(filePath), - crlfDelay: Infinity, - }); - - for await (const line of rl) { - const trimmed = line.trim(); - if (!trimmed) continue; - let entry: any; - try { - entry = JSON.parse(trimmed); - } catch { - continue; - } - - const entryType = entry?.type; - - if (entryType === "session_meta") { - const sessionTimestamp = entry?.payload?.timestamp ?? entry?.timestamp; - if (sessionTimestamp) { - const sessionDate = new Date(sessionTimestamp); - if (!earliestSessionDate || sessionDate < earliestSessionDate) { - earliestSessionDate = sessionDate; - } - } - const cwd = entry?.payload?.cwd; - if (cwd) { - projects.add(cwd); - } - continue; - } - - if (entryType === "turn_context") { - const model = extractModel(entry?.payload); - if (model) { - currentModel = model; - currentModelIsFallback = false; - } - continue; - } - - if (entryType === "event_msg") { - const payload = entry?.payload; - if (payload?.type === "user_message") { - totalMessages += 1; - const timestamp = entry?.timestamp; - if (timestamp) { - const dateKey = formatDateKey(new Date(timestamp)); - dailyActivity.set(dateKey, (dailyActivity.get(dateKey) || 0) + 1); - } - continue; - } - - if (payload?.type !== "token_count") { - continue; - } - - const timestamp = entry?.timestamp; - if (!timestamp) continue; - - const info = payload?.info; - const lastUsage = normalizeRawUsage(info?.last_token_usage); - const totalUsage = normalizeRawUsage(info?.total_token_usage); - - let raw = lastUsage; - if (!raw && totalUsage) { - raw = subtractRawUsage(totalUsage, previousTotals); - } - - if (totalUsage) { - previousTotals = totalUsage; - } - - if (!raw) continue; - - const delta = convertToDelta(raw); - if ( - delta.inputTokens === 0 && - delta.cachedInputTokens === 0 && - delta.outputTokens === 0 && - delta.reasoningOutputTokens === 0 - ) { - continue; - } - - const extractedModel = extractModel({ ...payload, info }); - let isFallback = false; - if (extractedModel) { - currentModel = extractedModel; - currentModelIsFallback = false; - } - - let model = extractedModel ?? currentModel; - if (!model) { - model = LEGACY_FALLBACK_MODEL; - isFallback = true; - legacyFallbackUsed = true; - currentModel = model; - currentModelIsFallback = true; - } else if (!extractedModel && currentModelIsFallback) { - isFallback = true; - } - - events.push({ - timestamp, - model, - inputTokens: delta.inputTokens, - cachedInputTokens: delta.cachedInputTokens, - outputTokens: delta.outputTokens, - reasoningOutputTokens: delta.reasoningOutputTokens, - totalTokens: delta.totalTokens, - }); - - if (isFallback) { - // No-op for now; kept for parity with ccusage - } - } - } - - if (legacyFallbackUsed) { - // ignore - best-effort + for (const res of results) { + totalMessages += res.totalMessages; + if (res.earliestSessionDate && (!earliestSessionDate || res.earliestSessionDate < earliestSessionDate)) { + earliestSessionDate = res.earliestSessionDate; } + for (const project of res.projects) projects.add(project); + mergeCountMap(dailyActivity, res.dailyActivity); + if (res.events.length > 0) events.push(...res.events); } events.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()); @@ -339,6 +230,202 @@ function asNonEmptyString(value: unknown): string | undefined { return trimmed === "" ? undefined : trimmed; } +type FileUsageResult = { + events: CodexUsageEvent[]; + dailyActivity: Map; + totalMessages: number; + projects: Set; + earliestSessionDate: Date | null; +}; + +async function processSessionFile(filePath: string): Promise { + const events: CodexUsageEvent[] = []; + const dailyActivity = new Map(); + const projects = new Set(); + let totalMessages = 0; + let earliestSessionDate: Date | null = null; + + let previousTotals: RawUsage | null = null; + let currentModel: string | undefined; + let currentModelIsFallback = false; + let legacyFallbackUsed = false; + + const rl = createInterface({ + input: createReadStream(filePath), + crlfDelay: Infinity, + }); + + for await (const line of rl) { + const trimmed = line.trim(); + if (!trimmed) continue; + + // Fast path: avoid JSON.parse for irrelevant lines (most lines are `response_item` etc.). + const entryType = getTopLevelType(trimmed); + if (entryType !== "session_meta" && entryType !== "turn_context" && entryType !== "event_msg") { + continue; + } + + let entry: any; + try { + entry = JSON.parse(trimmed); + } catch { + continue; + } + + if (entryType === "session_meta") { + const sessionTimestamp = entry?.payload?.timestamp ?? entry?.timestamp; + if (sessionTimestamp) { + const sessionDate = new Date(sessionTimestamp); + if (!earliestSessionDate || sessionDate < earliestSessionDate) { + earliestSessionDate = sessionDate; + } + } + const cwd = entry?.payload?.cwd; + if (cwd) { + projects.add(cwd); + } + continue; + } + + if (entryType === "turn_context") { + const model = extractModel(entry?.payload); + if (model) { + currentModel = model; + currentModelIsFallback = false; + } + continue; + } + + if (entryType === "event_msg") { + const payload = entry?.payload; + if (payload?.type === "user_message") { + totalMessages += 1; + const timestamp = entry?.timestamp; + if (timestamp) { + const dateKey = formatDateKey(new Date(timestamp)); + dailyActivity.set(dateKey, (dailyActivity.get(dateKey) || 0) + 1); + } + continue; + } + + if (payload?.type !== "token_count") { + continue; + } + + const timestamp = entry?.timestamp; + if (!timestamp) continue; + + const info = payload?.info; + const lastUsage = normalizeRawUsage(info?.last_token_usage); + const totalUsage = normalizeRawUsage(info?.total_token_usage); + + let raw = lastUsage; + if (!raw && totalUsage) { + raw = subtractRawUsage(totalUsage, previousTotals); + } + + if (totalUsage) { + previousTotals = totalUsage; + } + + if (!raw) continue; + + const delta = convertToDelta(raw); + if ( + delta.inputTokens === 0 && + delta.cachedInputTokens === 0 && + delta.outputTokens === 0 && + delta.reasoningOutputTokens === 0 + ) { + continue; + } + + // `info` is already part of the payload; avoid object spreading on the hot path. + const extractedModel = extractModel(payload); + let isFallback = false; + if (extractedModel) { + currentModel = extractedModel; + currentModelIsFallback = false; + } + + let model = extractedModel ?? currentModel; + if (!model) { + model = LEGACY_FALLBACK_MODEL; + isFallback = true; + legacyFallbackUsed = true; + currentModel = model; + currentModelIsFallback = true; + } else if (!extractedModel && currentModelIsFallback) { + isFallback = true; + } + + events.push({ + timestamp, + model, + inputTokens: delta.inputTokens, + cachedInputTokens: delta.cachedInputTokens, + outputTokens: delta.outputTokens, + reasoningOutputTokens: delta.reasoningOutputTokens, + totalTokens: delta.totalTokens, + }); + + if (isFallback) { + // No-op for now; kept for parity with ccusage + } + } + } + + if (legacyFallbackUsed) { + // ignore - best-effort + } + + return { events, dailyActivity, totalMessages, projects, earliestSessionDate }; +} + +function mergeCountMap(into: Map, from: Map) { + for (const [k, v] of from.entries()) { + into.set(k, (into.get(k) || 0) + v); + } +} + +function getTopLevelType(line: string): string | null { + const keyIdx = line.indexOf("\"type\""); + if (keyIdx === -1) return null; + // Avoid mistakenly matching nested `"type"` fields inside payload objects. + const payloadIdx = line.indexOf("\"payload\""); + if (payloadIdx !== -1 && keyIdx > payloadIdx) return null; + + const colonIdx = line.indexOf(":", keyIdx + 6); + if (colonIdx === -1) return null; + let i = colonIdx + 1; + while (i < line.length && (line[i] === " " || line[i] === "\t")) i++; + if (line[i] !== "\"") return null; + const start = i + 1; + const end = line.indexOf("\"", start); + if (end === -1) return null; + return line.slice(start, end); +} + +async function asyncPool( + concurrency: number, + items: readonly T[], + worker: (item: T, index: number) => Promise +): Promise { + const results = new Array(items.length); + let nextIndex = 0; + + const runners = new Array(Math.min(concurrency, items.length)).fill(null).map(async () => { + while (true) { + const index = nextIndex++; + if (index >= items.length) return; + results[index] = await worker(items[index], index); + } + }); + + await Promise.all(runners); + return results; +} + function formatDateKey(date: Date): string { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, "0");