diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index c5f2306d..bf2f9936 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -80,6 +80,7 @@ function formatTrackedRunKind(kind: RunKind): string { case RunKinds['gsc-sync']: return 'GSC sync' case RunKinds['inspect-sitemap']: return 'Sitemap inspection' case RunKinds['ga-sync']: return 'GA sync' + case RunKinds['traffic-sync']: return 'Traffic sync' case RunKinds['bing-inspect']: return 'Bing URL inspection' case RunKinds['bing-inspect-sitemap']: return 'Bing sitemap inspection' case RunKinds['site-audit']: return 'Site audit' diff --git a/apps/web/src/build-dashboard.ts b/apps/web/src/build-dashboard.ts index c27570f8..c93a16fc 100644 --- a/apps/web/src/build-dashboard.ts +++ b/apps/web/src/build-dashboard.ts @@ -78,6 +78,7 @@ function kindLabel(kind: RunKind): string { case RunKinds['gsc-sync']: return 'GSC sync' case RunKinds['inspect-sitemap']: return 'Sitemap inspection' case RunKinds['ga-sync']: return 'GA sync' + case RunKinds['traffic-sync']: return 'Traffic sync' case RunKinds['bing-inspect']: return 'Bing URL inspection' case RunKinds['bing-inspect-sitemap']: return 'Bing sitemap inspection' case RunKinds['site-audit']: return 'Site audit' diff --git a/apps/web/src/queries/run-invalidations.ts b/apps/web/src/queries/run-invalidations.ts index 4d224872..ca77958a 100644 --- a/apps/web/src/queries/run-invalidations.ts +++ b/apps/web/src/queries/run-invalidations.ts @@ -30,6 +30,7 @@ export function invalidateQueriesForRunKind( void queryClient.invalidateQueries({ queryKey: queryKeys.gsc.project(projectName) }) return case RunKinds['ga-sync']: + case RunKinds['traffic-sync']: void queryClient.invalidateQueries({ queryKey: queryKeys.traffic.project(projectName) }) return case RunKinds['bing-inspect']: diff --git a/docs/data-model.md b/docs/data-model.md index 4c09a8a9..060c15f8 100644 --- a/docs/data-model.md +++ b/docs/data-model.md @@ -34,6 +34,11 @@ erDiagram projects ||--o{ bing_keyword_stats : has projects ||--o{ bing_coverage_snapshots : has + projects ||--o{ traffic_sources : has + traffic_sources ||--o{ crawler_events_hourly : "rolls up" + traffic_sources ||--o{ ai_referral_events_hourly : "rolls up" + traffic_sources ||--o{ raw_event_samples : "samples" + projects ||--o| agent_sessions : "has (1:1)" projects ||--o{ agent_memory : has ``` @@ -81,6 +86,15 @@ erDiagram | **ga_ai_referrals** | AI engine referral tracking. Unique: `(projectId, date, source, medium, sourceDimension)` | | **ga_social_referrals** | Social media referral tracking. Unique: `(projectId, date, source, medium, channelGroup)` | +### Server-Side Traffic Ingestion + +| Table | Purpose | +|-------|---------| +| **traffic_sources** | Per-connection metadata (Cloud Run today; future WordPress / Cloudflare / Vercel). Status `connected` / `paused` / `error` / `archived`. Credentials live in `~/.canonry/config.yaml`, never here. FK: projectId → projects. | +| **crawler_events_hourly** | Hourly rollup of server-observed AI crawler hits. Composite PK `(projectId, sourceId, tsHour, botId, verificationStatus, pathNormalized, status)` so repeat syncs upsert via `hits + ?`. | +| **ai_referral_events_hourly** | Hourly rollup of server-observed human AI-referral clicks (UTM or referer evidence). Composite PK matches the crawler bucket pattern. | +| **raw_event_samples** | Bounded sample tail for classifier debugging (default 30-day retention). FK: sourceId → traffic_sources. 
| + ### Intelligence | Table | Purpose | diff --git a/package.json b/package.json index 4081becf..fd4cdbeb 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "canonry", "private": true, - "version": "4.11.1", + "version": "4.12.1", "type": "module", "packageManager": "pnpm@10.28.2", "scripts": { diff --git a/packages/api-routes/AGENTS.md b/packages/api-routes/AGENTS.md index 3b01082b..97c9a221 100644 --- a/packages/api-routes/AGENTS.md +++ b/packages/api-routes/AGENTS.md @@ -22,6 +22,7 @@ Shared Fastify route plugins used by both the local server (`packages/canonry`) | `src/report.ts` | `GET /projects/:name/report` (JSON DTO) and `GET /projects/:name/report.html` (standalone downloadable HTML) — aggregated client-facing AEO report bundle (13 sections) | | `src/report-renderer.ts` | `renderReportHtml(report)` — server-side HTML renderer with inline SVG charts and inline CSS, re-exported from `@ainyc/canonry-api-routes` for the CLI | | `src/wordpress.ts` | WordPress integration routes | +| `src/traffic.ts` | Server-side traffic ingestion routes — `POST /traffic/connect/cloud-run`, `POST /traffic/sources/:id/sync`. Credentials resolved through an injected `cloudRunCredentialStore`; the Cloud Logging pull and access-token resolver are also injectable for tests. | | `src/backlinks.ts` | Backlinks (Common Crawl sync + per-project extract/summary/domains/history) routes | | `src/doctor.ts` | `GET /doctor` and `GET /projects/:name/doctor` — runs check registry, returns `DoctorReport` | | `src/doctor/registry.ts` | `ALL_CHECKS` — single source of truth for the doctor check catalog | diff --git a/packages/api-routes/package.json b/packages/api-routes/package.json index 03809c3d..ed041def 100644 --- a/packages/api-routes/package.json +++ b/packages/api-routes/package.json @@ -21,9 +21,11 @@ "@ainyc/canonry-db": "workspace:*", "@ainyc/canonry-intelligence": "workspace:*", "@ainyc/canonry-integration-bing": "workspace:*", + "@ainyc/canonry-integration-cloud-run": "workspace:*", "@ainyc/canonry-integration-google": "workspace:*", "@ainyc/canonry-integration-commoncrawl": "workspace:*", "@ainyc/canonry-integration-google-analytics": "workspace:*", + "@ainyc/canonry-integration-traffic": "workspace:*", "@ainyc/canonry-integration-wordpress": "workspace:*", "drizzle-orm": "^0.45.1", "fastify": "^5.4.0" diff --git a/packages/api-routes/src/index.ts b/packages/api-routes/src/index.ts index 897ffd90..e81faa61 100644 --- a/packages/api-routes/src/index.ts +++ b/packages/api-routes/src/index.ts @@ -41,6 +41,8 @@ import { wordpressRoutes } from './wordpress.js' import type { WordpressRoutesOptions } from './wordpress.js' import { backlinksRoutes } from './backlinks.js' import type { BacklinksRoutesOptions } from './backlinks.js' +import { trafficRoutes } from './traffic.js' +import type { TrafficRoutesOptions, CloudRunCredentialStore } from './traffic.js' import { doctorRoutes } from './doctor.js' declare module 'fastify' { @@ -105,6 +107,12 @@ export interface ApiRoutesOptions { onCdpConfigure?: CDPRoutesOptions['onCdpConfigure'] /** GA4 credential store — stores service account keys in config, not DB */ ga4CredentialStore?: Ga4CredentialStore + /** Cloud Run credential store — stores SA keys / OAuth tokens in config, not DB */ + cloudRunCredentialStore?: CloudRunCredentialStore + /** Override Cloud Run pull (tests) — see `TrafficRoutesOptions` */ + pullCloudRunEvents?: TrafficRoutesOptions['pullCloudRunEvents'] + /** Override Cloud Run access-token resolver (tests) — see 
`TrafficRoutesOptions` */ + resolveCloudRunAccessToken?: TrafficRoutesOptions['resolveCloudRunAccessToken'] /** Backlinks feature callbacks — see `backlinksRoutes` for details. */ getBacklinksStatus?: BacklinksRoutesOptions['getBacklinksStatus'] onInstallBacklinks?: BacklinksRoutesOptions['onInstallBacklinks'] @@ -262,6 +270,11 @@ export async function apiRoutes(app: FastifyInstance, opts: ApiRoutesOptions) { googleConnectionStore: opts.googleConnectionStore, getGoogleAuthConfig: opts.getGoogleAuthConfig, } satisfies GA4RoutesOptions) + await api.register(trafficRoutes, { + cloudRunCredentialStore: opts.cloudRunCredentialStore, + pullCloudRunEvents: opts.pullCloudRunEvents, + resolveCloudRunAccessToken: opts.resolveCloudRunAccessToken, + } satisfies TrafficRoutesOptions) // Always mount the backlinks routes so read endpoints (summary, domains, // history, sync list) work off the shared DB. Action routes (install, // sync, extract, cache prune) throw MISSING_DEPENDENCY when the host diff --git a/packages/api-routes/src/openapi.ts b/packages/api-routes/src/openapi.ts index c7f0faba..c72b9618 100644 --- a/packages/api-routes/src/openapi.ts +++ b/packages/api-routes/src/openapi.ts @@ -2735,6 +2735,68 @@ const routeCatalog: OpenApiOperation[] = [ 404: { description: 'Project not found.' }, }, }, + { + method: 'post', + path: '/api/v1/projects/{name}/traffic/connect/cloud-run', + summary: 'Connect a Cloud Run traffic source', + description: + 'Stores the service-account JSON in `~/.canonry/config.yaml` and creates a `traffic_sources` row for the project. Reconnecting updates the existing active source rather than creating a duplicate.', + tags: ['traffic'], + parameters: [nameParameter], + requestBody: { + required: true, + content: { + 'application/json': { + schema: { + type: 'object', + required: ['gcpProjectId', 'keyJson'], + properties: { + gcpProjectId: stringSchema, + serviceName: stringSchema, + location: stringSchema, + displayName: stringSchema, + keyJson: { ...stringSchema, description: 'Service-account JSON content.' }, + }, + }, + }, + }, + }, + responses: { + 200: { description: 'Traffic source DTO returned.' }, + 400: { description: 'Invalid Cloud Run connection request.' }, + 404: { description: 'Project not found.' }, + }, + }, + { + method: 'post', + path: '/api/v1/projects/{name}/traffic/sources/{id}/sync', + summary: 'Trigger a sync run for a traffic source', + description: + 'Pulls request logs from the configured Cloud Run service for the lookback window, classifies crawler / AI-referral hits, and upserts hourly buckets and a bounded sample tail.', + tags: ['traffic'], + parameters: [ + nameParameter, + { name: 'id', in: 'path', required: true, description: 'Traffic source ID.', schema: stringSchema }, + ], + requestBody: { + required: false, + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + sinceMinutes: { ...integerSchema, description: 'Lookback window in minutes (default 60).' }, + }, + }, + }, + }, + }, + responses: { + 200: { description: 'Sync summary returned.' }, + 400: { description: 'Invalid sync request, missing credentials, or upstream pull error.' }, + 404: { description: 'Project or traffic source not found.' 
}, + }, + }, ] /** diff --git a/packages/api-routes/src/traffic.ts b/packages/api-routes/src/traffic.ts new file mode 100644 index 00000000..7e37f359 --- /dev/null +++ b/packages/api-routes/src/traffic.ts @@ -0,0 +1,521 @@ +import crypto from 'node:crypto' +import { eq, sql } from 'drizzle-orm' +import type { FastifyInstance } from 'fastify' +import { + trafficSources, + crawlerEventsHourly, + aiReferralEventsHourly, + rawEventSamples, + runs, + parseJsonColumn, +} from '@ainyc/canonry-db' +import { + notFound, + validationError, + RunKinds, + RunStatuses, + RunTriggers, + TrafficSourceStatuses, + TrafficSourceTypes, + TrafficSourceAuthModes, +} from '@ainyc/canonry-contracts' +import type { + TrafficSourceDto, + TrafficSyncResponse, + TrafficSourceStatus, + TrafficSourceAuthMode, +} from '@ainyc/canonry-contracts' +import { + listCloudRunTrafficEvents, + getCloudLoggingAccessToken, +} from '@ainyc/canonry-integration-cloud-run' +import type { + CloudRunTrafficEventsPage, + ListCloudRunTrafficEventsOptions, +} from '@ainyc/canonry-integration-cloud-run' +import { buildTrafficProbeReport } from '@ainyc/canonry-integration-traffic' +import { resolveProject, writeAuditLog } from './helpers.js' + +export interface CloudRunCredentialRecord { + projectName: string + gcpProjectId: string + serviceName?: string + location?: string + authMode: TrafficSourceAuthMode + clientEmail?: string + privateKey?: string + refreshToken?: string + tokenExpiresAt?: string + scopes?: string[] + createdAt: string + updatedAt: string +} + +export interface CloudRunCredentialStore { + getConnection: (projectName: string) => CloudRunCredentialRecord | undefined + upsertConnection: (record: CloudRunCredentialRecord) => CloudRunCredentialRecord + deleteConnection: (projectName: string) => boolean +} + +export interface TrafficRoutesOptions { + cloudRunCredentialStore?: CloudRunCredentialStore + /** Override the Cloud Run pull function (for tests). Defaults to `listCloudRunTrafficEvents`. */ + pullCloudRunEvents?: ( + accessToken: string, + options: ListCloudRunTrafficEventsOptions, + ) => Promise<CloudRunTrafficEventsPage> + /** Override the access-token resolver (for tests). Defaults to service-account JWT exchange. */ + resolveCloudRunAccessToken?: (record: CloudRunCredentialRecord) => Promise<string> + /** Default lookback window in minutes when a sync is triggered without an explicit `since`. */ + defaultSyncWindowMinutes?: number + /** Default page size for entries.list pulls. */ + defaultPageSize?: number + /** Default max pages for entries.list pulls. */ + defaultMaxPages?: number + /** Cap on the number of raw_event_samples written per sync. */ + defaultSampleLimit?: number +} + +const DEFAULT_SYNC_WINDOW_MINUTES = 60 +const DEFAULT_PAGE_SIZE = 1000 +const DEFAULT_MAX_PAGES = 5 +const DEFAULT_SAMPLE_LIMIT = 100 + +function parseSourceConfig(row: typeof trafficSources.$inferSelect): Record<string, unknown> { + return parseJsonColumn<Record<string, unknown>>(row.configJson, {}) +} + +function rowToDto(row: typeof trafficSources.$inferSelect): TrafficSourceDto { + return { + id: row.id, + projectId: row.projectId, + sourceType: row.sourceType as TrafficSourceDto['sourceType'], + displayName: row.displayName, + status: row.status as TrafficSourceStatus, + lastSyncedAt: row.lastSyncedAt ?? null, + lastCursor: row.lastCursor ?? null, + lastError: row.lastError ?? null, + archivedAt: row.archivedAt ?? 
null, + config: parseSourceConfig(row), + createdAt: row.createdAt, + updatedAt: row.updatedAt, + } +} + +async function defaultResolveAccessToken(record: CloudRunCredentialRecord): Promise { + if (record.authMode === TrafficSourceAuthModes['service-account']) { + if (!record.clientEmail || !record.privateKey) { + throw validationError('Service-account credentials missing client_email or private_key') + } + return getCloudLoggingAccessToken(record.clientEmail, record.privateKey) + } + throw validationError( + 'OAuth-mode Cloud Run sync is not yet supported in v1. Provide a service-account key file.', + ) +} + +export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOptions) { + const pullEvents = opts.pullCloudRunEvents ?? listCloudRunTrafficEvents + const resolveAccessToken = opts.resolveCloudRunAccessToken ?? defaultResolveAccessToken + const syncWindowMinutes = opts.defaultSyncWindowMinutes ?? DEFAULT_SYNC_WINDOW_MINUTES + const pageSize = opts.defaultPageSize ?? DEFAULT_PAGE_SIZE + const maxPages = opts.defaultMaxPages ?? DEFAULT_MAX_PAGES + const sampleLimit = opts.defaultSampleLimit ?? DEFAULT_SAMPLE_LIMIT + + // POST /projects/:name/traffic/connect/cloud-run + app.post<{ + Params: { name: string } + Body: { + gcpProjectId?: string + serviceName?: string + location?: string + displayName?: string + keyJson?: string + } + }>('/projects/:name/traffic/connect/cloud-run', async (request) => { + const project = resolveProject(app.db, request.params.name) + const body = request.body ?? {} + const { gcpProjectId, serviceName, location, displayName, keyJson } = body + + if (!gcpProjectId || typeof gcpProjectId !== 'string') { + throw validationError('gcpProjectId is required') + } + if (!keyJson) { + throw validationError( + 'keyJson is required for v1 (service-account JSON content). OAuth-mode Cloud Run is not yet supported.', + ) + } + if (!opts.cloudRunCredentialStore) { + throw validationError('Cloud Run credential storage is not configured for this deployment') + } + + let parsed: { client_email?: string; private_key?: string } + try { + parsed = JSON.parse(keyJson) as { client_email?: string; private_key?: string } + } catch { + throw validationError('Invalid JSON in keyJson') + } + if (!parsed.client_email || !parsed.private_key) { + throw validationError('Service-account JSON must contain client_email and private_key') + } + + const now = new Date().toISOString() + const existing = opts.cloudRunCredentialStore.getConnection(project.name) + opts.cloudRunCredentialStore.upsertConnection({ + projectName: project.name, + gcpProjectId, + serviceName: serviceName ?? undefined, + location: location ?? undefined, + authMode: TrafficSourceAuthModes['service-account'], + clientEmail: parsed.client_email, + privateKey: parsed.private_key, + createdAt: existing?.createdAt ?? now, + updatedAt: now, + }) + + // Find an existing non-archived source for this (project, sourceType). + // v1 supports a single active Cloud Run source per project; reconnect updates it. + const activeSource = app.db + .select() + .from(trafficSources) + .where(eq(trafficSources.projectId, project.id)) + .all() + .find((row) => row.sourceType === TrafficSourceTypes['cloud-run'] && row.status !== TrafficSourceStatuses.archived) + + const config: Record = { + gcpProjectId, + serviceName: serviceName ?? null, + location: location ?? null, + authMode: TrafficSourceAuthModes['service-account'], + } + const fallbackName = displayName ?? `Cloud Run · ${gcpProjectId}${serviceName ? 
` / ${serviceName}` : ''}` + + let sourceRow: typeof trafficSources.$inferSelect + if (activeSource) { + app.db + .update(trafficSources) + .set({ + displayName: fallbackName, + status: TrafficSourceStatuses.connected, + lastError: null, + configJson: JSON.stringify(config), + updatedAt: now, + }) + .where(eq(trafficSources.id, activeSource.id)) + .run() + sourceRow = app.db + .select() + .from(trafficSources) + .where(eq(trafficSources.id, activeSource.id)) + .get()! + } else { + const newId = crypto.randomUUID() + app.db + .insert(trafficSources) + .values({ + id: newId, + projectId: project.id, + sourceType: TrafficSourceTypes['cloud-run'], + displayName: fallbackName, + status: TrafficSourceStatuses.connected, + lastSyncedAt: null, + lastCursor: null, + lastError: null, + archivedAt: null, + configJson: JSON.stringify(config), + createdAt: now, + updatedAt: now, + }) + .run() + sourceRow = app.db + .select() + .from(trafficSources) + .where(eq(trafficSources.id, newId)) + .get()! + } + + writeAuditLog(app.db, { + projectId: project.id, + actor: 'api', + action: 'traffic.cloud-run.connected', + entityType: 'traffic_source', + entityId: sourceRow.id, + }) + + return rowToDto(sourceRow) + }) + + // POST /projects/:name/traffic/sources/:id/sync + app.post<{ + Params: { name: string; id: string } + Body: { sinceMinutes?: number } + }>('/projects/:name/traffic/sources/:id/sync', async (request) => { + const project = resolveProject(app.db, request.params.name) + const sourceRow = app.db + .select() + .from(trafficSources) + .where(eq(trafficSources.id, request.params.id)) + .get() + if (!sourceRow || sourceRow.projectId !== project.id) { + throw notFound('Traffic source', request.params.id) + } + if (sourceRow.sourceType !== TrafficSourceTypes['cloud-run']) { + throw validationError( + `Sync for source type "${sourceRow.sourceType}" is not implemented yet — only cloud-run is supported in v1.`, + ) + } + + const credentialStore = opts.cloudRunCredentialStore + if (!credentialStore) { + throw validationError('Cloud Run credential storage is not configured for this deployment') + } + const credential = credentialStore.getConnection(project.name) + if (!credential) { + throw validationError( + `No Cloud Run credential found for project "${project.name}". Run "canonry traffic connect cloud-run" first.`, + ) + } + + const config = parseSourceConfig(sourceRow) + const gcpProjectId = (config.gcpProjectId as string | undefined) ?? credential.gcpProjectId + const serviceName = (config.serviceName as string | null | undefined) ?? credential.serviceName ?? undefined + const location = (config.location as string | null | undefined) ?? credential.location ?? undefined + + const requestedMinutes = request.body?.sinceMinutes + const windowMinutes = Number.isFinite(requestedMinutes) && requestedMinutes && requestedMinutes > 0 + ? Math.floor(requestedMinutes) + : syncWindowMinutes + + const windowEnd = new Date() + // Clamp windowStart forward to lastSyncedAt so back-to-back syncs don't + // re-pull the previous window and double-count via the `hits + ?` upsert. + const requestedStartMs = windowEnd.getTime() - windowMinutes * 60_000 + const lastSyncedMs = sourceRow.lastSyncedAt + ? 
new Date(sourceRow.lastSyncedAt).getTime() + : Number.NEGATIVE_INFINITY + const windowStart = new Date( + Math.min(windowEnd.getTime(), Math.max(requestedStartMs, lastSyncedMs)), + ) + + const startedAt = windowEnd.toISOString() + const runId = crypto.randomUUID() + app.db + .insert(runs) + .values({ + id: runId, + projectId: project.id, + kind: RunKinds['traffic-sync'], + status: RunStatuses.running, + trigger: RunTriggers.manual, + startedAt, + createdAt: startedAt, + }) + .run() + + let accessToken: string + try { + accessToken = await resolveAccessToken(credential) + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + app.db + .update(runs) + .set({ status: RunStatuses.failed, error: msg, finishedAt: new Date().toISOString() }) + .where(eq(runs.id, runId)) + .run() + app.db + .update(trafficSources) + .set({ status: TrafficSourceStatuses.error, lastError: msg, updatedAt: new Date().toISOString() }) + .where(eq(trafficSources.id, sourceRow.id)) + .run() + throw validationError(`Failed to resolve Cloud Run access token: ${msg}`) + } + + let allEvents: CloudRunTrafficEventsPage['events'] = [] + try { + const page = await pullEvents(accessToken, { + gcpProjectId, + serviceName, + location, + startTime: windowStart.toISOString(), + endTime: windowEnd.toISOString(), + pageSize, + maxPages, + }) + allEvents = page.events + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + app.db + .update(runs) + .set({ status: RunStatuses.failed, error: msg, finishedAt: new Date().toISOString() }) + .where(eq(runs.id, runId)) + .run() + app.db + .update(trafficSources) + .set({ status: TrafficSourceStatuses.error, lastError: msg, updatedAt: new Date().toISOString() }) + .where(eq(trafficSources.id, sourceRow.id)) + .run() + throw validationError(`Cloud Run pull failed: ${msg}`) + } + + const report = buildTrafficProbeReport(allEvents, { sampleLimit }) + const finishedAt = new Date().toISOString() + + let crawlerBucketRows = 0 + let aiReferralBucketRows = 0 + let sampleRows = 0 + + app.db.transaction((tx) => { + // Upsert crawler hourly buckets — composite PK lets us accumulate `hits`. + for (const bucket of report.crawlerEventsHourly) { + const status = bucket.status ?? 0 + tx + .insert(crawlerEventsHourly) + .values({ + projectId: project.id, + sourceId: sourceRow.id, + tsHour: bucket.tsHour, + botId: bucket.botId, + operator: bucket.operator, + verificationStatus: bucket.verificationStatus, + pathNormalized: bucket.pathNormalized, + status, + hits: bucket.hits, + sampledUserAgent: bucket.sampledUserAgent, + createdAt: finishedAt, + updatedAt: finishedAt, + }) + .onConflictDoUpdate({ + target: [ + crawlerEventsHourly.projectId, + crawlerEventsHourly.sourceId, + crawlerEventsHourly.tsHour, + crawlerEventsHourly.botId, + crawlerEventsHourly.verificationStatus, + crawlerEventsHourly.pathNormalized, + crawlerEventsHourly.status, + ], + set: { + hits: sql`${crawlerEventsHourly.hits} + ${bucket.hits}`, + sampledUserAgent: bucket.sampledUserAgent, + updatedAt: finishedAt, + }, + }) + .run() + crawlerBucketRows += 1 + } + + for (const bucket of report.aiReferralEventsHourly) { + const status = bucket.status ?? 
0 + tx + .insert(aiReferralEventsHourly) + .values({ + projectId: project.id, + sourceId: sourceRow.id, + tsHour: bucket.tsHour, + product: bucket.product, + operator: bucket.operator, + sourceDomain: bucket.sourceDomain, + evidenceType: bucket.evidenceType, + landingPathNormalized: bucket.landingPathNormalized, + status, + sessionsOrHits: bucket.hits, + usersEstimated: null, + createdAt: finishedAt, + updatedAt: finishedAt, + }) + .onConflictDoUpdate({ + target: [ + aiReferralEventsHourly.projectId, + aiReferralEventsHourly.sourceId, + aiReferralEventsHourly.tsHour, + aiReferralEventsHourly.product, + aiReferralEventsHourly.sourceDomain, + aiReferralEventsHourly.evidenceType, + aiReferralEventsHourly.landingPathNormalized, + aiReferralEventsHourly.status, + ], + set: { + sessionsOrHits: sql`${aiReferralEventsHourly.sessionsOrHits} + ${bucket.hits}`, + updatedAt: finishedAt, + }, + }) + .run() + aiReferralBucketRows += 1 + } + + for (const sample of report.samples) { + const eventType = sample.crawler ? 'crawler' : sample.aiReferral ? 'ai_referral' : 'unknown' + const refererHost = (() => { + if (!sample.referer) return null + try { + return new URL(sample.referer).hostname + } catch { + return null + } + })() + tx + .insert(rawEventSamples) + .values({ + id: crypto.randomUUID(), + projectId: project.id, + sourceId: sourceRow.id, + ts: sample.observedAt, + eventType, + ipHash: null, + userAgent: sample.userAgent, + pathNormalized: sample.pathNormalized, + status: sample.status, + refererHost, + classifierDetailsJson: JSON.stringify({ + crawler: sample.crawler, + aiReferral: sample.aiReferral, + }), + createdAt: finishedAt, + }) + .run() + sampleRows += 1 + } + + tx + .update(trafficSources) + .set({ + status: TrafficSourceStatuses.connected, + lastSyncedAt: finishedAt, + lastError: null, + updatedAt: finishedAt, + }) + .where(eq(trafficSources.id, sourceRow.id)) + .run() + + tx + .update(runs) + .set({ status: RunStatuses.completed, finishedAt }) + .where(eq(runs.id, runId)) + .run() + }) + + writeAuditLog(app.db, { + projectId: project.id, + actor: 'api', + action: 'traffic.cloud-run.synced', + entityType: 'traffic_source', + entityId: sourceRow.id, + }) + + const response: TrafficSyncResponse = { + sourceId: sourceRow.id, + runId, + syncedAt: finishedAt, + pulledEvents: report.totals.normalizedEvents, + crawlerHits: report.totals.crawlerHits, + aiReferralHits: report.totals.aiReferralHits, + unknownHits: report.totals.unknownHits, + crawlerBucketRows, + aiReferralBucketRows, + sampleRows, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + } + return response + }) +} diff --git a/packages/api-routes/test/traffic.test.ts b/packages/api-routes/test/traffic.test.ts new file mode 100644 index 00000000..988281f3 --- /dev/null +++ b/packages/api-routes/test/traffic.test.ts @@ -0,0 +1,450 @@ +import { describe, it, beforeEach, afterEach, expect } from 'vitest' +import fs from 'node:fs' +import path from 'node:path' +import os from 'node:os' +import Fastify from 'fastify' +import { eq } from 'drizzle-orm' +import { + createClient, + migrate, + trafficSources, + crawlerEventsHourly, + aiReferralEventsHourly, + rawEventSamples, + runs, +} from '@ainyc/canonry-db' +import { + TrafficSourceTypes, + TrafficSourceStatuses, + TrafficSourceAuthModes, + RunKinds, + RunStatuses, +} from '@ainyc/canonry-contracts' +import type { NormalizedTrafficRequest } from '@ainyc/canonry-contracts' +import type { CloudRunTrafficEventsPage } from '@ainyc/canonry-integration-cloud-run' 
+import { apiRoutes } from '../src/index.js' +import type { CloudRunCredentialRecord, CloudRunCredentialStore } from '../src/traffic.js' + +function buildEvent(overrides: Partial = {}): NormalizedTrafficRequest { + const base: NormalizedTrafficRequest = { + sourceType: 'cloud-run', + evidenceKind: 'raw-request', + confidence: 'observed', + eventId: `evt-${Math.random().toString(36).slice(2)}`, + observedAt: '2026-05-07T17:32:00.000Z', + method: 'GET', + requestUrl: 'https://example.com/blog/foo', + host: 'example.com', + path: '/blog/foo', + queryString: null, + status: 200, + userAgent: 'GPTBot/1.0', + remoteIp: '1.2.3.4', + referer: null, + latencyMs: null, + requestSizeBytes: null, + responseSizeBytes: null, + providerResource: { type: 'cloud_run_revision', labels: {} }, + providerLabels: {}, + } + return { ...base, ...overrides } +} + +async function buildHarness(events: NormalizedTrafficRequest[]) { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'traffic-routes-test-')) + const dbPath = path.join(tmpDir, 'test.db') + const db = createClient(dbPath) + migrate(db) + + const credentials = new Map() + const cloudRunCredentialStore: CloudRunCredentialStore = { + getConnection: (projectName) => credentials.get(projectName), + upsertConnection: (record) => { + credentials.set(record.projectName, record) + return record + }, + deleteConnection: (projectName) => credentials.delete(projectName), + } + + let pullInvocations = 0 + const observedWindows: Array<{ startTime: string; endTime: string }> = [] + const app = Fastify() + app.register(apiRoutes, { + db, + skipAuth: true, + cloudRunCredentialStore, + pullCloudRunEvents: async (_token, options): Promise => { + pullInvocations += 1 + observedWindows.push({ startTime: options.startTime, endTime: options.endTime }) + // Mirror Cloud Logging's behavior: only return events inside the requested window. 
+ const startMs = new Date(options.startTime).getTime() + const endMs = new Date(options.endTime).getTime() + const filtered = events.filter((e) => { + const t = new Date(e.observedAt).getTime() + return t >= startMs && t <= endMs + }) + return { + events: filtered, + rawEntryCount: filtered.length, + skippedEntryCount: 0, + nextPageToken: undefined, + filter: 'mock', + } + }, + resolveCloudRunAccessToken: async () => 'mock-access-token', + }) + await app.ready() + + // Seed a project + await app.inject({ + method: 'PUT', + url: '/api/v1/projects/test-project', + payload: { + displayName: 'Test Project', + canonicalDomain: 'example.com', + country: 'US', + language: 'en', + }, + }) + + return { + app, + db, + credentials, + tmpDir, + getPullCount: () => pullInvocations, + getObservedWindows: () => observedWindows, + close: async () => { + await app.close() + fs.rmSync(tmpDir, { recursive: true, force: true }) + }, + } +} + +const SA_KEY = JSON.stringify({ + client_email: 'sa@openclaw-nyc.iam.gserviceaccount.com', + private_key: '-----BEGIN PRIVATE KEY-----\nfake-key\n-----END PRIVATE KEY-----', +}) + +describe('POST /traffic/connect/cloud-run', () => { + let h: Awaited> + beforeEach(async () => { h = await buildHarness([]) }) + afterEach(async () => { await h.close() }) + + it('rejects requests without gcpProjectId', async () => { + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { keyJson: SA_KEY }, + }) + expect(res.statusCode).toBe(400) + expect(JSON.parse(res.payload).error.message).toMatch(/gcpProjectId/) + }) + + it('rejects requests without keyJson', async () => { + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'openclaw-nyc' }, + }) + expect(res.statusCode).toBe(400) + expect(JSON.parse(res.payload).error.message).toMatch(/keyJson/) + }) + + it('rejects malformed keyJson', async () => { + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'openclaw-nyc', keyJson: 'not-json' }, + }) + expect(res.statusCode).toBe(400) + expect(JSON.parse(res.payload).error.message).toMatch(/Invalid JSON/i) + }) + + it('persists credentials and creates a connected source row', async () => { + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { + gcpProjectId: 'openclaw-nyc', + serviceName: 'openclaw-nyc', + location: 'us-east1', + keyJson: SA_KEY, + }, + }) + expect(res.statusCode).toBe(200) + const dto = JSON.parse(res.payload) + expect(dto.sourceType).toBe(TrafficSourceTypes['cloud-run']) + expect(dto.status).toBe(TrafficSourceStatuses.connected) + expect(dto.config.gcpProjectId).toBe('openclaw-nyc') + expect(dto.config.serviceName).toBe('openclaw-nyc') + expect(dto.config.authMode).toBe(TrafficSourceAuthModes['service-account']) + expect(dto.archivedAt).toBeNull() + + const stored = h.credentials.get('test-project') + expect(stored).toBeDefined() + expect(stored?.clientEmail).toBe('sa@openclaw-nyc.iam.gserviceaccount.com') + expect(stored?.privateKey).toContain('PRIVATE KEY') + + const sourceRows = h.db.select().from(trafficSources).all() + expect(sourceRows.length).toBe(1) + expect(sourceRows[0].status).toBe(TrafficSourceStatuses.connected) + }) + + it('reuses the existing source row on reconnect rather than creating a duplicate', async () => { + await 
h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'old-project', keyJson: SA_KEY }, + }) + + const second = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'new-project', serviceName: 'new-svc', keyJson: SA_KEY }, + }) + + expect(second.statusCode).toBe(200) + const sources = h.db.select().from(trafficSources).all() + expect(sources.length).toBe(1) + const config = JSON.parse(sources[0].configJson) as Record + expect(config.gcpProjectId).toBe('new-project') + expect(config.serviceName).toBe('new-svc') + }) +}) + +describe('POST /traffic/sources/:id/sync', () => { + it('returns 404 when the source does not belong to the project', async () => { + const h = await buildHarness([]) + try { + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/sources/no-such-source/sync', + payload: {}, + }) + expect(res.statusCode).toBe(404) + } finally { + await h.close() + } + }) + + it('errors when no credentials are stored for the project', async () => { + const h = await buildHarness([]) + try { + const { projects } = await import('@ainyc/canonry-db') + const projectRow = h.db.select().from(projects).all()[0] + h.db.insert(trafficSources).values({ + id: 'src_orphan', + projectId: projectRow.id, + sourceType: TrafficSourceTypes['cloud-run'], + displayName: 'orphan', + status: TrafficSourceStatuses.connected, + configJson: '{"gcpProjectId":"orphan-project","authMode":"service-account"}', + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }).run() + + const res = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/sources/src_orphan/sync', + payload: {}, + }) + expect(res.statusCode).toBe(400) + expect(JSON.parse(res.payload).error.message).toMatch(/credential/i) + } finally { + await h.close() + } + }) + + it('pulls events, classifies, writes hourly buckets + samples + a completed run', async () => { + // Anchor events inside the 120-min sync window the test requests below, + // and snap to the top of an hour so the two crawler hits land in the + // same hourly bucket regardless of when the test runs. 
+ const baseTime = new Date(Date.now() - 60 * 60_000) + baseTime.setMinutes(0, 0, 0) + const fromBase = (mins: number) => new Date(baseTime.getTime() + mins * 60_000).toISOString() + + const events: NormalizedTrafficRequest[] = [ + // Two crawler hits same hour same path → should accumulate to hits=2 in one bucket + buildEvent({ userAgent: 'GPTBot/1.0', path: '/blog/foo', status: 200, observedAt: fromBase(1) }), + buildEvent({ userAgent: 'GPTBot/1.0', path: '/blog/foo', status: 200, observedAt: fromBase(30) }), + // One AI referral via UTM + buildEvent({ + userAgent: 'Mozilla/5.0', + path: '/landing', + queryString: 'utm_source=chatgpt.com', + status: 200, + observedAt: fromBase(15), + }), + // One unclassified hit + buildEvent({ userAgent: 'curl/7.x', path: '/anything', status: 404, observedAt: fromBase(32) }), + ] + + const h = await buildHarness(events) + try { + // Connect first + const connectRes = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'openclaw-nyc', serviceName: 'openclaw-nyc', location: 'us-east1', keyJson: SA_KEY }, + }) + expect(connectRes.statusCode).toBe(200) + const sourceId = JSON.parse(connectRes.payload).id + + // Sync + const syncRes = await h.app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: { sinceMinutes: 120 }, + }) + expect(syncRes.statusCode).toBe(200) + const body = JSON.parse(syncRes.payload) + expect(body.pulledEvents).toBe(4) + expect(body.crawlerHits).toBe(2) + expect(body.aiReferralHits).toBe(1) + expect(body.unknownHits).toBe(1) + expect(body.crawlerBucketRows).toBe(1) + expect(body.aiReferralBucketRows).toBe(1) + expect(body.sampleRows).toBe(4) + expect(body.runId).toBeDefined() + + // Crawler bucket accumulated hits=2 + const crawlerRows = h.db.select().from(crawlerEventsHourly).all() + expect(crawlerRows.length).toBe(1) + expect(crawlerRows[0].hits).toBe(2) + expect(crawlerRows[0].botId).toBe('openai-gptbot') + + // AI referral bucket + const aiRows = h.db.select().from(aiReferralEventsHourly).all() + expect(aiRows.length).toBe(1) + expect(aiRows[0].evidenceType).toBe('utm') + expect(aiRows[0].sessionsOrHits).toBe(1) + + // Samples + const samples = h.db.select().from(rawEventSamples).all() + expect(samples.length).toBe(4) + const types = samples.map((s) => s.eventType).sort() + expect(types).toEqual(['ai_referral', 'crawler', 'crawler', 'unknown']) + + // Source updated + const sources = h.db.select().from(trafficSources).where(eq(trafficSources.id, sourceId)).all() + expect(sources[0].lastSyncedAt).toBeTruthy() + expect(sources[0].lastError).toBeNull() + + // Run row marked completed with kind=traffic-sync + const runRows = h.db.select().from(runs).all() + expect(runRows.length).toBe(1) + expect(runRows[0].kind).toBe(RunKinds['traffic-sync']) + expect(runRows[0].status).toBe(RunStatuses.completed) + } finally { + await h.close() + } + }) + + it('clamps windowStart to lastSyncedAt so overlapping syncs do not double-count', async () => { + // Event sits inside the default 60-min sync window for the first sync. After + // the first sync, lastSyncedAt is "now-ish", so the second sync's window + // collapses to roughly [lastSyncedAt, now] and no longer covers the event. 
+ const baseTime = new Date(Date.now() - 30 * 60_000) + baseTime.setMinutes(0, 0, 0) + const observedAt = new Date(baseTime.getTime() + 5 * 60_000).toISOString() + + const events: NormalizedTrafficRequest[] = [ + buildEvent({ userAgent: 'GPTBot/1.0', path: '/blog/foo', status: 200, observedAt }), + ] + const h = await buildHarness(events) + try { + const connectRes = await h.app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'openclaw-nyc', keyJson: SA_KEY }, + }) + const sourceId = JSON.parse(connectRes.payload).id + + const first = await h.app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: {}, + }) + expect(JSON.parse(first.payload).pulledEvents).toBe(1) + + const second = await h.app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: {}, + }) + expect(JSON.parse(second.payload).pulledEvents).toBe(0) + + const rows = h.db.select().from(crawlerEventsHourly).all() + expect(rows.length).toBe(1) + expect(rows[0].hits).toBe(1) + + // The second sync's startTime should have been clamped to the first sync's + // lastSyncedAt — i.e. ≥ the first sync's endTime. + const windows = h.getObservedWindows() + expect(windows.length).toBe(2) + expect(new Date(windows[1].startTime).getTime()).toBeGreaterThanOrEqual( + new Date(windows[0].endTime).getTime(), + ) + } finally { + await h.close() + } + }) + + it('marks the source as error and the run as failed when the pull throws', async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'traffic-routes-test-')) + const dbPath = path.join(tmpDir, 'test.db') + const db = createClient(dbPath) + migrate(db) + + const credentials = new Map() + const cloudRunCredentialStore: CloudRunCredentialStore = { + getConnection: (n) => credentials.get(n), + upsertConnection: (r) => { credentials.set(r.projectName, r); return r }, + deleteConnection: (n) => credentials.delete(n), + } + + const app = Fastify() + app.register(apiRoutes, { + db, + skipAuth: true, + cloudRunCredentialStore, + pullCloudRunEvents: async () => { + throw new Error('Cloud Logging boom') + }, + resolveCloudRunAccessToken: async () => 'mock-token', + }) + await app.ready() + + await app.inject({ + method: 'PUT', + url: '/api/v1/projects/test-project', + payload: { displayName: 'Test', canonicalDomain: 'example.com', country: 'US', language: 'en' }, + }) + + const connectRes = await app.inject({ + method: 'POST', + url: '/api/v1/projects/test-project/traffic/connect/cloud-run', + payload: { gcpProjectId: 'openclaw-nyc', keyJson: SA_KEY }, + }) + const sourceId = JSON.parse(connectRes.payload).id + + const syncRes = await app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: {}, + }) + expect(syncRes.statusCode).toBe(400) + + const sourceRow = db.select().from(trafficSources).all()[0] + expect(sourceRow.status).toBe(TrafficSourceStatuses.error) + expect(sourceRow.lastError).toMatch(/boom/) + + const runRow = db.select().from(runs).all()[0] + expect(runRow.status).toBe(RunStatuses.failed) + expect(runRow.error).toMatch(/boom/) + + await app.close() + fs.rmSync(tmpDir, { recursive: true, force: true }) + }) +}) diff --git a/packages/canonry/AGENTS.md b/packages/canonry/AGENTS.md index 6deff606..4d9b45f1 100644 --- a/packages/canonry/AGENTS.md +++ b/packages/canonry/AGENTS.md @@ -40,6 +40,8 @@ The publishable npm package (`@ainyc/canonry`). 
Bundles the CLI, local Fastify s | `src/commands/report.ts` | `runReportCommand` — `canonry report ` — fetches `/report` JSON, renders self-contained HTML to disk via `renderReportHtml` from `@ainyc/canonry-api-routes` | | `src/cli-commands/report.ts` | CLI spec for `canonry report [--output ] [--format json]` | | `src/commands/ga.ts` | GA4 commands: `ga sync`, `ga traffic`, `ga status`, `ga social-referral-history`, `ga social-referral-summary`, `ga attribution` | +| `src/commands/traffic.ts` | Server-side traffic commands: `traffic connect cloud-run` (writes SA key to `~/.canonry/config.yaml`, creates a `traffic_sources` row) and `traffic sync` (pulls Cloud Logging, classifies, upserts hourly buckets + samples). | +| `src/cloud-run-config.ts` | Helpers for `cloudRun:` connection block in `~/.canonry/config.yaml` (mirrors `ga4-config.ts` / `google-config.ts`). | | `src/commands/backlinks.ts` | Backlinks commands: `backlinks install`, `doctor`, `status`, `sync`, `list`, `extract`, `releases`, `cache prune` | | `src/commoncrawl-sync.ts` | `executeReleaseSync` — workspace-level Common Crawl release download + DuckDB query job | | `src/backlink-extract.ts` | `executeBacklinkExtract` — per-project backlink extraction run | diff --git a/packages/canonry/package.json b/packages/canonry/package.json index c78e0614..835b1849 100644 --- a/packages/canonry/package.json +++ b/packages/canonry/package.json @@ -1,6 +1,6 @@ { "name": "@ainyc/canonry", - "version": "4.11.1", + "version": "4.12.1", "type": "module", "description": "Agent-first open-source AEO operating platform - track how answer engines cite your domain", "license": "FSL-1.1-ALv2", @@ -69,8 +69,10 @@ "@ainyc/canonry-db": "workspace:*", "@ainyc/canonry-intelligence": "workspace:*", "@ainyc/canonry-integration-bing": "workspace:*", + "@ainyc/canonry-integration-cloud-run": "workspace:*", "@ainyc/canonry-integration-commoncrawl": "workspace:*", "@ainyc/canonry-integration-google": "workspace:*", + "@ainyc/canonry-integration-traffic": "workspace:*", "@ainyc/canonry-integration-wordpress": "workspace:*", "@ainyc/canonry-provider-cdp": "workspace:*", "@ainyc/canonry-provider-claude": "workspace:*", diff --git a/packages/canonry/src/cli-commands.ts b/packages/canonry/src/cli-commands.ts index 4be4fe4a..ff849dd0 100644 --- a/packages/canonry/src/cli-commands.ts +++ b/packages/canonry/src/cli-commands.ts @@ -5,6 +5,7 @@ import { BING_CLI_COMMANDS } from './cli-commands/bing.js' import { CDP_CLI_COMMANDS } from './cli-commands/cdp.js' import { DOCTOR_CLI_COMMANDS } from './cli-commands/doctor.js' import { GA_CLI_COMMANDS } from './cli-commands/ga.js' +import { TRAFFIC_CLI_COMMANDS } from './cli-commands/traffic.js' import { COMPETITOR_CLI_COMMANDS } from './cli-commands/competitor.js' import { GOOGLE_CLI_COMMANDS } from './cli-commands/google.js' import { KEYWORD_CLI_COMMANDS } from './cli-commands/keyword.js' @@ -46,6 +47,7 @@ export const REGISTERED_CLI_COMMANDS: readonly CliCommandSpec[] = [ ...WORDPRESS_CLI_COMMANDS, ...CDP_CLI_COMMANDS, ...GA_CLI_COMMANDS, + ...TRAFFIC_CLI_COMMANDS, ...INTELLIGENCE_CLI_COMMANDS, ...CONTENT_CLI_COMMANDS, ...AGENT_CLI_COMMANDS, diff --git a/packages/canonry/src/cli-commands/traffic.ts b/packages/canonry/src/cli-commands/traffic.ts new file mode 100644 index 00000000..ab89d6fc --- /dev/null +++ b/packages/canonry/src/cli-commands/traffic.ts @@ -0,0 +1,84 @@ +import { trafficConnectCloudRun, trafficSync } from '../commands/traffic.js' +import type { CliCommandSpec } from '../cli-dispatch.js' +import { getString, 
requireProject, stringOption, unknownSubcommand } from '../cli-command-helpers.js' + +export const TRAFFIC_CLI_COMMANDS: readonly CliCommandSpec[] = [ + { + path: ['traffic', 'connect', 'cloud-run'], + usage: 'canonry traffic connect cloud-run <project> --gcp-project <gcp-project-id> --service-account-key <key-file> [--service <service>] [--location <location>] [--display-name <name>] [--format json]', + options: { + 'gcp-project': stringOption(), + service: stringOption(), + location: stringOption(), + 'service-account-key': stringOption(), + 'display-name': stringOption(), + }, + run: async (input) => { + const project = requireProject( + input, + 'traffic.connect.cloud-run', + 'canonry traffic connect cloud-run <project> --gcp-project <gcp-project-id> --service-account-key <key-file>', + ) + const gcpProject = getString(input.values, 'gcp-project') + if (!gcpProject) throw new Error('--gcp-project is required') + const serviceAccountKey = getString(input.values, 'service-account-key') + if (!serviceAccountKey) throw new Error('--service-account-key is required') + + await trafficConnectCloudRun(project, { + gcpProject, + service: getString(input.values, 'service'), + location: getString(input.values, 'location'), + serviceAccountKey, + displayName: getString(input.values, 'display-name'), + format: input.format, + }) + }, + }, + { + path: ['traffic', 'connect'], + usage: 'canonry traffic connect [args]', + run: async (input) => { + unknownSubcommand(input.positionals[0], { + command: 'traffic connect', + usage: 'canonry traffic connect [args]', + available: ['cloud-run'], + }) + }, + }, + { + path: ['traffic', 'sync'], + usage: 'canonry traffic sync <project> --source <source-id> [--since-minutes 60] [--format json]', + options: { + source: stringOption(), + 'since-minutes': stringOption(), + }, + run: async (input) => { + const project = requireProject( + input, + 'traffic.sync', + 'canonry traffic sync <project> --source <source-id> [--since-minutes 60]', + ) + const source = getString(input.values, 'source') + if (!source) throw new Error('--source is required') + const sinceStr = getString(input.values, 'since-minutes') + const sinceMinutes = sinceStr ? 
parseInt(sinceStr, 10) : undefined + + await trafficSync(project, { + source, + sinceMinutes, + format: input.format, + }) + }, + }, + { + path: ['traffic'], + usage: 'canonry traffic [args]', + run: async (input) => { + unknownSubcommand(input.positionals[0], { + command: 'traffic', + usage: 'canonry traffic [args]', + available: ['connect', 'sync'], + }) + }, + }, +] diff --git a/packages/canonry/src/client.ts b/packages/canonry/src/client.ts index 3025ee37..7ab0f394 100644 --- a/packages/canonry/src/client.ts +++ b/packages/canonry/src/client.ts @@ -71,6 +71,9 @@ import type { ProjectSearchResponseDto, DoctorReportDto, ProjectReportDto, + TrafficSourceDto, + TrafficConnectCloudRunRequest, + TrafficSyncResponse, } from '@ainyc/canonry-contracts' export type { BrandMetricsDto, GapAnalysisDto, SourceBreakdownDto, AuditLogEntry, CompetitorDto, KeywordDto, QueryDto } @@ -786,6 +789,27 @@ export class ApiClient { return this.request('GET', `/projects/${encodeURIComponent(project)}/ga/session-history${qs}`) } + // Traffic — server-side ingestion + async trafficConnectCloudRun(project: string, body: TrafficConnectCloudRunRequest): Promise<TrafficSourceDto> { + return this.request( + 'POST', + `/projects/${encodeURIComponent(project)}/traffic/connect/cloud-run`, + body, + ) + } + + async trafficSync( + project: string, + sourceId: string, + body?: { sinceMinutes?: number }, + ): Promise<TrafficSyncResponse> { + return this.request( + 'POST', + `/projects/${encodeURIComponent(project)}/traffic/sources/${encodeURIComponent(sourceId)}/sync`, + body ?? {}, + ) + } + async wordpressConnect( project: string, body: { diff --git a/packages/canonry/src/cloud-run-config.ts b/packages/canonry/src/cloud-run-config.ts new file mode 100644 index 00000000..64af18c0 --- /dev/null +++ b/packages/canonry/src/cloud-run-config.ts @@ -0,0 +1,52 @@ +import type { CanonryConfig, CloudRunConnectionConfigEntry } from './config.js' + +function ensureConnections(config: CanonryConfig): CloudRunConnectionConfigEntry[] { + if (!config.cloudRun) config.cloudRun = {} + if (!config.cloudRun.connections) config.cloudRun.connections = [] + return config.cloudRun.connections +} + +export function listCloudRunConnections(config: CanonryConfig): CloudRunConnectionConfigEntry[] { + return config.cloudRun?.connections ?? [] +} + +export function getCloudRunConnection( + config: CanonryConfig, + projectName: string, +): CloudRunConnectionConfigEntry | undefined { + return (config.cloudRun?.connections ?? 
[]).find((c) => c.projectName === projectName) +} + +export function upsertCloudRunConnection( + config: CanonryConfig, + connection: CloudRunConnectionConfigEntry, +): CloudRunConnectionConfigEntry { + const connections = ensureConnections(config) + const index = connections.findIndex((c) => c.projectName === connection.projectName) + + if (index === -1) { + connections.push(connection) + return connection + } + + connections[index] = connection + return connection +} + +export function removeCloudRunConnection( + config: CanonryConfig, + projectName: string, +): boolean { + const connections = config.cloudRun?.connections + if (!connections?.length) return false + + const next = connections.filter((c) => c.projectName !== projectName) + if (next.length === connections.length) return false + + if (!config.cloudRun) return false + config.cloudRun.connections = next + if (next.length === 0) { + delete config.cloudRun + } + return true +} diff --git a/packages/canonry/src/commands/traffic.ts b/packages/canonry/src/commands/traffic.ts new file mode 100644 index 00000000..f4fe6ff5 --- /dev/null +++ b/packages/canonry/src/commands/traffic.ts @@ -0,0 +1,107 @@ +import type { TrafficSourceDto, TrafficSyncResponse } from '@ainyc/canonry-contracts' +import { createApiClient } from '../client.js' +import { CliError } from '../cli-error.js' + +function getClient() { + return createApiClient() +} + +export async function trafficConnectCloudRun(project: string, opts: { + gcpProject: string + service?: string + location?: string + serviceAccountKey?: string + displayName?: string + format?: string +}): Promise { + if (!opts.gcpProject) { + throw new CliError({ + code: 'TRAFFIC_GCP_PROJECT_REQUIRED', + message: '--gcp-project is required', + displayMessage: 'Error: --gcp-project is required', + details: { project }, + }) + } + if (!opts.serviceAccountKey) { + throw new CliError({ + code: 'TRAFFIC_KEY_FILE_REQUIRED', + message: '--service-account-key is required for v1 (OAuth-mode Cloud Run is not yet supported).', + displayMessage: 'Error: --service-account-key is required', + details: { project }, + }) + } + + const fs = await import('node:fs') + let keyJson: string + try { + keyJson = fs.readFileSync(opts.serviceAccountKey, 'utf-8') + JSON.parse(keyJson) + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + throw new CliError({ + code: 'TRAFFIC_KEY_FILE_READ_ERROR', + message: `Failed to read service-account key: ${msg}`, + displayMessage: `Error: failed to read --service-account-key "${opts.serviceAccountKey}": ${msg}`, + details: { project, keyFile: opts.serviceAccountKey }, + }) + } + + const client = getClient() + const result: TrafficSourceDto = await client.trafficConnectCloudRun(project, { + gcpProjectId: opts.gcpProject, + serviceName: opts.service, + location: opts.location, + displayName: opts.displayName, + keyJson, + }) + + if (opts.format === 'json') { + console.log(JSON.stringify(result, null, 2)) + return + } + + console.log(`Cloud Run traffic source connected for project "${project}".`) + console.log(` Source ID: ${result.id}`) + console.log(` Display name: ${result.displayName}`) + console.log(` Status: ${result.status}`) + console.log(` GCP project: ${result.config.gcpProjectId ?? 
'(unset)'}`) + if (result.config.serviceName) console.log(` Service: ${result.config.serviceName}`) + if (result.config.location) console.log(` Location: ${result.config.location}`) + console.log('') + console.log(`Next: canonry traffic sync ${project} --source ${result.id}`) +} + +export async function trafficSync(project: string, opts: { + source: string + sinceMinutes?: number + format?: string +}): Promise { + if (!opts.source) { + throw new CliError({ + code: 'TRAFFIC_SOURCE_REQUIRED', + message: '--source is required', + displayMessage: 'Error: --source is required (run `canonry traffic connect cloud-run` first if you have not connected a source)', + details: { project }, + }) + } + + const client = getClient() + const result: TrafficSyncResponse = await client.trafficSync(project, opts.source, { + sinceMinutes: opts.sinceMinutes, + }) + + if (opts.format === 'json') { + console.log(JSON.stringify(result, null, 2)) + return + } + + console.log(`Traffic sync complete for "${project}" (source ${opts.source}).`) + console.log(` Run ID: ${result.runId}`) + console.log(` Window: ${result.windowStart} → ${result.windowEnd}`) + console.log(` Pulled events: ${result.pulledEvents}`) + console.log(` Crawler hits: ${result.crawlerHits} (${result.crawlerBucketRows} hourly bucket${result.crawlerBucketRows === 1 ? '' : 's'})`) + console.log(` AI referral hits: ${result.aiReferralHits} (${result.aiReferralBucketRows} hourly bucket${result.aiReferralBucketRows === 1 ? '' : 's'})`) + console.log(` Unknown hits: ${result.unknownHits}`) + console.log(` Sample rows: ${result.sampleRows}`) + console.log(` Synced at: ${result.syncedAt}`) +} diff --git a/packages/canonry/src/config.ts b/packages/canonry/src/config.ts index b1d1c08b..14f98044 100644 --- a/packages/canonry/src/config.ts +++ b/packages/canonry/src/config.ts @@ -70,6 +70,29 @@ export interface Ga4ConfigEntry { connections?: Ga4ConnectionConfigEntry[] } +export type CloudRunAuthMode = 'oauth' | 'service-account' + +export interface CloudRunConnectionConfigEntry { + projectName: string + gcpProjectId: string + serviceName?: string + location?: string + authMode: CloudRunAuthMode + // service-account fields + clientEmail?: string + privateKey?: string + // oauth fields + refreshToken?: string + tokenExpiresAt?: string + scopes?: string[] + createdAt: string + updatedAt: string +} + +export interface CloudRunConfigEntry { + connections?: CloudRunConnectionConfigEntry[] +} + export type WordpressEnv = 'live' | 'staging' export interface WordpressConnectionConfigEntry { @@ -111,6 +134,7 @@ export interface CanonryConfig { google?: GoogleConfigEntry bing?: BingConfigEntry ga4?: Ga4ConfigEntry + cloudRun?: CloudRunConfigEntry wordpress?: WordpressConfigEntry // Dashboard password hash (SHA-256 hex) — set during first dashboard visit dashboardPasswordHash?: string diff --git a/packages/canonry/src/mcp/openapi-classification.ts b/packages/canonry/src/mcp/openapi-classification.ts index f442f500..091806b2 100644 --- a/packages/canonry/src/mcp/openapi-classification.ts +++ b/packages/canonry/src/mcp/openapi-classification.ts @@ -95,6 +95,8 @@ export const MCP_OPENAPI_OPERATION_CLASSIFICATIONS = { 'POST /api/v1/projects/{name}/bing/request-indexing': 'deferred', 'GET /api/v1/projects/{name}/bing/performance': 'deferred', 'POST /api/v1/projects/{name}/wordpress/connect': 'deferred', + 'POST /api/v1/projects/{name}/traffic/connect/cloud-run': 'deferred', + 'POST /api/v1/projects/{name}/traffic/sources/{id}/sync': 'deferred', 'DELETE 
/api/v1/projects/{name}/wordpress/disconnect': 'deferred', 'GET /api/v1/projects/{name}/wordpress/status': 'deferred', 'GET /api/v1/projects/{name}/wordpress/pages': 'deferred', diff --git a/packages/canonry/src/server.ts b/packages/canonry/src/server.ts index 5fa16ec2..91ae0571 100644 --- a/packages/canonry/src/server.ts +++ b/packages/canonry/src/server.ts @@ -35,6 +35,11 @@ import { upsertGa4Connection, removeGa4Connection, } from './ga4-config.js' +import { + getCloudRunConnection, + upsertCloudRunConnection, + removeCloudRunConnection, +} from './cloud-run-config.js' import { getWordpressConnection, patchWordpressConnection, @@ -470,6 +475,36 @@ export async function createServer(opts: { }, } as const + // Cloud Run credential store — stores SA keys / OAuth tokens in ~/.canonry/config.yaml + const cloudRunCredentialStore = { + getConnection: (projectName: string) => { + return getCloudRunConnection(opts.config, projectName) + }, + upsertConnection: (record: { + projectName: string + gcpProjectId: string + serviceName?: string + location?: string + authMode: 'oauth' | 'service-account' + clientEmail?: string + privateKey?: string + refreshToken?: string + tokenExpiresAt?: string + scopes?: string[] + createdAt: string + updatedAt: string + }) => { + const updated = upsertCloudRunConnection(opts.config, record) + saveConfigPatch(opts.config) + return updated + }, + deleteConnection: (projectName: string) => { + const removed = removeCloudRunConnection(opts.config, projectName) + if (removed) saveConfigPatch(opts.config) + return removed + }, + } as const + const googleStateSecret = process.env.GOOGLE_STATE_SECRET ?? crypto.randomBytes(32).toString('hex') const googleConnectionStore = { @@ -921,6 +956,7 @@ export async function createServer(opts: { }, wordpressConnectionStore, ga4CredentialStore, + cloudRunCredentialStore, onRunCreated: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null) => { // Fire and forget — run executes in background jobRunner.executeRun(runId, projectId, providers, location).catch((err: unknown) => { diff --git a/packages/canonry/test/cloud-run-config.test.ts b/packages/canonry/test/cloud-run-config.test.ts new file mode 100644 index 00000000..e8d4f72d --- /dev/null +++ b/packages/canonry/test/cloud-run-config.test.ts @@ -0,0 +1,114 @@ +import { test, expect } from 'vitest' + +import type { CanonryConfig } from '../src/config.js' +import { + getCloudRunConnection, + listCloudRunConnections, + upsertCloudRunConnection, + removeCloudRunConnection, +} from '../src/cloud-run-config.js' + +function makeConfig(): CanonryConfig { + return { + apiUrl: 'http://localhost:4100', + database: '/tmp/canonry.db', + apiKey: 'cnry_test', + } +} + +test('cloud-run config helpers persist a service-account connection scoped by project name', () => { + const config = makeConfig() + const now = new Date().toISOString() + + upsertCloudRunConnection(config, { + projectName: 'demo', + gcpProjectId: 'openclaw-nyc', + serviceName: 'openclaw-nyc', + location: 'us-east1', + authMode: 'service-account', + clientEmail: 'sa@openclaw-nyc.iam.gserviceaccount.com', + privateKey: '-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----', + createdAt: now, + updatedAt: now, + }) + + const conn = getCloudRunConnection(config, 'demo') + expect(conn).toBeDefined() + expect(conn?.gcpProjectId).toBe('openclaw-nyc') + expect(conn?.authMode).toBe('service-account') + 
expect(conn?.clientEmail).toBe('sa@openclaw-nyc.iam.gserviceaccount.com') + expect(conn?.privateKey).toContain('PRIVATE KEY') +}) + +test('cloud-run config supports an OAuth-mode connection', () => { + const config = makeConfig() + const now = new Date().toISOString() + + upsertCloudRunConnection(config, { + projectName: 'demo', + gcpProjectId: 'openclaw-nyc', + authMode: 'oauth', + refreshToken: 'rt_xxx', + tokenExpiresAt: new Date(Date.now() + 3600_000).toISOString(), + scopes: ['https://www.googleapis.com/auth/logging.read'], + createdAt: now, + updatedAt: now, + }) + + const conn = getCloudRunConnection(config, 'demo') + expect(conn?.authMode).toBe('oauth') + expect(conn?.refreshToken).toBe('rt_xxx') + expect(conn?.scopes).toContain('https://www.googleapis.com/auth/logging.read') + expect(conn?.privateKey).toBeUndefined() +}) + +test('upsertCloudRunConnection replaces the existing entry for the same project', () => { + const config = makeConfig() + const now = new Date().toISOString() + + upsertCloudRunConnection(config, { + projectName: 'demo', + gcpProjectId: 'openclaw-nyc', + authMode: 'service-account', + clientEmail: 'sa1@x', + privateKey: 'pk1', + createdAt: now, + updatedAt: now, + }) + upsertCloudRunConnection(config, { + projectName: 'demo', + gcpProjectId: 'openclaw-nyc', + authMode: 'service-account', + clientEmail: 'sa2@x', + privateKey: 'pk2', + createdAt: now, + updatedAt: now, + }) + + expect(listCloudRunConnections(config).length).toBe(1) + expect(getCloudRunConnection(config, 'demo')?.clientEmail).toBe('sa2@x') +}) + +test('removeCloudRunConnection deletes the entry and prunes the empty cloudRun block', () => { + const config = makeConfig() + const now = new Date().toISOString() + + upsertCloudRunConnection(config, { + projectName: 'demo', + gcpProjectId: 'openclaw-nyc', + authMode: 'service-account', + clientEmail: 'sa@x', + privateKey: 'pk', + createdAt: now, + updatedAt: now, + }) + + expect(removeCloudRunConnection(config, 'demo')).toBe(true) + expect(getCloudRunConnection(config, 'demo')).toBeUndefined() + expect(config.cloudRun).toBeUndefined() +}) + +test('removeCloudRunConnection returns false when nothing to delete', () => { + const config = makeConfig() + expect(removeCloudRunConnection(config, 'demo')).toBe(false) +}) diff --git a/packages/canonry/test/traffic-commands.test.ts b/packages/canonry/test/traffic-commands.test.ts new file mode 100644 index 00000000..bc5a4ab6 --- /dev/null +++ b/packages/canonry/test/traffic-commands.test.ts @@ -0,0 +1,191 @@ +import { describe, it, beforeEach, afterEach, expect } from 'vitest' +import os from 'node:os' +import fs from 'node:fs' +import path from 'node:path' +import crypto from 'node:crypto' +import { parse } from 'yaml' +import { createClient, migrate, apiKeys, trafficSources } from '@ainyc/canonry-db' +import { createServer } from '../src/server.js' +import { ApiClient } from '../src/client.js' +import { invokeCli, parseJsonOutput } from './cli-test-utils.js' + +describe('traffic CLI commands', () => { + let tmpDir: string + let origConfigDir: string | undefined + let client: ApiClient + let db: ReturnType + let close: () => Promise + + beforeEach(async () => { + tmpDir = path.join(os.tmpdir(), `canonry-cli-traffic-${crypto.randomUUID()}`) + fs.mkdirSync(tmpDir, { recursive: true }) + origConfigDir = process.env.CANONRY_CONFIG_DIR + process.env.CANONRY_CONFIG_DIR = tmpDir + + const dbPath = path.join(tmpDir, 'data.db') + const configPath = path.join(tmpDir, 'config.yaml') + + db = createClient(dbPath) + 
migrate(db) + + const apiKeyPlain = `cnry_${crypto.randomBytes(16).toString('hex')}` + const hashed = crypto.createHash('sha256').update(apiKeyPlain).digest('hex') + db.insert(apiKeys).values({ + id: crypto.randomUUID(), + name: 'test', + keyHash: hashed, + keyPrefix: apiKeyPlain.slice(0, 8), + createdAt: new Date().toISOString(), + }).run() + + const config = { + apiUrl: 'http://localhost:0', + database: dbPath, + apiKey: apiKeyPlain, + providers: {}, + } + fs.writeFileSync(configPath, JSON.stringify(config), 'utf-8') + + const app = await createServer({ + config: config as Parameters[0]['config'], + db, + logger: false, + }) + await app.listen({ host: '127.0.0.1', port: 0 }) + + const addr = app.server.address() + const port = typeof addr === 'object' && addr ? addr.port : 0 + const serverUrl = `http://127.0.0.1:${port}` + config.apiUrl = serverUrl + fs.writeFileSync(configPath, JSON.stringify(config), 'utf-8') + + close = () => app.close() + client = new ApiClient(serverUrl, apiKeyPlain) + + await client.putProject('test-proj', { + displayName: 'Test Project', + canonicalDomain: 'example.com', + country: 'US', + language: 'en', + }) + }) + + afterEach(async () => { + await close() + if (origConfigDir === undefined) delete process.env.CANONRY_CONFIG_DIR + else process.env.CANONRY_CONFIG_DIR = origConfigDir + fs.rmSync(tmpDir, { recursive: true, force: true }) + }) + + it('rejects traffic connect cloud-run without --gcp-project', async () => { + const result = await invokeCli([ + 'traffic', + 'connect', + 'cloud-run', + 'test-proj', + '--service-account-key', + '/tmp/does-not-exist.json', + ]) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/--gcp-project/) + }) + + it('rejects traffic connect cloud-run without --service-account-key', async () => { + const result = await invokeCli([ + 'traffic', + 'connect', + 'cloud-run', + 'test-proj', + '--gcp-project', + 'openclaw-nyc', + ]) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/--service-account-key/) + }) + + it('reports a clear error when the key file cannot be read', async () => { + const result = await invokeCli([ + 'traffic', + 'connect', + 'cloud-run', + 'test-proj', + '--gcp-project', + 'openclaw-nyc', + '--service-account-key', + '/tmp/this-file-really-does-not-exist-xyzzy.json', + ]) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/failed to read --service-account-key/i) + }) + + it('connects via service-account key file and persists creds + source row', async () => { + const keyPath = path.join(tmpDir, 'sa-key.json') + fs.writeFileSync( + keyPath, + JSON.stringify({ + client_email: 'sa@openclaw-nyc.iam.gserviceaccount.com', + private_key: '-----BEGIN PRIVATE KEY-----\nfake\n-----END PRIVATE KEY-----', + }), + 'utf-8', + ) + + const result = await invokeCli([ + 'traffic', + 'connect', + 'cloud-run', + 'test-proj', + '--gcp-project', + 'openclaw-nyc', + '--service', + 'openclaw-nyc', + '--location', + 'us-east1', + '--service-account-key', + keyPath, + '--format', + 'json', + ]) + + expect(result.exitCode).toBeUndefined() + const dto = parseJsonOutput(result.stdout) as { id: string; status: string; config: Record } + expect(dto.id).toBeTruthy() + expect(dto.status).toBe('connected') + expect(dto.config.gcpProjectId).toBe('openclaw-nyc') + expect(dto.config.serviceName).toBe('openclaw-nyc') + + const rows = db.select().from(trafficSources).all() + expect(rows.length).toBe(1) + expect(rows[0].status).toBe('connected') + + // Credentials persisted to 
~/.canonry/config.yaml (CANONRY_CONFIG_DIR points at tmpDir) + const yaml = parse(fs.readFileSync(path.join(tmpDir, 'config.yaml'), 'utf-8')) as { + cloudRun?: { connections?: Array<{ projectName: string; clientEmail: string }> } + } + expect(yaml.cloudRun?.connections?.[0]?.projectName).toBe('test-proj') + expect(yaml.cloudRun?.connections?.[0]?.clientEmail).toBe('sa@openclaw-nyc.iam.gserviceaccount.com') + }) + + it('rejects traffic sync without --source', async () => { + const result = await invokeCli(['traffic', 'sync', 'test-proj']) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/--source/) + }) + + it('reports 404 for an unknown traffic source on sync', async () => { + const result = await invokeCli([ + 'traffic', + 'sync', + 'test-proj', + '--source', + 'no-such-source-id', + ]) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/not found/i) + }) + + it('errors on bare `traffic` invocation', async () => { + const result = await invokeCli(['traffic']) + expect(result.exitCode).not.toBe(0) + expect(result.stderr).toMatch(/unknown traffic subcommand/i) + }) +}) diff --git a/packages/contracts/src/run.ts b/packages/contracts/src/run.ts index 79eb6023..9fa9ddcd 100644 --- a/packages/contracts/src/run.ts +++ b/packages/contracts/src/run.ts @@ -14,6 +14,7 @@ export const runKindSchema = z.enum([ 'bing-inspect', 'bing-inspect-sitemap', 'backlink-extract', + 'traffic-sync', ]) export type RunKind = z.infer<typeof runKindSchema> export const RunKinds = runKindSchema.enum diff --git a/packages/contracts/src/traffic.ts b/packages/contracts/src/traffic.ts index 210abc45..06be2400 100644 --- a/packages/contracts/src/traffic.ts +++ b/packages/contracts/src/traffic.ts @@ -68,3 +68,61 @@ export const normalizedTrafficPullPageSchema = z.object({ filter: z.string(), }) export type NormalizedTrafficPullPage = z.infer<typeof normalizedTrafficPullPageSchema> + +export const trafficSourceStatusSchema = z.enum(['connected', 'paused', 'error', 'archived']) +export type TrafficSourceStatus = z.infer<typeof trafficSourceStatusSchema> +export const TrafficSourceStatuses = trafficSourceStatusSchema.enum + +export const trafficSourceAuthModeSchema = z.enum(['oauth', 'service-account']) +export type TrafficSourceAuthMode = z.infer<typeof trafficSourceAuthModeSchema> +export const TrafficSourceAuthModes = trafficSourceAuthModeSchema.enum + +export const cloudRunSourceConfigSchema = z.object({ + gcpProjectId: z.string().min(1), + serviceName: z.string().nullable().optional(), + location: z.string().nullable().optional(), + authMode: trafficSourceAuthModeSchema, +}) +export type CloudRunSourceConfig = z.infer<typeof cloudRunSourceConfigSchema> + +export const trafficSourceDtoSchema = z.object({ + id: z.string(), + projectId: z.string(), + sourceType: trafficSourceTypeSchema, + displayName: z.string(), + status: trafficSourceStatusSchema, + lastSyncedAt: z.string().nullable(), + lastCursor: z.string().nullable(), + lastError: z.string().nullable(), + archivedAt: z.string().nullable(), + config: z.record(z.string(), z.unknown()), + createdAt: z.string(), + updatedAt: z.string(), +}) +export type TrafficSourceDto = z.infer<typeof trafficSourceDtoSchema> + +export const trafficConnectCloudRunRequestSchema = z.object({ + gcpProjectId: z.string().min(1), + serviceName: z.string().min(1).optional(), + location: z.string().min(1).optional(), + displayName: z.string().min(1).optional(), + /** Service-account JSON content (string). When omitted, defaults to OAuth via `canonry google connect --type ga4` flow. 
*/ + keyJson: z.string().optional(), +}) +export type TrafficConnectCloudRunRequest = z.infer + +export const trafficSyncResponseSchema = z.object({ + sourceId: z.string(), + runId: z.string(), + syncedAt: z.string(), + pulledEvents: z.number().int().nonnegative(), + crawlerHits: z.number().int().nonnegative(), + aiReferralHits: z.number().int().nonnegative(), + unknownHits: z.number().int().nonnegative(), + crawlerBucketRows: z.number().int().nonnegative(), + aiReferralBucketRows: z.number().int().nonnegative(), + sampleRows: z.number().int().nonnegative(), + windowStart: z.string(), + windowEnd: z.string(), +}) +export type TrafficSyncResponse = z.infer diff --git a/packages/db/src/migrate.ts b/packages/db/src/migrate.ts index 95385552..e8c65453 100644 --- a/packages/db/src/migrate.ts +++ b/packages/db/src/migrate.ts @@ -858,6 +858,80 @@ export const MIGRATION_VERSIONS: ReadonlyArray = [ normalizeLegacyQuerySchema(tx) }, }, + { + version: 49, + name: 'server-side-traffic-tables', + statements: [ + `CREATE TABLE IF NOT EXISTS traffic_sources ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + source_type TEXT NOT NULL, + display_name TEXT NOT NULL, + status TEXT NOT NULL, + last_synced_at TEXT, + last_cursor TEXT, + last_error TEXT, + archived_at TEXT, + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_traffic_sources_project ON traffic_sources(project_id)`, + `CREATE INDEX IF NOT EXISTS idx_traffic_sources_project_status ON traffic_sources(project_id, status)`, + `CREATE TABLE IF NOT EXISTS crawler_events_hourly ( + project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + source_id TEXT NOT NULL REFERENCES traffic_sources(id) ON DELETE CASCADE, + ts_hour TEXT NOT NULL, + bot_id TEXT NOT NULL, + operator TEXT NOT NULL, + verification_status TEXT NOT NULL, + path_normalized TEXT NOT NULL, + status INTEGER NOT NULL, + hits INTEGER NOT NULL DEFAULT 0, + sampled_user_agent TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (project_id, source_id, ts_hour, bot_id, verification_status, path_normalized, status) + )`, + `CREATE INDEX IF NOT EXISTS idx_crawler_hourly_project_ts ON crawler_events_hourly(project_id, ts_hour)`, + `CREATE INDEX IF NOT EXISTS idx_crawler_hourly_path ON crawler_events_hourly(project_id, path_normalized)`, + `CREATE TABLE IF NOT EXISTS ai_referral_events_hourly ( + project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + source_id TEXT NOT NULL REFERENCES traffic_sources(id) ON DELETE CASCADE, + ts_hour TEXT NOT NULL, + product TEXT NOT NULL, + operator TEXT NOT NULL, + source_domain TEXT NOT NULL, + evidence_type TEXT NOT NULL, + landing_path_normalized TEXT NOT NULL, + status INTEGER NOT NULL, + sessions_or_hits INTEGER NOT NULL DEFAULT 0, + users_estimated INTEGER, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (project_id, source_id, ts_hour, product, source_domain, evidence_type, landing_path_normalized, status) + )`, + `CREATE INDEX IF NOT EXISTS idx_ai_referral_hourly_project_ts ON ai_referral_events_hourly(project_id, ts_hour)`, + `CREATE INDEX IF NOT EXISTS idx_ai_referral_hourly_landing ON ai_referral_events_hourly(project_id, landing_path_normalized)`, + `CREATE TABLE IF NOT EXISTS raw_event_samples ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + source_id TEXT NOT NULL REFERENCES traffic_sources(id) ON DELETE 
CASCADE, + ts TEXT NOT NULL, + event_type TEXT NOT NULL, + ip_hash TEXT, + user_agent TEXT, + path_normalized TEXT NOT NULL, + status INTEGER, + referer_host TEXT, + classifier_details_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_raw_event_samples_project_ts ON raw_event_samples(project_id, ts)`, + `CREATE INDEX IF NOT EXISTS idx_raw_event_samples_source_ts ON raw_event_samples(source_id, ts)`, + `CREATE INDEX IF NOT EXISTS idx_raw_event_samples_event_type ON raw_event_samples(event_type)`, + ], + }, ] /** diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index 186d89a6..ab79f982 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -1,4 +1,4 @@ -import { index, integer, sqliteTable, text, uniqueIndex } from 'drizzle-orm/sqlite-core' +import { index, integer, primaryKey, sqliteTable, text, uniqueIndex } from 'drizzle-orm/sqlite-core' export const projects = sqliteTable('projects', { id: text('id').primaryKey(), @@ -546,6 +546,115 @@ export const agentMemory = sqliteTable('agent_memory', { index('idx_agent_memory_project_updated').on(table.projectId, table.updatedAt), ]) +// --- Server-side traffic ingestion --- +// Per-source connection metadata. Credentials live in ~/.canonry/config.yaml, +// not here. `archived_at` retains the row after a host migration so historical +// crawler/referral buckets keep their FK target. +export const trafficSources = sqliteTable('traffic_sources', { + id: text('id').primaryKey(), + projectId: text('project_id').notNull().references(() => projects.id, { onDelete: 'cascade' }), + sourceType: text('source_type').notNull(), + displayName: text('display_name').notNull(), + status: text('status').notNull(), + lastSyncedAt: text('last_synced_at'), + lastCursor: text('last_cursor'), + lastError: text('last_error'), + archivedAt: text('archived_at'), + configJson: text('config_json').notNull().default('{}'), + createdAt: text('created_at').notNull(), + updatedAt: text('updated_at').notNull(), +}, (table) => [ + index('idx_traffic_sources_project').on(table.projectId), + index('idx_traffic_sources_project_status').on(table.projectId, table.status), +]) + +// Hourly rollup of server-observed crawler hits. Composite PK so the same +// (project, source, hour, bot, verification, path, status) tuple can be +// upserted to accumulate `hits` without a surrogate row id. 
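A minimal sketch of that accumulation, assuming a Drizzle better-sqlite3 client `db` and a hypothetical pre-aggregated `bucket` row object (the actual sync route may shape this differently); the conflict target mirrors the composite primary key declared just below:

    import { sql } from 'drizzle-orm'

    db.insert(crawlerEventsHourly)
      .values(bucket)
      .onConflictDoUpdate({
        // Conflict target = the composite primary key columns.
        target: [
          crawlerEventsHourly.projectId,
          crawlerEventsHourly.sourceId,
          crawlerEventsHourly.tsHour,
          crawlerEventsHourly.botId,
          crawlerEventsHourly.verificationStatus,
          crawlerEventsHourly.pathNormalized,
          crawlerEventsHourly.status,
        ],
        set: {
          // Accumulate into the existing hourly bucket instead of adding a row.
          hits: sql`${crawlerEventsHourly.hits} + excluded.hits`,
          updatedAt: bucket.updatedAt,
        },
      })
      .run()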
+export const crawlerEventsHourly = sqliteTable('crawler_events_hourly', { + projectId: text('project_id').notNull().references(() => projects.id, { onDelete: 'cascade' }), + sourceId: text('source_id').notNull().references(() => trafficSources.id, { onDelete: 'cascade' }), + tsHour: text('ts_hour').notNull(), + botId: text('bot_id').notNull(), + operator: text('operator').notNull(), + verificationStatus: text('verification_status').notNull(), + pathNormalized: text('path_normalized').notNull(), + status: integer('status').notNull(), + hits: integer('hits').notNull().default(0), + sampledUserAgent: text('sampled_user_agent'), + createdAt: text('created_at').notNull(), + updatedAt: text('updated_at').notNull(), +}, (table) => [ + primaryKey({ + columns: [ + table.projectId, + table.sourceId, + table.tsHour, + table.botId, + table.verificationStatus, + table.pathNormalized, + table.status, + ], + }), + index('idx_crawler_hourly_project_ts').on(table.projectId, table.tsHour), + index('idx_crawler_hourly_path').on(table.projectId, table.pathNormalized), +]) + +// Hourly rollup of human visits with explicit AI-origin evidence (referer +// host or UTM source). Independent from `crawler_events_hourly` — never +// collapse the two; they answer different questions. +export const aiReferralEventsHourly = sqliteTable('ai_referral_events_hourly', { + projectId: text('project_id').notNull().references(() => projects.id, { onDelete: 'cascade' }), + sourceId: text('source_id').notNull().references(() => trafficSources.id, { onDelete: 'cascade' }), + tsHour: text('ts_hour').notNull(), + product: text('product').notNull(), + operator: text('operator').notNull(), + sourceDomain: text('source_domain').notNull(), + evidenceType: text('evidence_type').notNull(), + landingPathNormalized: text('landing_path_normalized').notNull(), + status: integer('status').notNull(), + sessionsOrHits: integer('sessions_or_hits').notNull().default(0), + usersEstimated: integer('users_estimated'), + createdAt: text('created_at').notNull(), + updatedAt: text('updated_at').notNull(), +}, (table) => [ + primaryKey({ + columns: [ + table.projectId, + table.sourceId, + table.tsHour, + table.product, + table.sourceDomain, + table.evidenceType, + table.landingPathNormalized, + table.status, + ], + }), + index('idx_ai_referral_hourly_project_ts').on(table.projectId, table.tsHour), + index('idx_ai_referral_hourly_landing').on(table.projectId, table.landingPathNormalized), +]) + +// Short-retention raw evidence for classifier debugging and replay. +// Default retention is 30 days; older rows are pruned out-of-band. 
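One way such an out-of-band prune could be written — a sketch only, not a job added by this patch; the 30-day cutoff matches the stated default, and `db` is again an assumed Drizzle better-sqlite3 client:

    import { lt } from 'drizzle-orm'

    // `ts` holds ISO-8601 strings, so a lexicographic comparison against the cutoff works.
    const cutoff = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000).toISOString()
    db.delete(rawEventSamples).where(lt(rawEventSamples.ts, cutoff)).run()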
+export const rawEventSamples = sqliteTable('raw_event_samples', { + id: text('id').primaryKey(), + projectId: text('project_id').notNull().references(() => projects.id, { onDelete: 'cascade' }), + sourceId: text('source_id').notNull().references(() => trafficSources.id, { onDelete: 'cascade' }), + ts: text('ts').notNull(), + eventType: text('event_type').notNull(), + ipHash: text('ip_hash'), + userAgent: text('user_agent'), + pathNormalized: text('path_normalized').notNull(), + status: integer('status'), + refererHost: text('referer_host'), + classifierDetailsJson: text('classifier_details_json').notNull().default('{}'), + createdAt: text('created_at').notNull(), +}, (table) => [ + index('idx_raw_event_samples_project_ts').on(table.projectId, table.ts), + index('idx_raw_event_samples_source_ts').on(table.sourceId, table.ts), + index('idx_raw_event_samples_event_type').on(table.eventType), +]) + /** * Internal bookkeeping for the migration runner. One row per applied * `MIGRATION_VERSIONS` entry. The migrator reads `MAX(version)` on boot and diff --git a/packages/db/test/traffic-tables.test.ts b/packages/db/test/traffic-tables.test.ts new file mode 100644 index 00000000..e3f6cffe --- /dev/null +++ b/packages/db/test/traffic-tables.test.ts @@ -0,0 +1,283 @@ +import { test, expect, onTestFinished } from 'vitest' +import fs from 'node:fs' +import os from 'node:os' +import path from 'node:path' +import { and, eq } from 'drizzle-orm' +import { + createClient, + migrate, + projects, + trafficSources, + crawlerEventsHourly, + aiReferralEventsHourly, + rawEventSamples, +} from '../src/index.js' + +function createTempDb() { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'canonry-traffic-test-')) + const dbPath = path.join(tmpDir, 'test.db') + const db = createClient(dbPath) + migrate(db) + return { db, dbPath, tmpDir } +} + +function cleanup(tmpDir: string) { + fs.rmSync(tmpDir, { recursive: true, force: true }) +} + +function seedProject(db: ReturnType['db']) { + const now = new Date().toISOString() + db.insert(projects).values({ + id: 'proj_1', + name: 'test-project', + displayName: 'Test Project', + canonicalDomain: 'example.com', + country: 'US', + language: 'en', + createdAt: now, + updatedAt: now, + }).run() +} + +test('traffic_sources round-trips a connected cloud-run source', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_1', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Cloud Run · openclaw-nyc', + status: 'connected', + lastSyncedAt: null, + lastCursor: null, + lastError: null, + archivedAt: null, + configJson: JSON.stringify({ gcpProjectId: 'openclaw-nyc', serviceName: 'openclaw-nyc', location: 'us-east1', authMode: 'service-account' }), + createdAt: now, + updatedAt: now, + }).run() + + const [row] = db.select().from(trafficSources).where(eq(trafficSources.id, 'src_1')).all() + + expect(row).toBeDefined() + expect(row.sourceType).toBe('cloud-run') + expect(row.status).toBe('connected') + expect(row.archivedAt).toBeNull() + expect(row.lastSyncedAt).toBeNull() +}) + +test('traffic_sources supports archived status with archived_at', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_archived', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Old host', + 
status: 'archived', + archivedAt: now, + configJson: '{}', + createdAt: now, + updatedAt: now, + }).run() + + const [row] = db.select().from(trafficSources).where(eq(trafficSources.id, 'src_archived')).all() + + expect(row.status).toBe('archived') + expect(row.archivedAt).toBe(now) +}) + +test('crawler_events_hourly composite PK rejects duplicate inserts and accumulates via upsert', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_2', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Cloud Run', + status: 'connected', + configJson: '{}', + createdAt: now, + updatedAt: now, + }).run() + + const baseRow = { + projectId: 'proj_1', + sourceId: 'src_2', + tsHour: '2026-05-07T17:00:00.000Z', + botId: 'gptbot', + operator: 'OpenAI', + verificationStatus: 'claimed_unverified', + pathNormalized: '/blog/foo', + status: 200, + hits: 3, + sampledUserAgent: 'GPTBot/1.0', + createdAt: now, + updatedAt: now, + } + + db.insert(crawlerEventsHourly).values(baseRow).run() + + // Re-insert with a different hits count must conflict on the composite PK + // and let the caller upsert (set hits = hits + 5). + expect(() => db.insert(crawlerEventsHourly).values(baseRow).run()).toThrow() + + // Composite PK lookup works + const [row] = db + .select() + .from(crawlerEventsHourly) + .where( + and( + eq(crawlerEventsHourly.projectId, 'proj_1'), + eq(crawlerEventsHourly.sourceId, 'src_2'), + eq(crawlerEventsHourly.tsHour, '2026-05-07T17:00:00.000Z'), + eq(crawlerEventsHourly.botId, 'gptbot'), + eq(crawlerEventsHourly.verificationStatus, 'claimed_unverified'), + eq(crawlerEventsHourly.pathNormalized, '/blog/foo'), + eq(crawlerEventsHourly.status, 200), + ), + ) + .all() + expect(row.hits).toBe(3) + expect(row.operator).toBe('OpenAI') +}) + +test('ai_referral_events_hourly stores hourly buckets keyed by product+source+evidence', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_3', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Cloud Run', + status: 'connected', + configJson: '{}', + createdAt: now, + updatedAt: now, + }).run() + + db.insert(aiReferralEventsHourly).values({ + projectId: 'proj_1', + sourceId: 'src_3', + tsHour: '2026-05-07T17:00:00.000Z', + product: 'ChatGPT', + operator: 'OpenAI', + sourceDomain: 'chatgpt.com', + evidenceType: 'utm', + landingPathNormalized: '/blog/open-source-aeo-audit-tool', + status: 200, + sessionsOrHits: 2, + usersEstimated: null, + createdAt: now, + updatedAt: now, + }).run() + + const [row] = db + .select() + .from(aiReferralEventsHourly) + .where( + and( + eq(aiReferralEventsHourly.projectId, 'proj_1'), + eq(aiReferralEventsHourly.product, 'ChatGPT'), + ), + ) + .all() + expect(row).toBeDefined() + expect(row.evidenceType).toBe('utm') + expect(row.sessionsOrHits).toBe(2) + expect(row.usersEstimated).toBeNull() +}) + +test('raw_event_samples stores debug samples without full IPs', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_4', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Cloud Run', + status: 'connected', + configJson: '{}', + createdAt: now, + updatedAt: now, + }).run() + + 
db.insert(rawEventSamples).values({ + id: 'sample_1', + projectId: 'proj_1', + sourceId: 'src_4', + ts: '2026-05-07T17:32:00.000Z', + eventType: 'crawler', + ipHash: 'abc123def', + userAgent: 'GPTBot/1.0', + pathNormalized: '/pricing', + status: 200, + refererHost: null, + classifierDetailsJson: JSON.stringify({ botId: 'gptbot' }), + createdAt: now, + }).run() + + const [row] = db.select().from(rawEventSamples).where(eq(rawEventSamples.id, 'sample_1')).all() + expect(row.eventType).toBe('crawler') + expect(row.ipHash).toBe('abc123def') + expect(row.userAgent).toBe('GPTBot/1.0') +}) + +test('traffic_sources cascade deletes all dependent rows when project is removed', () => { + const { db, tmpDir } = createTempDb() + onTestFinished(() => cleanup(tmpDir)) + + seedProject(db) + + const now = new Date().toISOString() + db.insert(trafficSources).values({ + id: 'src_cascade', + projectId: 'proj_1', + sourceType: 'cloud-run', + displayName: 'Cloud Run', + status: 'connected', + configJson: '{}', + createdAt: now, + updatedAt: now, + }).run() + + db.insert(crawlerEventsHourly).values({ + projectId: 'proj_1', + sourceId: 'src_cascade', + tsHour: '2026-05-07T17:00:00.000Z', + botId: 'gptbot', + operator: 'OpenAI', + verificationStatus: 'claimed_unverified', + pathNormalized: '/x', + status: 200, + hits: 1, + sampledUserAgent: 'GPTBot/1.0', + createdAt: now, + updatedAt: now, + }).run() + + db.delete(projects).where(eq(projects.id, 'proj_1')).run() + + expect(db.select().from(trafficSources).all().length).toBe(0) + expect(db.select().from(crawlerEventsHourly).all().length).toBe(0) +}) diff --git a/packages/integration-cloud-run/src/auth.ts b/packages/integration-cloud-run/src/auth.ts new file mode 100644 index 00000000..722c2073 --- /dev/null +++ b/packages/integration-cloud-run/src/auth.ts @@ -0,0 +1,108 @@ +import crypto from 'node:crypto' + +const GOOGLE_TOKEN_URL = 'https://oauth2.googleapis.com/token' +const GOOGLE_OAUTH_REFRESH_URL = 'https://oauth2.googleapis.com/token' +export const CLOUD_LOGGING_READ_SCOPE = 'https://www.googleapis.com/auth/logging.read' +const TOKEN_REQUEST_TIMEOUT_MS = 30_000 + +export class CloudRunAuthError extends Error { + constructor(message: string, public readonly httpStatus?: number, public readonly body?: string) { + super(message) + this.name = 'CloudRunAuthError' + } +} + +function createServiceAccountJwt(clientEmail: string, privateKey: string, scope: string): string { + if (!clientEmail) throw new CloudRunAuthError('clientEmail is required') + if (!privateKey) throw new CloudRunAuthError('privateKey is required') + if (!scope) throw new CloudRunAuthError('scope is required') + + const now = Math.floor(Date.now() / 1000) + const header = { alg: 'RS256', typ: 'JWT' } + const payload = { + iss: clientEmail, + scope, + aud: GOOGLE_TOKEN_URL, + iat: now, + exp: now + 3600, + } + const encode = (obj: object) => Buffer.from(JSON.stringify(obj)).toString('base64url') + const headerB64 = encode(header) + const payloadB64 = encode(payload) + const signingInput = `${headerB64}.${payloadB64}` + const sign = crypto.createSign('RSA-SHA256') + sign.update(signingInput) + const signature = sign.sign(privateKey, 'base64url') + return `${signingInput}.${signature}` +} + +/** + * Exchange a service-account key for a Cloud Logging access token. The token + * scope is `logging.read`, which is the minimum needed for `entries.list`. 
+ */ +export async function getCloudLoggingAccessToken( + clientEmail: string, + privateKey: string, +): Promise { + const jwt = createServiceAccountJwt(clientEmail, privateKey, CLOUD_LOGGING_READ_SCOPE) + const res = await fetch(GOOGLE_TOKEN_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer', + assertion: jwt, + }), + signal: AbortSignal.timeout(TOKEN_REQUEST_TIMEOUT_MS), + }) + + if (!res.ok) { + const body = await res.text().catch(() => '') + throw new CloudRunAuthError( + `Service-account token exchange failed (HTTP ${res.status})`, + res.status, + body.slice(0, 500), + ) + } + const data = (await res.json()) as { access_token?: string } + if (!data.access_token) { + throw new CloudRunAuthError('Service-account token response missing access_token', res.status) + } + return data.access_token +} + +/** + * Refresh a long-lived OAuth access token using the user's refresh token. The + * caller supplies the OAuth client_id/client_secret (typically the same Google + * OAuth app used by GSC). + */ +export async function refreshCloudLoggingAccessToken( + clientId: string, + clientSecret: string, + refreshToken: string, +): Promise<{ accessToken: string; expiresIn: number }> { + const res = await fetch(GOOGLE_OAUTH_REFRESH_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams({ + grant_type: 'refresh_token', + client_id: clientId, + client_secret: clientSecret, + refresh_token: refreshToken, + }), + signal: AbortSignal.timeout(TOKEN_REQUEST_TIMEOUT_MS), + }) + + if (!res.ok) { + const body = await res.text().catch(() => '') + throw new CloudRunAuthError( + `OAuth refresh failed (HTTP ${res.status})`, + res.status, + body.slice(0, 500), + ) + } + const data = (await res.json()) as { access_token?: string; expires_in?: number } + if (!data.access_token) { + throw new CloudRunAuthError('OAuth refresh response missing access_token', res.status) + } + return { accessToken: data.access_token, expiresIn: data.expires_in ?? 
3600 } +} diff --git a/packages/integration-cloud-run/src/index.ts b/packages/integration-cloud-run/src/index.ts index 9d17d252..26a96885 100644 --- a/packages/integration-cloud-run/src/index.ts +++ b/packages/integration-cloud-run/src/index.ts @@ -1,3 +1,4 @@ +export * from './auth.js' export * from './client.js' export * from './filter.js' export * from './normalize.js' diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eacffac0..8b50d9b3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -184,6 +184,9 @@ importers: '@ainyc/canonry-integration-bing': specifier: workspace:* version: link:../integration-bing + '@ainyc/canonry-integration-cloud-run': + specifier: workspace:* + version: link:../integration-cloud-run '@ainyc/canonry-integration-commoncrawl': specifier: workspace:* version: link:../integration-commoncrawl @@ -193,6 +196,9 @@ importers: '@ainyc/canonry-integration-google-analytics': specifier: workspace:* version: link:../integration-google-analytics + '@ainyc/canonry-integration-traffic': + specifier: workspace:* + version: link:../integration-traffic '@ainyc/canonry-integration-wordpress': specifier: workspace:* version: link:../integration-wordpress @@ -278,12 +284,18 @@ importers: '@ainyc/canonry-integration-bing': specifier: workspace:* version: link:../integration-bing + '@ainyc/canonry-integration-cloud-run': + specifier: workspace:* + version: link:../integration-cloud-run '@ainyc/canonry-integration-commoncrawl': specifier: workspace:* version: link:../integration-commoncrawl '@ainyc/canonry-integration-google': specifier: workspace:* version: link:../integration-google + '@ainyc/canonry-integration-traffic': + specifier: workspace:* + version: link:../integration-traffic '@ainyc/canonry-integration-wordpress': specifier: workspace:* version: link:../integration-wordpress diff --git a/scripts/smoke-traffic-sync.ts b/scripts/smoke-traffic-sync.ts new file mode 100644 index 00000000..64690ef4 --- /dev/null +++ b/scripts/smoke-traffic-sync.ts @@ -0,0 +1,181 @@ +#!/usr/bin/env tsx +// End-to-end smoke test for the Phase 2 traffic-sync auth + pull + classifier +// pipeline. The DB upsert path is covered by `packages/api-routes/test/traffic.test.ts` +// against an in-memory SQLite — what those tests can't cover is real Cloud +// Logging access with a real SA key (or gcloud token), which is what this +// script exercises. +// +// Compare the totals printed here to: +// pnpm tsx scripts/test-cloud-run-traffic-pull.ts --gcp-project --use-gcloud --since +// They should match for the same window. 
+// +// Usage: +// pnpm tsx scripts/smoke-traffic-sync.ts \ +// --gcp-project openclaw-nyc \ +// --service openclaw-nyc \ +// --location us-east1 \ +// --service-account-key /Users/arberx/Downloads/openclaw-nyc-9ff6d4cfa430.json \ +// --since-minutes 1440 +// +// pnpm tsx scripts/smoke-traffic-sync.ts \ +// --gcp-project openclaw-nyc --service openclaw-nyc --location us-east1 \ +// --use-gcloud --since-minutes 1440 + +import fs from 'node:fs' +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' +import { + listCloudRunTrafficEvents, + getCloudLoggingAccessToken, +} from '../packages/integration-cloud-run/src/index.js' +import { buildTrafficProbeReport } from '../packages/integration-traffic/src/index.js' + +const execFileAsync = promisify(execFile) + +interface Args { + gcpProject: string + service?: string + location?: string + saKeyPath?: string + useGcloud: boolean + sinceMinutes: number + pageSize: number + maxPages: number +} + +function parseArgs(argv: string[]): Args { + const args: Args = { + gcpProject: '', + sinceMinutes: 60, + pageSize: 1000, + maxPages: 5, + useGcloud: false, + } + for (let i = 0; i < argv.length; i++) { + const arg = argv[i] + switch (arg) { + case '--gcp-project': + args.gcpProject = argv[++i] ?? '' + break + case '--service': + args.service = argv[++i] + break + case '--location': + args.location = argv[++i] + break + case '--service-account-key': + args.saKeyPath = argv[++i] + break + case '--use-gcloud': + args.useGcloud = true + break + case '--since-minutes': + args.sinceMinutes = parseInt(argv[++i] ?? '60', 10) || 60 + break + case '--page-size': + args.pageSize = parseInt(argv[++i] ?? '1000', 10) || 1000 + break + case '--max-pages': + args.maxPages = parseInt(argv[++i] ?? '5', 10) || 5 + break + case '-h': + case '--help': + printUsage() + process.exit(0) + break + default: + if (arg) { + console.error(`Unknown argument: ${arg}`) + printUsage() + process.exit(1) + } + } + } + if (!args.gcpProject) { + console.error('Missing required: --gcp-project') + process.exit(1) + } + if (!args.saKeyPath && !args.useGcloud) { + console.error('Provide either --service-account-key or --use-gcloud') + process.exit(1) + } + return args +} + +function printUsage() { + console.log(`Usage: pnpm tsx scripts/smoke-traffic-sync.ts \\ + --gcp-project \\ + (--service-account-key | --use-gcloud) \\ + [--service ] [--location ] \\ + [--since-minutes 60] [--page-size 1000] [--max-pages 5]`) +} + +async function resolveAccessToken(args: Args): Promise { + if (args.useGcloud) { + const { stdout } = await execFileAsync('gcloud', ['auth', 'print-access-token']) + return stdout.trim() + } + if (!args.saKeyPath) throw new Error('No credential path supplied') + const raw = fs.readFileSync(args.saKeyPath, 'utf-8') + const parsed = JSON.parse(raw) as { client_email?: string; private_key?: string } + if (!parsed.client_email || !parsed.private_key) { + throw new Error('Service-account JSON missing client_email or private_key') + } + return getCloudLoggingAccessToken(parsed.client_email, parsed.private_key) +} + +async function main() { + const args = parseArgs(process.argv.slice(2)) + + console.log(`[1/3] Resolving Cloud Logging access token (${args.useGcloud ? 
'gcloud' : 'service-account'})…`) + const accessToken = await resolveAccessToken(args) + console.log(` → token resolved (length=${accessToken.length})`) + + const windowEnd = new Date() + const windowStart = new Date(windowEnd.getTime() - args.sinceMinutes * 60_000) + console.log(`[2/3] Pulling Cloud Run logs ${windowStart.toISOString()} → ${windowEnd.toISOString()}`) + const page = await listCloudRunTrafficEvents(accessToken, { + gcpProjectId: args.gcpProject, + serviceName: args.service, + location: args.location, + startTime: windowStart.toISOString(), + endTime: windowEnd.toISOString(), + pageSize: args.pageSize, + maxPages: args.maxPages, + }) + console.log(` → ${page.events.length} normalized events (raw entries=${page.rawEntryCount})`) + console.log(` → filter: ${page.filter.split('\n').join(' | ')}`) + + console.log(`[3/3] Classifying events into hourly buckets (the same path the API route runs)…`) + const report = buildTrafficProbeReport(page.events, { sampleLimit: 50 }) + console.log(` totals:`) + console.log(` normalized events: ${report.totals.normalizedEvents}`) + console.log(` crawler hits: ${report.totals.crawlerHits}`) + console.log(` ai-referral hits: ${report.totals.aiReferralHits}`) + console.log(` unknown hits: ${report.totals.unknownHits}`) + console.log(` crawler hourly buckets: ${report.crawlerEventsHourly.length}`) + for (const bucket of report.crawlerEventsHourly.slice(0, 12)) { + console.log(` ${bucket.tsHour} ${bucket.botId.padEnd(22)} hits=${String(bucket.hits).padStart(4)} ${bucket.pathNormalized}`) + } + if (report.crawlerEventsHourly.length > 12) console.log(` … (+${report.crawlerEventsHourly.length - 12} more)`) + console.log(` ai-referral hourly buckets: ${report.aiReferralEventsHourly.length}`) + for (const bucket of report.aiReferralEventsHourly.slice(0, 12)) { + console.log(` ${bucket.tsHour} ${bucket.product.padEnd(15)} hits=${String(bucket.hits).padStart(4)} ${bucket.landingPathNormalized} (${bucket.evidenceType})`) + } + console.log(` top bots:`) + for (const bot of report.topBots) console.log(` ${bot.botId.padEnd(24)} ${bot.operator.padEnd(12)} hits=${bot.hits}`) + console.log(` top crawler paths:`) + for (const p of report.topCrawlerPaths.slice(0, 10)) console.log(` ${String(p.hits).padStart(4)} ${p.pathNormalized}`) + + console.log('') + console.log('Smoke passed. Equivalence check command:') + console.log(` pnpm tsx scripts/test-cloud-run-traffic-pull.ts \\`) + console.log(` --gcp-project ${args.gcpProject}${args.service ? ` --service ${args.service}` : ''}${args.location ? ` --location ${args.location}` : ''} \\`) + console.log(` --use-gcloud --since ${Math.ceil(args.sinceMinutes / 60)}h`) +} + +main().catch((err) => { + console.error('Smoke test failed:', err instanceof Error ? err.message : err) + if (err instanceof Error && err.stack) console.error(err.stack) + process.exit(1) +})
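The smoke script above exercises only the service-account and gcloud token paths. For completeness, a hedged sketch of the OAuth alternative using `refreshCloudLoggingAccessToken` from the same package — the environment variable names below are placeholders, not values read anywhere in this patch:

    import { refreshCloudLoggingAccessToken } from '../packages/integration-cloud-run/src/index.js'

    // Client id/secret come from the Google OAuth app; the refresh token is the one
    // persisted under cloudRun.connections[] in ~/.canonry/config.yaml.
    const { accessToken, expiresIn } = await refreshCloudLoggingAccessToken(
      process.env.GOOGLE_OAUTH_CLIENT_ID ?? '',
      process.env.GOOGLE_OAUTH_CLIENT_SECRET ?? '',
      process.env.CANONRY_CLOUD_RUN_REFRESH_TOKEN ?? '',
    )
    console.log(`OAuth access token resolved (length=${accessToken.length}, expires in ${expiresIn}s)`)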