diff --git a/libs/core-functions/src/functions/lib/clickhouse-logger.ts b/libs/core-functions/src/functions/lib/clickhouse-logger.ts
index 827776441..1f7d18072 100644
--- a/libs/core-functions/src/functions/lib/clickhouse-logger.ts
+++ b/libs/core-functions/src/functions/lib/clickhouse-logger.ts
@@ -33,7 +33,6 @@ export function createClickhouseLogger(): EventsStore {
       clickhouse_settings: {
         async_insert: 1,
         wait_for_async_insert: 0,
-        async_insert_max_data_size: "10000000",
         async_insert_busy_timeout_ms: 10000,
         date_time_input_format: "best_effort",
       },
diff --git a/webapps/console/lib/server/sync.ts b/webapps/console/lib/server/sync.ts
index ce1c79b14..d6b37a75d 100644
--- a/webapps/console/lib/server/sync.ts
+++ b/webapps/console/lib/server/sync.ts
@@ -17,6 +17,8 @@ import { FunctionLogger, SetOpts, Store, SyncFunction } from "@jitsu/protocols/f
 import { mixpanelFacebookAdsSync, mixpanelGoogleAdsSync } from "./syncs/mixpanel";
 import IJob = google.cloud.scheduler.v1.IJob;
 import hash from "stable-hash";
+import { clickhouse } from "./clickhouse";
+const metricsSchema = process.env.CLICKHOUSE_METRICS_SCHEMA || process.env.CLICKHOUSE_DATABASE || "newjitsu_metrics";
 
 const log = getServerLog("sync-scheduler");
 
@@ -57,9 +59,16 @@ async function dbLog({
   level: string;
 }) {
   log.at(level).log(`Task ${taskId} sync ${syncId}: ${message}`);
-  await db.prisma().task_log.create({
-    data: {
-      timestamp: new Date(),
+  await clickhouse.insert({
+    table: metricsSchema + ".task_log",
+    format: "JSON",
+    clickhouse_settings: {
+      async_insert_busy_timeout_ms: 1000,
+      async_insert: 1,
+      wait_for_async_insert: 0,
+    },
+    values: {
+      timestamp: new Date().getTime(),
       logger: "sync",
       task_id: taskId,
       sync_id: syncId,
@@ -445,19 +454,16 @@ export async function scheduleSync({
       status: "RUNNING",
     },
   });
 
+  const runSynchronously = !destinationType.usesBulker && destinationType.syncs;
   if (running) {
     const msInMin = 1000 * 60;
     if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) {
-      await db.prisma().task_log.create({
-        data: {
-          timestamp: new Date(),
-          logger: "sync",
-          task_id: running.task_id,
-          sync_id: sync.id,
-          message: `Synchronous task ${running.task_id} was running due to timeout`,
-          level: "ERROR",
-        },
+      await dbLog({
+        taskId: running.task_id,
+        syncId: sync.id,
+        message: `Synchronous task ${running.task_id} was running due to timeout`,
+        level: "ERROR",
       });
       await db.prisma().source_task.update({
         where: {
diff --git a/webapps/console/pages/api/[workspaceId]/sources/logs.ts b/webapps/console/pages/api/[workspaceId]/sources/logs.ts
index 237db7e7c..6bee59049 100644
--- a/webapps/console/pages/api/[workspaceId]/sources/logs.ts
+++ b/webapps/console/pages/api/[workspaceId]/sources/logs.ts
@@ -8,8 +8,11 @@ dayjs.extend(utc);
 import { getServerLog } from "../../../../lib/server/log";
 import zlib from "zlib";
 import { pipeline } from "node:stream";
+import { clickhouse } from "../../../../lib/server/clickhouse";
 
 const log = getServerLog("sync-logs");
+const metricsSchema = process.env.CLICKHOUSE_METRICS_SCHEMA || process.env.CLICKHOUSE_DATABASE || "newjitsu_metrics";
+
 //Vercel Limit: https://vercel.com/docs/functions/streaming-functions#limitations-for-streaming-edge-functions
 const maxStreamingResponseSize = 100_000_000;
 // Too big responses may cause performance issues in the browser (that is compressed size - actual payload is much bigger)
@@ -29,6 +32,16 @@ export default createRoute()
   .handler(async ({ user, query, res }) => {
     const { workspaceId } = query;
     await verifyAccess(user, workspaceId);
+    const existingLink = await db
+      .prisma()
+      .configurationObjectLink.findFirst({ where: { workspaceId: workspaceId, id: query.syncId, deleted: false } });
+    if (!existingLink) {
+      res.writeHead(404, {
+        "Content-Type": "application/json",
+      });
+      res.end(JSON.stringify({ ok: false, error: `sync with id ${query.syncId} not found in the workspace` }));
+      return;
+    }
     let maxResponseSize = maxStreamingResponseSize;
     if (query.download) {
       res.writeHead(200, {
@@ -55,31 +68,69 @@ export default createRoute()
         }
         responsePromiseResolve();
       });
-      db.prisma().$queryRaw;
-      await db.pgHelper().streamQuery(
-        `select tl.*
+      const sqlQuery = `select timestamp, level, logger, message from ${metricsSchema}.task_log where task_id = {taskId:String} AND sync_id = {syncId:String} order by timestamp desc`;
+      const chResult = await clickhouse.query({
+        query: sqlQuery,
+        query_params: {
+          taskId: query.taskId,
+          syncId: query.syncId,
+        },
+        format: "JSONCompactEachRow",
+        clickhouse_settings: {
+          wait_end_of_query: 1,
+        },
+      });
+      const stream = chResult.stream();
+      stream.on("data", rs => {
+        for (const rw of rs) {
+          const r = rw.json() as any;
+          if (gzip.bytesWritten < maxStreamingResponseSize) {
+            const line = `${dayjs(r[0]).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r[1]} [${r[2]}] ${r[3]}\n`;
+            gzip.write(line);
+          } else {
+            stream.destroy();
+          }
+        }
+      });
+      stream.on("error", err => {
+        log.atError().withCause(err).log("Error streaming data");
+        gzip.end();
+      });
+      // stream.on("close", () => {
+      //   log.atInfo().log("STREAM CLOSED");
+      // });
+      stream.on("end", async () => {
+        log.atInfo().log("STREAM END: " + gzip.bytesWritten);
+        if (gzip.bytesWritten > 0) {
+          gzip.end();
+          return;
+        }
+        log.atInfo().log("ACCESSING POSTGRES LOGS");
+        await db.pgHelper().streamQuery(
+          `select tl.*
                    from newjitsu.task_log tl join newjitsu."ConfigurationObjectLink" link on tl.sync_id = link.id
                    where task_id = :task_id and link."workspaceId" = :workspace_id
                    order by timestamp desc`,
-        { task_id: query.taskId, workspace_id: workspaceId },
-        r => {
-          const line = `${dayjs(r.timestamp).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r.level} [${r.logger}] ${
-            r.message
-          }\n`;
-          if (gzip.bytesWritten < maxResponseSize) {
-            gzip.write(line);
+          { task_id: query.taskId, workspace_id: workspaceId },
+          r => {
+            const line = `${dayjs(r.timestamp).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r.level} [${r.logger}] ${
+              r.message
+            }\n`;
+            if (gzip.bytesWritten < maxResponseSize) {
+              gzip.write(line);
+            }
+          }
+        );
+        if (gzip.bytesWritten === 0) {
+          const task = await db.prisma().source_task.findFirst({ where: { task_id: query.taskId } });
+          if (!task || task.status === "RUNNING") {
+            gzip.write("The task is starting...");
+          } else {
+            gzip.write("No logs found for this task");
           }
         }
-      );
-      if (gzip.bytesWritten === 0) {
-        const task = await db.prisma().source_task.findFirst({ where: { task_id: query.taskId } });
-        if (!task || task.status === "RUNNING") {
-          gzip.write("The task is starting...");
-        } else {
-          gzip.write("No logs found for this task");
-        }
-      }
-      gzip.end();
+        gzip.end();
+      });
       await responsePromise;
     } catch (e: any) {
       const errorId = randomId();
diff --git a/webapps/console/pages/api/admin/events-log-init.ts b/webapps/console/pages/api/admin/events-log-init.ts
index 29f9f9148..4377d2244 100644
--- a/webapps/console/pages/api/admin/events-log-init.ts
+++ b/webapps/console/pages/api/admin/events-log-init.ts
@@ -43,7 +43,8 @@ export default createRoute()
       log.atError().withCause(e).log(`Failed to create ${metricsSchema} database.`);
       throw new Error(`Failed to create ${metricsSchema} database.`);
     }
-    const createTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.events_log ${onCluster}
+    const errors: Error[] = [];
+    const createEventsLogTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.events_log ${onCluster}
     (
         timestamp DateTime64(3),
         actorId LowCardinality(String),
@@ -61,12 +62,42 @@ export default createRoute()
 
     try {
       await clickhouse.command({
-        query: createTableQuery,
+        query: createEventsLogTableQuery,
       });
       log.atInfo().log(`Table ${metricsSchema}.events_log created or already exists`);
     } catch (e: any) {
       log.atError().withCause(e).log(`Failed to create ${metricsSchema}.events_log table.`);
-      throw new Error(`Failed to create ${metricsSchema}.events_log table.`);
+      errors.push(new Error(`Failed to create ${metricsSchema}.events_log table.`));
+    }
+    const createTaskLogTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.task_log ${onCluster}
+    (
+        task_id String,
+        sync_id LowCardinality(String),
+        timestamp DateTime64(3),
+        level LowCardinality(String),
+        logger LowCardinality(String),
+        message String
+    )
+        engine = ${
+          metricsCluster
+            ? "ReplicatedMergeTree('/clickhouse/tables/{shard}/" + metricsSchema + "/task_log', '{replica}')"
+            : "MergeTree()"
+        }
+        PARTITION BY toYYYYMM(timestamp)
+        ORDER BY (task_id, sync_id, timestamp)
+        TTL toDateTime(timestamp) + INTERVAL 3 MONTH DELETE`;
+
+    try {
+      await clickhouse.command({
+        query: createTaskLogTableQuery,
+      });
+      log.atInfo().log(`Table ${metricsSchema}.task_log created or already exists`);
+    } catch (e: any) {
+      log.atError().withCause(e).log(`Failed to create ${metricsSchema}.task_log table.`);
+      errors.push(new Error(`Failed to create ${metricsSchema}.task_log table.`));
+    }
+    if (errors.length > 0) {
+      throw new Error("Failed to initialize tables: " + errors.map(e => e.message).join(", "));
     }
   })
   .toNextApiHandler();