console: use CH for writing and reading sync task logs.
absorbb committed Dec 18, 2024
1 parent 34420de commit ecf96a9
Showing 4 changed files with 123 additions and 36 deletions.
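
In outline, this commit moves sync task logs from the Postgres task_log table (written through Prisma) to a ClickHouse table: dbLog in sync.ts now issues async inserts, the logs API reads from ClickHouse and falls back to Postgres for tasks logged before the change, and events-log-init.ts creates the new table. The sketch below is not part of the diff; it only illustrates the write path, assuming a @clickhouse/client instance like the one exported from lib/server/clickhouse (the writeTaskLog helper name is made up, and the row shape follows the new DDL further down).

import { createClient } from "@clickhouse/client";

// Connection options depend on the client version (url vs. host, credentials); illustrative only.
const clickhouse = createClient({ url: process.env.CLICKHOUSE_URL });
const metricsSchema = process.env.CLICKHOUSE_METRICS_SCHEMA || process.env.CLICKHOUSE_DATABASE || "newjitsu_metrics";

export async function writeTaskLog(taskId: string, syncId: string, level: string, message: string) {
  // Async inserts let ClickHouse buffer many small log rows server-side instead of creating
  // one data part per insert; wait_for_async_insert=0 returns as soon as the row is buffered.
  await clickhouse.insert({
    table: `${metricsSchema}.task_log`,
    format: "JSONEachRow", // the commit itself uses format "JSON" with a single object
    clickhouse_settings: {
      async_insert: 1,
      wait_for_async_insert: 0,
      async_insert_busy_timeout_ms: 1000,
    },
    values: [
      {
        timestamp: new Date().getTime(), // epoch milliseconds, the same representation dbLog writes
        logger: "sync",
        task_id: taskId,
        sync_id: syncId,
        level,
        message,
      },
    ],
  });
}
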
1 change: 0 additions & 1 deletion libs/core-functions/src/functions/lib/clickhouse-logger.ts
@@ -33,7 +33,6 @@ export function createClickhouseLogger(): EventsStore {
clickhouse_settings: {
async_insert: 1,
wait_for_async_insert: 0,
async_insert_max_data_size: "10000000",
async_insert_busy_timeout_ms: 10000,
date_time_input_format: "best_effort",
},
30 changes: 18 additions & 12 deletions webapps/console/lib/server/sync.ts
@@ -17,6 +17,8 @@ import { FunctionLogger, SetOpts, Store, SyncFunction } from "@jitsu/protocols/f
import { mixpanelFacebookAdsSync, mixpanelGoogleAdsSync } from "./syncs/mixpanel";
import IJob = google.cloud.scheduler.v1.IJob;
import hash from "stable-hash";
import { clickhouse } from "./clickhouse";
const metricsSchema = process.env.CLICKHOUSE_METRICS_SCHEMA || process.env.CLICKHOUSE_DATABASE || "newjitsu_metrics";

const log = getServerLog("sync-scheduler");

@@ -57,9 +59,16 @@ async function dbLog({
level: string;
}) {
log.at(level).log(`Task ${taskId} sync ${syncId}: ${message}`);
await db.prisma().task_log.create({
data: {
timestamp: new Date(),
await clickhouse.insert({
table: metricsSchema + ".task_log",
format: "JSON",
clickhouse_settings: {
async_insert_busy_timeout_ms: 1000,
async_insert: 1,
wait_for_async_insert: 0,
},
values: {
timestamp: new Date().getTime(),
logger: "sync",
task_id: taskId,
sync_id: syncId,
@@ -445,19 +454,16 @@ export async function scheduleSync({
status: "RUNNING",
},
});

const runSynchronously = !destinationType.usesBulker && destinationType.syncs;
if (running) {
const msInMin = 1000 * 60;
if (ignoreRunning || (runSynchronously && Date.now() - running.updated_at.getTime() >= 2 * msInMin)) {
await db.prisma().task_log.create({
data: {
timestamp: new Date(),
logger: "sync",
task_id: running.task_id,
sync_id: sync.id,
message: `Synchronous task ${running.task_id} was running due to timeout`,
level: "ERROR",
},
await dbLog({
taskId: running.task_id,
syncId: sync.id,
message: `Synchronous task ${running.task_id} was running due to timeout`,
level: "ERROR",
});
await db.prisma().source_task.update({
where: {
91 changes: 71 additions & 20 deletions webapps/console/pages/api/[workspaceId]/sources/logs.ts
@@ -8,8 +8,11 @@ dayjs.extend(utc);
import { getServerLog } from "../../../../lib/server/log";
import zlib from "zlib";
import { pipeline } from "node:stream";
import { clickhouse } from "../../../../lib/server/clickhouse";

const log = getServerLog("sync-logs");
const metricsSchema = process.env.CLICKHOUSE_METRICS_SCHEMA || process.env.CLICKHOUSE_DATABASE || "newjitsu_metrics";

//Vercel Limit: https://vercel.com/docs/functions/streaming-functions#limitations-for-streaming-edge-functions
const maxStreamingResponseSize = 100_000_000;
// Too big responses may cause performance issues in the browser (that is compressed size - actual payload is much bigger)
@@ -29,6 +32,16 @@ export default createRoute()
.handler(async ({ user, query, res }) => {
const { workspaceId } = query;
await verifyAccess(user, workspaceId);
const existingLink = await db
.prisma()
.configurationObjectLink.findFirst({ where: { workspaceId: workspaceId, id: query.syncId, deleted: false } });
if (!existingLink) {
res.writeHead(404, {
"Content-Type": "application/json",
});
res.end(JSON.stringify({ ok: false, error: `sync with id ${query.syncId} not found in the workspace` }));
return;
}
let maxResponseSize = maxStreamingResponseSize;
if (query.download) {
res.writeHead(200, {
@@ -55,31 +68,69 @@ }
}
responsePromiseResolve();
});
db.prisma().$queryRaw;
await db.pgHelper().streamQuery(
`select tl.*
const sqlQuery = `select timestamp, level, logger, message from ${metricsSchema}.task_log where task_id = {taskId:String} AND sync_id = {syncId:String} order by timestamp desc`;
const chResult = await clickhouse.query({
query: sqlQuery,
query_params: {
taskId: query.taskId,
syncId: query.syncId,
},
format: "JSONCompactEachRow",
clickhouse_settings: {
wait_end_of_query: 1,
},
});
const stream = chResult.stream();
stream.on("data", rs => {
for (const rw of rs) {
const r = rw.json() as any;
if (gzip.bytesWritten < maxStreamingResponseSize) {
const line = `${dayjs(r[0]).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r[1]} [${r[2]}] ${r[3]}\n`;
gzip.write(line);
} else {
stream.destroy();
}
}
});
stream.on("error", err => {
log.atError().withCause(err).log("Error streaming data");
gzip.end();
});
// stream.on("close", () => {
// log.atInfo().log("STREAM CLOSED");
// });
stream.on("end", async () => {
log.atInfo().log("STREAM END: " + gzip.bytesWritten);
if (gzip.bytesWritten > 0) {
gzip.end();
return;
}
log.atInfo().log("ACCESSING POSTGRES LOGS");
await db.pgHelper().streamQuery(
`select tl.*
from newjitsu.task_log tl join newjitsu."ConfigurationObjectLink" link on tl.sync_id = link.id
where task_id = :task_id and link."workspaceId" = :workspace_id
order by timestamp desc`,
{ task_id: query.taskId, workspace_id: workspaceId },
r => {
const line = `${dayjs(r.timestamp).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r.level} [${r.logger}] ${
r.message
}\n`;
if (gzip.bytesWritten < maxResponseSize) {
gzip.write(line);
{ task_id: query.taskId, workspace_id: workspaceId },
r => {
const line = `${dayjs(r.timestamp).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${r.level} [${r.logger}] ${
r.message
}\n`;
if (gzip.bytesWritten < maxResponseSize) {
gzip.write(line);
}
}
);
if (gzip.bytesWritten === 0) {
const task = await db.prisma().source_task.findFirst({ where: { task_id: query.taskId } });
if (!task || task.status === "RUNNING") {
gzip.write("The task is starting...");
} else {
gzip.write("No logs found for this task");
}
}
);
if (gzip.bytesWritten === 0) {
const task = await db.prisma().source_task.findFirst({ where: { task_id: query.taskId } });
if (!task || task.status === "RUNNING") {
gzip.write("The task is starting...");
} else {
gzip.write("No logs found for this task");
}
}
gzip.end();
gzip.end();
});
await responsePromise;
} catch (e: any) {
const errorId = randomId();
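
On the read side, the handler above queries ClickHouse first, streams rows into the gzip response, and only falls back to the legacy Postgres query when nothing was written (gzip.bytesWritten stays 0), which keeps logs of tasks that ran before this change readable. Below is a condensed sketch of that read path, not part of the commit; streamTaskLog is a hypothetical helper, and parameter binding uses the client's {name:Type} placeholders as in the diff.

import { createClient } from "@clickhouse/client";
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
import type { Gzip } from "zlib";

dayjs.extend(utc);

// Illustrative sketch: stream one task's log lines from ClickHouse into a gzip response,
// mirroring the handler above. Resolves to the number of bytes written so the caller can
// decide whether to fall back to the Postgres task_log query.
export async function streamTaskLog(
  clickhouse: ReturnType<typeof createClient>,
  metricsSchema: string,
  taskId: string,
  syncId: string,
  gzip: Gzip,
  maxResponseSize: number
): Promise<number> {
  const result = await clickhouse.query({
    query: `select timestamp, level, logger, message
            from ${metricsSchema}.task_log
            where task_id = {taskId:String} and sync_id = {syncId:String}
            order by timestamp desc`,
    query_params: { taskId, syncId },
    format: "JSONCompactEachRow",
    clickhouse_settings: { wait_end_of_query: 1 },
  });
  return new Promise((resolve, reject) => {
    const stream = result.stream();
    stream.on("data", rows => {
      for (const row of rows) {
        // JSONCompactEachRow yields positional arrays: [timestamp, level, logger, message]
        const [ts, level, logger, message] = row.json() as [string, string, string, string];
        if (gzip.bytesWritten < maxResponseSize) {
          gzip.write(`${dayjs(ts).utc().format("YYYY-MM-DD HH:mm:ss.SSS")} ${level} [${logger}] ${message}\n`);
        } else {
          stream.destroy(); // stop reading once the size cap is reached
          resolve(gzip.bytesWritten);
          return;
        }
      }
    });
    stream.on("error", reject);
    stream.on("end", () => resolve(gzip.bytesWritten));
  });
}
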
37 changes: 34 additions & 3 deletions webapps/console/pages/api/admin/events-log-init.ts
@@ -43,7 +43,8 @@ export default createRoute()
log.atError().withCause(e).log(`Failed to create ${metricsSchema} database.`);
throw new Error(`Failed to create ${metricsSchema} database.`);
}
const createTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.events_log ${onCluster}
const errors: Error[] = [];
const createEventsLogTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.events_log ${onCluster}
(
timestamp DateTime64(3),
actorId LowCardinality(String),
@@ -61,12 +62,42 @@

try {
await clickhouse.command({
query: createTableQuery,
query: createEventsLogTableQuery,
});
log.atInfo().log(`Table ${metricsSchema}.events_log created or already exists`);
} catch (e: any) {
log.atError().withCause(e).log(`Failed to create ${metricsSchema}.events_log table.`);
throw new Error(`Failed to create ${metricsSchema}.events_log table.`);
errors.push(new Error(`Failed to create ${metricsSchema}.events_log table.`));
}
const createTaskLogTableQuery: string = `create table IF NOT EXISTS ${metricsSchema}.task_log ${onCluster}
(
task_id String,
sync_id LowCardinality(String),
timestamp DateTime64(3),
level LowCardinality(String),
logger LowCardinality(String),
message String
)
engine = ${
metricsCluster
? "ReplicatedMergeTree('/clickhouse/tables/{shard}/" + metricsSchema + "/task_log', '{replica}')"
: "MergeTree()"
}
PARTITION BY toYYYYMM(timestamp)
ORDER BY (task_id, sync_id, timestamp)
TTL toDateTime(timestamp) + INTERVAL 3 MONTH DELETE`;

try {
await clickhouse.command({
query: createTaskLogTableQuery,
});
log.atInfo().log(`Table ${metricsSchema}.task_log created or already exists`);
} catch (e: any) {
log.atError().withCause(e).log(`Failed to create ${metricsSchema}.task_log table.`);
errors.push(new Error(`Failed to create ${metricsSchema}.task_log table.`));
}
if (errors.length > 0) {
throw new Error("Failed to initialize tables: " + errors.map(e => e.message).join(", "));
}
})
.toNextApiHandler();

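A note on the new DDL: the ordering key (task_id, sync_id, timestamp) matches the filter used by the logs endpoint, so reading one task's log is a primary-key range scan; monthly partitions plus the 3-month TTL bound storage; and ReplicatedMergeTree is only used when a metrics cluster is configured. One hypothetical way to verify the table after calling this init endpoint, reusing the clickhouse client and metricsSchema from the handler above:

// Illustrative check, not part of the commit: confirm task_log exists in the metrics schema.
const rs = await clickhouse.query({
  query: "select name, engine from system.tables where database = {db:String} and name = 'task_log'",
  query_params: { db: metricsSchema },
  format: "JSONEachRow",
});
console.log(await rs.json()); // e.g. [{ name: "task_log", engine: "MergeTree" }]
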