From 755d147387cad5246820382bcf1900a502ce2d16 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 16 Jan 2025 12:57:11 +0100 Subject: [PATCH 01/11] OPIK-795: Create rule logs table --- .../000010_create_evaluation_rule_log_table.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql new file mode 100644 index 0000000000..b52039280f --- /dev/null +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql @@ -0,0 +1,17 @@ +--liquibase formatted sql +--changeset thiagohora:000010_create_automation_rule_log_table + +CREATE TABLE IF NOT EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs ( + timestamp DateTime64(9, 'UTC') DEFAULT now64(9), + workspace_id String, + rule_id FixedString(36), + level Enum8('INFO'=0, 'ERROR'=1), + message String, + extra Map(String, String), + INDEX idx_workspace_rule_id (workspace_id, rule_id) TYPE bloom_filter(0.01) +) +ENGINE = MergeTree() +ORDER BY (timestamp, workspace_id, rule_id, level) +TTL toDateTime(timestamp + INTERVAL 6 MONTH); + +--rollback DROP TABLE IF EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs; From 9fa8f0b17539d21320f78c4a55fe2c6c86eb839c Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 16 Jan 2025 22:38:53 +0100 Subject: [PATCH 02/11] OPIK-796: Implement clickhouse user facing logs --- apps/opik-backend/config.yml | 10 ++ .../v1/events/OnlineScoringEventListener.java | 111 +++++++++--- .../ClickHouseLogAppenderConfig.java | 18 ++ .../infrastructure/OpikConfiguration.java | 3 + .../db/DatabaseAnalyticsModule.java | 14 +- 
.../log/ClickHouseAppender.java | 159 ++++++++++++++++++ .../log/UserFacingRuleLoggingFactory.java | 42 +++++ ...00010_create_evaluation_rule_log_table.sql | 2 +- .../src/test/resources/config-test.yml | 10 ++ 9 files changed, 339 insertions(+), 30 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java diff --git a/apps/opik-backend/config.yml b/apps/opik-backend/config.yml index 3a452fb41c..dd94e35181 100644 --- a/apps/opik-backend/config.yml +++ b/apps/opik-backend/config.yml @@ -256,3 +256,13 @@ cacheManager: # Default: {} # Description: Dynamically created caches with their respective time to live in seconds automationRules: ${CACHE_MANAGER_AUTOMATION_RULES_DURATION:-PT1S} + +# Configuration for clickhouse log appender +clickHouseLogAppender: + # Default: 1000 + # Description: Number of log messages to be batched before sending to ClickHouse + batchSize: ${CLICKHOUSE_LOG_APPENDER_BATCH_SIZE:-1000} + + # Default: PT0.500S or 500ms + # Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached + flushIntervalDuration: ${CLICKHOUSE_LOG_APPENDER_FLUSH_INTERVAL_DURATION:-PT0.500S} \ No newline at end of file diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java index 5a15f71ea8..27813c52c6 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java @@ -2,26 +2,30 @@ import 
com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge; import com.comet.opik.api.AutomationRuleEvaluatorType; +import com.comet.opik.api.FeedbackScoreBatchItem; import com.comet.opik.api.Trace; import com.comet.opik.api.events.TracesCreated; import com.comet.opik.domain.AutomationRuleEvaluatorService; import com.comet.opik.domain.ChatCompletionService; import com.comet.opik.domain.FeedbackScoreService; import com.comet.opik.infrastructure.auth.RequestContext; +import com.comet.opik.infrastructure.log.UserFacingRuleLoggingFactory; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import dev.langchain4j.model.chat.response.ChatResponse; import jakarta.inject.Inject; import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.MDC; import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton; +import java.math.BigDecimal; import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; -import static com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge.LlmAsJudgeCode; - @EagerSingleton @Slf4j public class OnlineScoringEventListener { @@ -29,6 +33,7 @@ public class OnlineScoringEventListener { private final AutomationRuleEvaluatorService ruleEvaluatorService; private final ChatCompletionService aiProxyService; private final FeedbackScoreService feedbackScoreService; + private final Logger userFacingLogger; @Inject public OnlineScoringEventListener(EventBus eventBus, @@ -39,6 +44,7 @@ public OnlineScoringEventListener(EventBus eventBus, this.aiProxyService = aiProxyService; this.feedbackScoreService = feedbackScoreService; eventBus.register(this); + userFacingLogger = UserFacingRuleLoggingFactory.getLogger(OnlineScoringEventListener.class); } /** @@ -72,11 +78,29 @@ public void onTracesCreated(TracesCreated tracesBatch) { log.info("Found {} evaluators for project '{}' on workspace '{}'", evaluators.size(), projectId, 
tracesBatch.workspaceId()); - // for each rule, sample traces and score them - evaluators.forEach(evaluator -> traces.stream() - .filter(e -> random.nextFloat() < evaluator.getSamplingRate()) - .forEach(trace -> score(trace, evaluator.getCode(), tracesBatch.workspaceId(), - tracesBatch.userName()))); + // Important to set the workspaceId for logging purposes + try (MDC.MDCCloseable scope = MDC.putCloseable("workspace_id", tracesBatch.workspaceId())) { + + // for each rule, sample traces and score them + evaluators.forEach(evaluator -> traces.stream() + .filter(trace -> { + boolean sampled = random.nextFloat() < evaluator.getSamplingRate(); + + if (!sampled) { + MDC.put("rule_id", evaluator.getId().toString()); + MDC.put("trace_id", trace.id().toString()); + + userFacingLogger.info( + "The traceId '{}' was skipped for rule: '{}' and per the sampling rate '{}'", + trace.id(), evaluator.getName(), evaluator.getSamplingRate()); + } + + return sampled; + }) + .forEach(trace -> score(trace, evaluator, tracesBatch.workspaceId(), + tracesBatch.userName()))); + } + }); } @@ -85,31 +109,66 @@ public void onTracesCreated(TracesCreated tracesBatch) { * If the evaluator has multiple score definitions, it calls the LLM once per score definition. 
* * @param trace the trace to score - * @param evaluatorCode the automation rule to score the trace + * @param evaluator the automation rule to score the trace * @param workspaceId the workspace the trace belongs */ - private void score(Trace trace, LlmAsJudgeCode evaluatorCode, String workspaceId, + private void score(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId, String userName) { - var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluatorCode, trace); - - var chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluatorCode.model(), workspaceId); - - var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream() - .map(item -> item.toBuilder() - .id(trace.id()) - .projectId(trace.projectId()) - .projectName(trace.projectName()) - .build()) - .toList(); + //This is crucial for logging purposes to identify the rule and trace + try (MDC.MDCCloseable ruleScope = MDC.putCloseable("rule_id", evaluator.getId().toString()); + MDC.MDCCloseable traceScope = MDC.putCloseable("trace_id", trace.id().toString())) { - log.info("Received {} scores for traceId '{}' in workspace '{}'. 
Storing them.", scores.size(), trace.id(), - workspaceId); + processScores(trace, evaluator, workspaceId, userName); + } + } - feedbackScoreService.scoreBatchOfTraces(scores) - .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName) - .put(RequestContext.WORKSPACE_ID, workspaceId)) - .block(); + private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId, + String userName) { + userFacingLogger.info("Evaluating traceId '{}' sampled by rule '{}'", trace.id(), evaluator.getName()); + + var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluator.getCode(), trace); + + userFacingLogger.info("Sending traceId '{}' to LLM using the following input:\n\n{}", trace.id(), scoreRequest); + + final ChatResponse chatResponse; + + try { + chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluator.getCode().model(), workspaceId); + userFacingLogger.info("Received response for traceId '{}':\n\n{}", trace.id(), chatResponse); + } catch (Exception e) { + userFacingLogger.error("Unexpected while scoring traceId '{}' with rule '{}'", trace.id(), + evaluator.getName()); + throw e; + } + + try { + var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream() + .map(item -> item.toBuilder() + .id(trace.id()) + .projectId(trace.projectId()) + .projectName(trace.projectName()) + .build()) + .toList(); + + log.info("Received {} scores for traceId '{}' in workspace '{}'. 
Storing them.", scores.size(), trace.id(), + workspaceId); + + feedbackScoreService.scoreBatchOfTraces(scores) + .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName) + .put(RequestContext.WORKSPACE_ID, workspaceId)) + .block(); + + Map> loggedScores = scores + .stream() + .collect(Collectors.groupingBy(FeedbackScoreBatchItem::name, + Collectors.mapping(FeedbackScoreBatchItem::value, Collectors.toList()))); + + userFacingLogger.info("Scores for traceId '{}' stored successfully:\n\n{}", trace.id(), loggedScores); + } catch (Exception e) { + userFacingLogger.error("Unexpected while storing scores for traceId '{}'", trace.id()); + throw e; + } } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java new file mode 100644 index 0000000000..c4d234dfb4 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java @@ -0,0 +1,18 @@ +package com.comet.opik.infrastructure; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.time.Duration; + +@Data +public class ClickHouseLogAppenderConfig { + + @Valid @JsonProperty + private int batchSize = 500; + + @Valid @JsonProperty + @NotNull private Duration flushIntervalDuration; +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java index d7ec4d7427..e98ee0c2c7 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java @@ -53,4 +53,7 @@ public class OpikConfiguration extends JobConfiguration { @Valid @NotNull @JsonProperty private 
CacheConfiguration cacheManager = new CacheConfiguration(); + + @Valid @NotNull @JsonProperty + private ClickHouseLogAppenderConfig clickHouseLogAppender = new ClickHouseLogAppenderConfig(); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java index 5afa853170..63001278c7 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java @@ -1,7 +1,9 @@ package com.comet.opik.infrastructure.db; +import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.infrastructure.OpikConfiguration; +import com.comet.opik.infrastructure.log.UserFacingRuleLoggingFactory; import com.google.inject.Provides; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry; @@ -19,14 +21,20 @@ public class DatabaseAnalyticsModule extends DropwizardAwareModule { + + private static final String INSERT_STATEMENT = """ + INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, extra) + VALUES , 9), + :level, + :workspace_id, + :rule_id, + :message, + mapFromArrays(:extra_keys, :extra_values) + ) + , + }> + ; + """; + + public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, + @NonNull Duration flushIntervalDuration) { + if (INSTANCE == null) { + INSTANCE = new ClickHouseAppender(connectionFactory, batchSize, flushIntervalDuration); + INSTANCE.start(); + } + } + + public static ClickHouseAppender getInstance() { + if (INSTANCE == null) { + throw new IllegalStateException("ClickHouseAppender is not initialized"); + } + return INSTANCE; + } + + private static ClickHouseAppender 
INSTANCE; + + private final ConnectionFactory connectionFactory; + private final int batchSize; + private final Duration flushIntervalDuration; + private volatile boolean running = true; + + private BlockingQueue logQueue; + private ScheduledExecutorService scheduler; + + @Override + public void start() { + if (connectionFactory == null) { + log.error("ClickHouse connection factory is not set"); + return; + } + + logQueue = new LinkedBlockingQueue<>(batchSize * 100); + scheduler = Executors.newSingleThreadScheduledExecutor(); + + // Background flush thread + scheduler.scheduleAtFixedRate(this::flushLogs, flushIntervalDuration.toMillis(), + flushIntervalDuration.toMillis(), TimeUnit.MILLISECONDS); + + super.start(); + } + + private void flushLogs() { + if (logQueue.isEmpty()) return; + + List batch = new ArrayList<>(logQueue.size()); + logQueue.drainTo(batch, logQueue.size()); + + if (batch.isEmpty()) return; + + Mono.from(connectionFactory.create()) + .flatMapMany(conn -> { + + var template = new ST(INSERT_STATEMENT); + List queryItems = getQueryItemPlaceHolder(batch.size()); + + template.add("items", queryItems); + + Statement statement = conn.createStatement(template.render()); + + for (int i = 0; i < batch.size(); i++) { + ILoggingEvent event = batch.get(i); + String logLevel = event.getLevel().toString(); + String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) + .orElseThrow(() -> failWithMessage("workspace_id is not set")); + String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) + .orElseThrow(() -> failWithMessage("trace_id is not set")); + String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) + .orElseThrow(() -> failWithMessage("rule_id is not set")); + statement + .bind("timestamp" + i, event.getInstant().toString()) + .bind("level" + i, logLevel) + .bind("workspace_id" + i, workspaceId) + .bind("rule_id" + i, ruleId) + .bind("message" + i, event.getFormattedMessage()) + 
.bind("extra_keys" + i, new String[]{"trace_id"}) + .bind("extra_values" + i, new String[]{traceId}); + } + + return statement.execute(); + }) + .subscribe( + noop -> { + }, + e -> log.error("Failed to insert logs", e)); + } + + private IllegalStateException failWithMessage(String message) { + addError(message); + return new IllegalStateException(message); + } + + @Override + protected void append(ILoggingEvent event) { + if (!running) return; + + boolean added = logQueue.offer(event); + if (!added) { + log.warn("Log queue is full, dropping log: " + event.getFormattedMessage()); + } + + if (logQueue.size() >= batchSize) { + scheduler.execute(this::flushLogs); + } + } + + @Override + public void stop() { + running = false; + super.stop(); + flushLogs(); + scheduler.shutdown(); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java new file mode 100644 index 0000000000..192572c29a --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java @@ -0,0 +1,42 @@ +package com.comet.opik.infrastructure.log; + +import ch.qos.logback.classic.AsyncAppender; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import io.r2dbc.spi.ConnectionFactory; +import lombok.NonNull; +import lombok.experimental.UtilityClass; +import org.slf4j.LoggerFactory; + +import java.time.Duration; + +@UtilityClass +public class UserFacingRuleLoggingFactory { + + static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory(); + static final AsyncAppender ASYNC_APPENDER = new AsyncAppender(); + + public static void init(@NonNull ConnectionFactory connectionFactory, int batchSize, + @NonNull Duration flushIntervalSeconds) { + ClickHouseAppender.init(connectionFactory, batchSize, flushIntervalSeconds); + + ClickHouseAppender 
clickHouseAppender = ClickHouseAppender.getInstance(); + clickHouseAppender.setContext(CONTEXT); + + ASYNC_APPENDER.setContext(CONTEXT); + ASYNC_APPENDER.setNeverBlock(true); + ASYNC_APPENDER.setIncludeCallerData(true); + ASYNC_APPENDER.addAppender(clickHouseAppender); + ASYNC_APPENDER.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(CONTEXT::stop)); + } + + public static org.slf4j.Logger getLogger(Class clazz) { + Logger logger = CONTEXT.getLogger("%s.UserFacingLog".formatted(clazz.getName())); + logger.addAppender(ASYNC_APPENDER); + logger.setAdditive(false); + return logger; + } + +} diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql index b52039280f..0717cfc2d3 100644 --- a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluat timestamp DateTime64(9, 'UTC') DEFAULT now64(9), workspace_id String, rule_id FixedString(36), - level Enum8('INFO'=0, 'ERROR'=1), + level Enum8('INFO'=0, 'ERROR'=1, 'WARN'=2, 'DEBUG'=3, 'TRACE'=4), message String, extra Map(String, String), INDEX idx_workspace_rule_id (workspace_id, rule_id) TYPE bloom_filter(0.01) diff --git a/apps/opik-backend/src/test/resources/config-test.yml b/apps/opik-backend/src/test/resources/config-test.yml index e13a6540ea..76a00f89b0 100644 --- a/apps/opik-backend/src/test/resources/config-test.yml +++ b/apps/opik-backend/src/test/resources/config-test.yml @@ -218,3 +218,13 @@ cacheManager: # Default: {} # Description: Dynamically created caches with their respective time to live in seconds testCache: PT1S + +# 
Configuration for clickhouse log appender +clickHouseLogAppender: + # Default: 1000 + # Description: Number of log messages to be batched before sending to ClickHouse + batchSize: 1000 + + # Default: PT0.500S or 500ms + # Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached + flushIntervalDuration: PT0.500S From e28974efb7e64827678e99892cd8b4b880b44e94 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 17 Jan 2025 09:38:54 +0100 Subject: [PATCH 03/11] Address PR review --- apps/opik-backend/config.yml | 2 +- .../v1/events/OnlineScoringEventListener.java | 22 +++++++++++-------- .../log/ClickHouseAppender.java | 9 +++++--- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/apps/opik-backend/config.yml b/apps/opik-backend/config.yml index dd94e35181..d408df583b 100644 --- a/apps/opik-backend/config.yml +++ b/apps/opik-backend/config.yml @@ -265,4 +265,4 @@ clickHouseLogAppender: # Default: PT0.500S or 500ms # Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached - flushIntervalDuration: ${CLICKHOUSE_LOG_APPENDER_FLUSH_INTERVAL_DURATION:-PT0.500S} \ No newline at end of file + flushIntervalDuration: ${CLICKHOUSE_LOG_APPENDER_FLUSH_INTERVAL_DURATION:-PT0.500S} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java index 27813c52c6..ed7cf7ae2a 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java @@ -24,7 +24,11 @@ import java.util.Map; import java.util.Random; import java.util.UUID; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; +import static 
java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; @EagerSingleton @Slf4j @@ -59,10 +63,10 @@ public void onTracesCreated(TracesCreated tracesBatch) { log.debug(tracesBatch.traces().toString()); Map> tracesByProject = tracesBatch.traces().stream() - .collect(Collectors.groupingBy(Trace::projectId)); + .collect(groupingBy(Trace::projectId)); Map countMap = tracesByProject.entrySet().stream() - .collect(Collectors.toMap(entry -> "projectId: " + entry.getKey(), + .collect(toMap(entry -> "projectId: " + entry.getKey(), entry -> entry.getValue().size())); log.debug("Received traces for workspace '{}': {}", tracesBatch.workspaceId(), countMap); @@ -116,8 +120,8 @@ private void score(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, Str String userName) { //This is crucial for logging purposes to identify the rule and trace - try (MDC.MDCCloseable ruleScope = MDC.putCloseable("rule_id", evaluator.getId().toString()); - MDC.MDCCloseable traceScope = MDC.putCloseable("trace_id", trace.id().toString())) { + try (var ruleScope = MDC.putCloseable("rule_id", evaluator.getId().toString()); + var traceScope = MDC.putCloseable("trace_id", trace.id().toString())) { processScores(trace, evaluator, workspaceId, userName); } @@ -137,7 +141,7 @@ private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evalua chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluator.getCode().model(), workspaceId); userFacingLogger.info("Received response for traceId '{}':\n\n{}", trace.id(), chatResponse); } catch (Exception e) { - userFacingLogger.error("Unexpected while scoring traceId '{}' with rule '{}'", trace.id(), + userFacingLogger.error("Unexpected error while scoring traceId '{}' with rule '{}'", trace.id(), evaluator.getName()); throw e; } @@ -161,12 +165,12 @@ private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evalua Map> loggedScores = scores 
.stream() - .collect(Collectors.groupingBy(FeedbackScoreBatchItem::name, - Collectors.mapping(FeedbackScoreBatchItem::value, Collectors.toList()))); + .collect( + groupingBy(FeedbackScoreBatchItem::name, mapping(FeedbackScoreBatchItem::value, toList()))); userFacingLogger.info("Scores for traceId '{}' stored successfully:\n\n{}", trace.id(), loggedScores); } catch (Exception e) { - userFacingLogger.error("Unexpected while storing scores for traceId '{}'", trace.id()); + userFacingLogger.error("Unexpected error while storing scores for traceId '{}'", trace.id()); throw e; } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index 68e8b436ee..c34fd1e037 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -45,6 +45,7 @@ INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, @NonNull Duration flushIntervalDuration) { + if (INSTANCE == null) { INSTANCE = new ClickHouseAppender(connectionFactory, batchSize, flushIntervalDuration); INSTANCE.start(); @@ -75,7 +76,7 @@ public void start() { return; } - logQueue = new LinkedBlockingQueue<>(batchSize * 100); + logQueue = new LinkedBlockingQueue<>(); scheduler = Executors.newSingleThreadScheduledExecutor(); // Background flush thread @@ -105,6 +106,7 @@ private void flushLogs() { for (int i = 0; i < batch.size(); i++) { ILoggingEvent event = batch.get(i); + String logLevel = event.getLevel().toString(); String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) .orElseThrow(() -> failWithMessage("workspace_id is not set")); @@ -112,6 +114,7 @@ private void flushLogs() { .orElseThrow(() 
-> failWithMessage("trace_id is not set")); String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) .orElseThrow(() -> failWithMessage("rule_id is not set")); + statement .bind("timestamp" + i, event.getInstant().toString()) .bind("level" + i, logLevel) @@ -131,7 +134,7 @@ private void flushLogs() { } private IllegalStateException failWithMessage(String message) { - addError(message); + log.error(message); return new IllegalStateException(message); } @@ -141,7 +144,7 @@ protected void append(ILoggingEvent event) { boolean added = logQueue.offer(event); if (!added) { - log.warn("Log queue is full, dropping log: " + event.getFormattedMessage()); + log.warn("Log queue is full, dropping log: {}", event.getFormattedMessage()); } if (logQueue.size() >= batchSize) { From e6863eb92b6d6fdc7e3264a4d4c4398e02192bc0 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Thu, 16 Jan 2025 12:57:11 +0100 Subject: [PATCH 04/11] OPIK-795: Create rule logs table --- .../000010_create_evaluation_rule_log_table.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql new file mode 100644 index 0000000000..0717cfc2d3 --- /dev/null +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql @@ -0,0 +1,17 @@ +--liquibase formatted sql +--changeset thiagohora:000010_create_automation_rule_log_table + +CREATE TABLE IF NOT EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs ( + timestamp DateTime64(9, 'UTC') DEFAULT now64(9), + workspace_id String, + rule_id FixedString(36), + level Enum8('INFO'=0, 'ERROR'=1, 
'WARN'=2, 'DEBUG'=3, 'TRACE'=4), + message String, + extra Map(String, String), + INDEX idx_workspace_rule_id (workspace_id, rule_id) TYPE bloom_filter(0.01) +) +ENGINE = MergeTree() +ORDER BY (timestamp, workspace_id, rule_id, level) +TTL toDateTime(timestamp + INTERVAL 6 MONTH); + +--rollback DROP TABLE IF EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs; From 6ab6a1488b31a610f0259c3586aef2fac56eca53 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 17 Jan 2025 12:14:50 +0100 Subject: [PATCH 05/11] Fix file name and column name --- ... 000010_create_automation_rule_evaluator_logs_table.sql} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/{000010_create_evaluation_rule_log_table.sql => 000010_create_automation_rule_evaluator_logs_table.sql} (76%) diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql similarity index 76% rename from apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql rename to apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql index 0717cfc2d3..0253fb537b 100644 --- a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_evaluation_rule_log_table.sql +++ b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql @@ -1,13 +1,13 @@ --liquibase formatted sql ---changeset thiagohora:000010_create_automation_rule_log_table +--changeset thiagohora:create_automation_rule_evaluator_logs_table CREATE TABLE IF NOT EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs ( 
timestamp DateTime64(9, 'UTC') DEFAULT now64(9), workspace_id String, rule_id FixedString(36), - level Enum8('INFO'=0, 'ERROR'=1, 'WARN'=2, 'DEBUG'=3, 'TRACE'=4), + level Enum8('TRACE'=0, 'DEBUG'=1, 'INFO'=2, 'WARN'=3, 'ERROR'=4), message String, - extra Map(String, String), + markers Map(String, String), INDEX idx_workspace_rule_id (workspace_id, rule_id) TYPE bloom_filter(0.01) ) ENGINE = MergeTree() ORDER BY (timestamp, workspace_id, rule_id, level) TTL toDateTime(timestamp + INTERVAL 6 MONTH); --rollback DROP TABLE IF EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs; From 249d38410d74db8f89dd92095d9586207a3debb3 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 17 Jan 2025 10:15:41 +0100 Subject: [PATCH 06/11] Fix tests --- .../ClickHouseLogAppenderConfig.java | 2 +- .../log/ClickHouseAppender.java | 17 ++++++++----- .../log/UserFacingRuleLoggingFactory.java | 25 +++++++++++-------- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java index c4d234dfb4..9f571f38ea 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java @@ -11,7 +11,7 @@ public class ClickHouseLogAppenderConfig { @Valid @JsonProperty - private int batchSize = 500; + private int batchSize = 1000; @Valid @JsonProperty @NotNull private Duration flushIntervalDuration; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index c34fd1e037..ce4df2a993 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -43,23 +43,27 @@ INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule ; """; + private static 
ClickHouseAppender instance; + public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, @NonNull Duration flushIntervalDuration) { - if (INSTANCE == null) { - INSTANCE = new ClickHouseAppender(connectionFactory, batchSize, flushIntervalDuration); - INSTANCE.start(); + if (instance == null) { + setInstance(new ClickHouseAppender(connectionFactory, batchSize, flushIntervalDuration)); + instance.start(); } } public static ClickHouseAppender getInstance() { - if (INSTANCE == null) { + if (instance == null) { throw new IllegalStateException("ClickHouseAppender is not initialized"); } - return INSTANCE; + return instance; } - private static ClickHouseAppender INSTANCE; + private static synchronized void setInstance(ClickHouseAppender instance) { + ClickHouseAppender.instance = instance; + } private final ConnectionFactory connectionFactory; private final int batchSize; @@ -157,6 +161,7 @@ public void stop() { running = false; super.stop(); flushLogs(); + setInstance(null); scheduler.shutdown(); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java index 192572c29a..60798bf7fe 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java @@ -13,28 +13,33 @@ @UtilityClass public class UserFacingRuleLoggingFactory { - static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory(); - static final AsyncAppender ASYNC_APPENDER = new AsyncAppender(); + private static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory(); + private static AsyncAppender asyncAppender; - public static void init(@NonNull ConnectionFactory connectionFactory, int batchSize, + public static 
synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, @NonNull Duration flushIntervalSeconds) { ClickHouseAppender.init(connectionFactory, batchSize, flushIntervalSeconds); ClickHouseAppender clickHouseAppender = ClickHouseAppender.getInstance(); clickHouseAppender.setContext(CONTEXT); - ASYNC_APPENDER.setContext(CONTEXT); - ASYNC_APPENDER.setNeverBlock(true); - ASYNC_APPENDER.setIncludeCallerData(true); - ASYNC_APPENDER.addAppender(clickHouseAppender); - ASYNC_APPENDER.start(); + asyncAppender = new AsyncAppender(); + asyncAppender.setContext(CONTEXT); + asyncAppender.setNeverBlock(true); + asyncAppender.setIncludeCallerData(true); + asyncAppender.addAppender(clickHouseAppender); + asyncAppender.start(); + addShutdownHook(); + } + + private static void addShutdownHook() { Runtime.getRuntime().addShutdownHook(new Thread(CONTEXT::stop)); } - public static org.slf4j.Logger getLogger(Class clazz) { + public static org.slf4j.Logger getLogger(@NonNull Class clazz) { Logger logger = CONTEXT.getLogger("%s.UserFacingLog".formatted(clazz.getName())); - logger.addAppender(ASYNC_APPENDER); + logger.addAppender(asyncAppender); logger.setAdditive(false); return logger; } From 69fc3e3c42d49215dfeb6987f181934dda54513f Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 17 Jan 2025 16:22:41 +0100 Subject: [PATCH 07/11] Change table sortable key --- .../000010_create_automation_rule_evaluator_logs_table.sql | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql index 0253fb537b..7c3fa82058 100644 --- a/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql +++ 
b/apps/opik-backend/src/main/resources/liquibase/db-app-analytics/migrations/000010_create_automation_rule_evaluator_logs_table.sql @@ -7,11 +7,10 @@ CREATE TABLE IF NOT EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluat rule_id FixedString(36), level Enum8('TRACE'=0, 'DEBUG'=1, 'INFO'=2, 'WARN'=3, 'ERROR'=4), message String, - markers Map(String, String), - INDEX idx_workspace_rule_id (workspace_id, rule_id) TYPE bloom_filter(0.01) + markers Map(String, String) ) ENGINE = MergeTree() -ORDER BY (timestamp, workspace_id, rule_id, level) +ORDER BY (workspace_id, rule_id, timestamp) TTL toDateTime(timestamp + INTERVAL 6 MONTH); --rollback DROP TABLE IF EXISTS ${ANALYTICS_DB_DATABASE_NAME}.automation_rule_evaluator_logs; From 22feff1fcc45120db7de6d81bade9c773c781e4d Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Fri, 17 Jan 2025 16:37:34 +0100 Subject: [PATCH 08/11] Fix marker field ingestion --- .../log/ClickHouseAppender.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index ce4df2a993..178f9f63e0 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -28,18 +28,18 @@ class ClickHouseAppender extends AppenderBase { private static final String INSERT_STATEMENT = """ - INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, extra) + INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) VALUES , 9), - :level, - :workspace_id, - :rule_id, - :message, - mapFromArrays(:extra_keys, :extra_values) - ) - , - }> + ( + parseDateTime64BestEffort(:timestamp, 9), + :level, + :workspace_id, + :rule_id, + :message, + 
mapFromArrays(:marker_keys, :marker_values) + ) + , + }> ; """; @@ -125,8 +125,8 @@ private void flushLogs() { .bind("workspace_id" + i, workspaceId) .bind("rule_id" + i, ruleId) .bind("message" + i, event.getFormattedMessage()) - .bind("extra_keys" + i, new String[]{"trace_id"}) - .bind("extra_values" + i, new String[]{traceId}); + .bind("marker_keys" + i, new String[]{"trace_id"}) + .bind("marker_values" + i, new String[]{traceId}); } return statement.execute(); From 357be9795af675a3e1cd8f266677cdac71e13772 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Mon, 20 Jan 2025 13:48:32 +0100 Subject: [PATCH 09/11] Address PR comments --- .../v1/events/OnlineScoringEventListener.java | 15 +++++----- .../db/DatabaseAnalyticsModule.java | 4 +-- .../log/ClickHouseAppender.java | 28 ++++++++++++++----- ...ory.java => UserFacingLoggingFactory.java} | 2 +- 4 files changed, 32 insertions(+), 17 deletions(-) rename apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/{UserFacingRuleLoggingFactory.java => UserFacingLoggingFactory.java} (97%) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java index ed7cf7ae2a..7e898e6c3a 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java @@ -9,11 +9,12 @@ import com.comet.opik.domain.ChatCompletionService; import com.comet.opik.domain.FeedbackScoreService; import com.comet.opik.infrastructure.auth.RequestContext; -import com.comet.opik.infrastructure.log.UserFacingRuleLoggingFactory; +import com.comet.opik.infrastructure.log.UserFacingLoggingFactory; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import dev.langchain4j.model.chat.response.ChatResponse; 
import jakarta.inject.Inject; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.MDC; @@ -37,18 +38,18 @@ public class OnlineScoringEventListener { private final AutomationRuleEvaluatorService ruleEvaluatorService; private final ChatCompletionService aiProxyService; private final FeedbackScoreService feedbackScoreService; - private final Logger userFacingLogger; + private final @NonNull Logger userFacingLogger; @Inject - public OnlineScoringEventListener(EventBus eventBus, - AutomationRuleEvaluatorService ruleEvaluatorService, - ChatCompletionService aiProxyService, - FeedbackScoreService feedbackScoreService) { + public OnlineScoringEventListener(@NonNull EventBus eventBus, + @NonNull AutomationRuleEvaluatorService ruleEvaluatorService, + @NonNull ChatCompletionService aiProxyService, + @NonNull FeedbackScoreService feedbackScoreService) { this.ruleEvaluatorService = ruleEvaluatorService; this.aiProxyService = aiProxyService; this.feedbackScoreService = feedbackScoreService; eventBus.register(this); - userFacingLogger = UserFacingRuleLoggingFactory.getLogger(OnlineScoringEventListener.class); + userFacingLogger = UserFacingLoggingFactory.getLogger(OnlineScoringEventListener.class); } /** diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java index 63001278c7..3a4bccf61b 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java @@ -3,7 +3,7 @@ import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.infrastructure.OpikConfiguration; -import com.comet.opik.infrastructure.log.UserFacingRuleLoggingFactory; +import 
com.comet.opik.infrastructure.log.UserFacingLoggingFactory; import com.google.inject.Provides; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry; @@ -27,7 +27,7 @@ protected void configure() { ClickHouseLogAppenderConfig clickHouseLogAppenderConfig = configuration(ClickHouseLogAppenderConfig.class); // Initialize the UserFacingRuleLollingFactory - UserFacingRuleLoggingFactory.init(connectionFactory, clickHouseLogAppenderConfig.getBatchSize(), + UserFacingLoggingFactory.init(connectionFactory, clickHouseLogAppenderConfig.getBatchSize(), clickHouseLogAppenderConfig.getFlushIntervalDuration()); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index 178f9f63e0..bce05f7c01 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -65,9 +65,9 @@ private static synchronized void setInstance(ClickHouseAppender instance) { ClickHouseAppender.instance = instance; } - private final ConnectionFactory connectionFactory; + private final @NonNull ConnectionFactory connectionFactory; + private final @NonNull Duration flushIntervalDuration; private final int batchSize; - private final Duration flushIntervalDuration; private volatile boolean running = true; private BlockingQueue logQueue; @@ -75,10 +75,6 @@ private static synchronized void setInstance(ClickHouseAppender instance) { @Override public void start() { - if (connectionFactory == null) { - log.error("ClickHouse connection factory is not set"); - return; - } logQueue = new LinkedBlockingQueue<>(); scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -144,7 +140,10 @@ private IllegalStateException failWithMessage(String message) { @Override protected void 
append(ILoggingEvent event) { - if (!running) return; + if (!running) { + log.debug("ClickHouseAppender is stopped, dropping log: {}", event.getFormattedMessage()); + return; + } boolean added = logQueue.offer(event); if (!added) { @@ -163,5 +162,20 @@ public void stop() { flushLogs(); setInstance(null); scheduler.shutdown(); + awaitTermination(); + } + + private void awaitTermination() { + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt + log.error("ClickHouseAppender did not terminate"); + } + } + } catch (InterruptedException ie) { + scheduler.shutdownNow(); + log.warn("ClickHouseAppender interrupted while waiting for termination", ie); + } } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java similarity index 97% rename from apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java rename to apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java index 60798bf7fe..04c3732dd2 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java @@ -11,7 +11,7 @@ import java.time.Duration; @UtilityClass -public class UserFacingRuleLoggingFactory { +public class UserFacingLoggingFactory { private static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory(); private static AsyncAppender asyncAppender; From 5a2e109c953c6d33ca21d50588bb43d9cabe35b9 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Mon, 20 Jan 2025 13:55:49 +0100 Subject: [PATCH 10/11] Fix tests --- .../com/comet/opik/infrastructure/log/ClickHouseAppender.java | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index bce05f7c01..fa1046e4da 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -49,7 +49,7 @@ public static synchronized void init(@NonNull ConnectionFactory connectionFactor @NonNull Duration flushIntervalDuration) { if (instance == null) { - setInstance(new ClickHouseAppender(connectionFactory, batchSize, flushIntervalDuration)); + setInstance(new ClickHouseAppender(connectionFactory, flushIntervalDuration, batchSize)); instance.start(); } } From f9245b1d36e74beb783117bd9b70d02756e28fe4 Mon Sep 17 00:00:00 2001 From: Thiago Hora Date: Mon, 20 Jan 2025 14:50:18 +0100 Subject: [PATCH 11/11] Extract DAO from log appender --- .../v1/events/OnlineScoringEventListener.java | 4 +- .../java/com/comet/opik/domain/UserLog.java | 9 ++ .../log/ClickHouseAppender.java | 102 ++++++------------ .../log/UserFacingLoggingFactory.java | 5 +- .../tables/AutomationRuleEvaluatorLogDAO.java | 83 ++++++++++++++ .../log/tables/UserLogTableFactory.java | 37 +++++++ 6 files changed, 168 insertions(+), 72 deletions(-) create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java create mode 100644 apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java index 7e898e6c3a..f9409b2632 100644 --- 
a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java @@ -8,6 +8,7 @@ import com.comet.opik.domain.AutomationRuleEvaluatorService; import com.comet.opik.domain.ChatCompletionService; import com.comet.opik.domain.FeedbackScoreService; +import com.comet.opik.domain.UserLog; import com.comet.opik.infrastructure.auth.RequestContext; import com.comet.opik.infrastructure.log.UserFacingLoggingFactory; import com.google.common.eventbus.EventBus; @@ -84,7 +85,8 @@ public void onTracesCreated(TracesCreated tracesBatch) { projectId, tracesBatch.workspaceId()); // Important to set the workspaceId for logging purposes - try (MDC.MDCCloseable scope = MDC.putCloseable("workspace_id", tracesBatch.workspaceId())) { + try (MDC.MDCCloseable logScope = MDC.putCloseable(UserLog.MARKER, UserLog.AUTOMATION_RULE_EVALUATOR.name()); + MDC.MDCCloseable scope = MDC.putCloseable("workspace_id", tracesBatch.workspaceId())) { // for each rule, sample traces and score them evaluators.forEach(evaluator -> traces.stream() diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java new file mode 100644 index 0000000000..f4c7c9878d --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java @@ -0,0 +1,9 @@ +package com.comet.opik.domain; + +public enum UserLog { + AUTOMATION_RULE_EVALUATOR + + ; + + public static final String MARKER = "user_log"; +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index fa1046e4da..b24f3b4114 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ 
b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -2,59 +2,41 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.AppenderBase; -import com.comet.opik.utils.TemplateUtils; -import io.r2dbc.spi.ConnectionFactory; -import io.r2dbc.spi.Statement; +import com.comet.opik.domain.UserLog; +import com.comet.opik.infrastructure.log.tables.UserLogTableFactory; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.stringtemplate.v4.ST; -import reactor.core.publisher.Mono; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Optional; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; +import static java.util.stream.Collectors.groupingBy; @RequiredArgsConstructor(access = lombok.AccessLevel.PRIVATE) @Slf4j class ClickHouseAppender extends AppenderBase { - private static final String INSERT_STATEMENT = """ - INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) - VALUES , 9), - :level, - :workspace_id, - :rule_id, - :message, - mapFromArrays(:marker_keys, :marker_values) - ) - , - }> - ; - """; - private static ClickHouseAppender instance; - public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, + public static synchronized void init(@NonNull UserLogTableFactory userLogTableFactory, int batchSize, @NonNull Duration flushIntervalDuration) { if (instance == null) { - setInstance(new ClickHouseAppender(connectionFactory, flushIntervalDuration, batchSize)); + setInstance(new ClickHouseAppender(userLogTableFactory, 
flushIntervalDuration, batchSize)); instance.start(); } } - public static ClickHouseAppender getInstance() { + public static synchronized ClickHouseAppender getInstance() { if (instance == null) { throw new IllegalStateException("ClickHouseAppender is not initialized"); } @@ -65,7 +47,7 @@ private static synchronized void setInstance(ClickHouseAppender instance) { ClickHouseAppender.instance = instance; } - private final @NonNull ConnectionFactory connectionFactory; + private final @NonNull UserLogTableFactory userLogTableFactory; private final @NonNull Duration flushIntervalDuration; private final int batchSize; private volatile boolean running = true; @@ -94,48 +76,28 @@ private void flushLogs() { if (batch.isEmpty()) return; - Mono.from(connectionFactory.create()) - .flatMapMany(conn -> { - - var template = new ST(INSERT_STATEMENT); - List queryItems = getQueryItemPlaceHolder(batch.size()); - - template.add("items", queryItems); - - Statement statement = conn.createStatement(template.render()); - - for (int i = 0; i < batch.size(); i++) { - ILoggingEvent event = batch.get(i); - - String logLevel = event.getLevel().toString(); - String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) - .orElseThrow(() -> failWithMessage("workspace_id is not set")); - String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) - .orElseThrow(() -> failWithMessage("trace_id is not set")); - String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) - .orElseThrow(() -> failWithMessage("rule_id is not set")); - - statement - .bind("timestamp" + i, event.getInstant().toString()) - .bind("level" + i, logLevel) - .bind("workspace_id" + i, workspaceId) - .bind("rule_id" + i, ruleId) - .bind("message" + i, event.getFormattedMessage()) - .bind("marker_keys" + i, new String[]{"trace_id"}) - .bind("marker_values" + i, new String[]{traceId}); + Map> eventsPerTable = batch.stream() + .collect(groupingBy(event -> 
event.getMDCPropertyMap().getOrDefault(UserLog.MARKER, ""))); + + eventsPerTable + .forEach((userLog, events) -> { + + if (userLog.isBlank()) { + log.error("UserLog marker is not set for events: {}", events.stream() + .map(ILoggingEvent::getFormattedMessage) + .collect(Collectors.joining(", "))); + } else { + UserLogTableFactory.UserLogTableDAO tableDAO = userLogTableFactory + .getDAO(UserLog.valueOf(userLog)); + + tableDAO + .saveAll(events) + .subscribe( + noop -> { + }, + e -> log.error("Failed to insert logs", e)); } - - return statement.execute(); - }) - .subscribe( - noop -> { - }, - e -> log.error("Failed to insert logs", e)); - } - - private IllegalStateException failWithMessage(String message) { - log.error(message); - return new IllegalStateException(message); + }); } @Override @@ -173,9 +135,9 @@ private void awaitTermination() { log.error("ClickHouseAppender did not terminate"); } } - } catch (InterruptedException ie) { + } catch (InterruptedException ex) { scheduler.shutdownNow(); - log.warn("ClickHouseAppender interrupted while waiting for termination", ie); + log.warn("ClickHouseAppender interrupted while waiting for termination", ex); } } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java index 04c3732dd2..57c23e0f93 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java @@ -3,6 +3,7 @@ import ch.qos.logback.classic.AsyncAppender; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; +import com.comet.opik.infrastructure.log.tables.UserLogTableFactory; import io.r2dbc.spi.ConnectionFactory; import lombok.NonNull; import lombok.experimental.UtilityClass; @@ -18,7 +19,9 @@ public class UserFacingLoggingFactory { 
public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, @NonNull Duration flushIntervalSeconds) { - ClickHouseAppender.init(connectionFactory, batchSize, flushIntervalSeconds); + + UserLogTableFactory tableFactory = UserLogTableFactory.getInstance(connectionFactory); + ClickHouseAppender.init(tableFactory, batchSize, flushIntervalSeconds); ClickHouseAppender clickHouseAppender = ClickHouseAppender.getInstance(); clickHouseAppender.setContext(CONTEXT); diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java new file mode 100644 index 0000000000..83f2a33307 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java @@ -0,0 +1,83 @@ +package com.comet.opik.infrastructure.log.tables; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.comet.opik.utils.TemplateUtils; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Statement; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.stringtemplate.v4.ST; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Optional; + +import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO; +import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; + +@RequiredArgsConstructor +@Slf4j +class AutomationRuleEvaluatorLogDAO implements UserLogTableDAO { + + private final ConnectionFactory factory; + + private static final String INSERT_STATEMENT = """ + INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) + VALUES , 9), + :level, + :workspace_id, + :rule_id, + :message, + mapFromArrays(:marker_keys, :marker_values) + ) + , + }> + ; + """; + + @Override + public Mono saveAll(List 
events) { + return Mono.from(factory.create()) + .flatMapMany(connection -> { + var template = new ST(INSERT_STATEMENT); + List queryItems = getQueryItemPlaceHolder(events.size()); + + template.add("items", queryItems); + + Statement statement = connection.createStatement(template.render()); + + for (int i = 0; i < events.size(); i++) { + ILoggingEvent event = events.get(i); + + String logLevel = event.getLevel().toString(); + String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) + .orElseThrow(() -> failWithMessage("workspace_id is not set")); + String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) + .orElseThrow(() -> failWithMessage("trace_id is not set")); + String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) + .orElseThrow(() -> failWithMessage("rule_id is not set")); + + statement + .bind("timestamp" + i, event.getInstant().toString()) + .bind("level" + i, logLevel) + .bind("workspace_id" + i, workspaceId) + .bind("rule_id" + i, ruleId) + .bind("message" + i, event.getFormattedMessage()) + .bind("marker_keys" + i, new String[]{"trace_id"}) + .bind("marker_values" + i, new String[]{traceId}); + } + + return statement.execute(); + }) + .collectList() + .then(); + } + + private IllegalStateException failWithMessage(String message) { + log.error(message); + return new IllegalStateException(message); + } + +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java new file mode 100644 index 0000000000..9bce54ce50 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java @@ -0,0 +1,37 @@ +package com.comet.opik.infrastructure.log.tables; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.comet.opik.domain.UserLog; +import io.r2dbc.spi.ConnectionFactory; 
+import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import reactor.core.publisher.Mono; + +import java.util.List; + +public interface UserLogTableFactory { + + static UserLogTableFactory getInstance(ConnectionFactory factory) { + return new UserLogTableFactoryImpl(factory); + } + + interface UserLogTableDAO { + Mono saveAll(List events); + } + + UserLogTableDAO getDAO(UserLog userLog); + +} + +@RequiredArgsConstructor +class UserLogTableFactoryImpl implements UserLogTableFactory { + + private final ConnectionFactory factory; + + @Override + public UserLogTableDAO getDAO(@NonNull UserLog userLog) { + return switch (userLog) { + case AUTOMATION_RULE_EVALUATOR -> new AutomationRuleEvaluatorLogDAO(factory); + }; + } +}