diff --git a/apps/opik-backend/config.yml b/apps/opik-backend/config.yml
index 3a452fb41c..d408df583b 100644
--- a/apps/opik-backend/config.yml
+++ b/apps/opik-backend/config.yml
@@ -256,3 +256,13 @@ cacheManager:
   # Default: {}
   # Description: Dynamically created caches with their respective time to live in seconds
   automationRules: ${CACHE_MANAGER_AUTOMATION_RULES_DURATION:-PT1S}
+
+# Configuration for clickhouse log appender
+clickHouseLogAppender:
+  # Default: 1000
+  # Description: Number of log messages to be batched before sending to ClickHouse
+  batchSize: ${CLICKHOUSE_LOG_APPENDER_BATCH_SIZE:-1000}
+
+  # Default: PT0.500S or 500ms
+  # Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached
+  flushIntervalDuration: ${CLICKHOUSE_LOG_APPENDER_FLUSH_INTERVAL_DURATION:-PT0.500S}
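
Both new keys support environment-variable overrides via Dropwizard's ${VAR:-default} substitution, and flushIntervalDuration is an ISO-8601 duration that java.time.Duration parses directly. A quick standalone sanity check of the format (the class name here is illustrative):

    import java.time.Duration;

    class DurationFormatCheck {
        public static void main(String[] args) {
            // "PT0.500S" is ISO-8601 for 500 milliseconds, matching the comment above
            System.out.println(Duration.parse("PT0.500S").toMillis()); // 500

            // The cacheManager entry's "PT1S" is one second
            System.out.println(Duration.parse("PT1S").toMillis()); // 1000
        }
    }
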
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java
index 5a15f71ea8..f9409b2632 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/events/OnlineScoringEventListener.java
@@ -2,25 +2,35 @@
 import com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge;
 import com.comet.opik.api.AutomationRuleEvaluatorType;
+import com.comet.opik.api.FeedbackScoreBatchItem;
 import com.comet.opik.api.Trace;
 import com.comet.opik.api.events.TracesCreated;
 import com.comet.opik.domain.AutomationRuleEvaluatorService;
 import com.comet.opik.domain.ChatCompletionService;
 import com.comet.opik.domain.FeedbackScoreService;
+import com.comet.opik.domain.UserLog;
 import com.comet.opik.infrastructure.auth.RequestContext;
+import com.comet.opik.infrastructure.log.UserFacingLoggingFactory;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import dev.langchain4j.model.chat.response.ChatResponse;
 import jakarta.inject.Inject;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.MDC;
 import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton;
 
+import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
-import static com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge.LlmAsJudgeCode;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 
 @EagerSingleton
 @Slf4j
@@ -29,16 +39,18 @@ public class OnlineScoringEventListener {
 
     private final AutomationRuleEvaluatorService ruleEvaluatorService;
     private final ChatCompletionService aiProxyService;
     private final FeedbackScoreService feedbackScoreService;
+    private final @NonNull Logger userFacingLogger;
 
     @Inject
-    public OnlineScoringEventListener(EventBus eventBus,
-            AutomationRuleEvaluatorService ruleEvaluatorService,
-            ChatCompletionService aiProxyService,
-            FeedbackScoreService feedbackScoreService) {
+    public OnlineScoringEventListener(@NonNull EventBus eventBus,
+            @NonNull AutomationRuleEvaluatorService ruleEvaluatorService,
+            @NonNull ChatCompletionService aiProxyService,
+            @NonNull FeedbackScoreService feedbackScoreService) {
         this.ruleEvaluatorService = ruleEvaluatorService;
         this.aiProxyService = aiProxyService;
         this.feedbackScoreService = feedbackScoreService;
         eventBus.register(this);
+        userFacingLogger = UserFacingLoggingFactory.getLogger(OnlineScoringEventListener.class);
     }
 
     /**
@@ -53,10 +65,10 @@ public void onTracesCreated(TracesCreated tracesBatch) {
         log.debug(tracesBatch.traces().toString());
 
         Map<UUID, List<Trace>> tracesByProject = tracesBatch.traces().stream()
-                .collect(Collectors.groupingBy(Trace::projectId));
+                .collect(groupingBy(Trace::projectId));
 
         Map<String, Integer> countMap = tracesByProject.entrySet().stream()
-                .collect(Collectors.toMap(entry -> "projectId: " + entry.getKey(),
+                .collect(toMap(entry -> "projectId: " + entry.getKey(),
                         entry -> entry.getValue().size()));
 
         log.debug("Received traces for workspace '{}': {}", tracesBatch.workspaceId(), countMap);
@@ -72,11 +84,30 @@ public void onTracesCreated(TracesCreated tracesBatch) {
             log.info("Found {} evaluators for project '{}' on workspace '{}'", evaluators.size(),
                     projectId, tracesBatch.workspaceId());
 
-            // for each rule, sample traces and score them
-            evaluators.forEach(evaluator -> traces.stream()
-                    .filter(e -> random.nextFloat() < evaluator.getSamplingRate())
-                    .forEach(trace -> score(trace, evaluator.getCode(), tracesBatch.workspaceId(),
-                            tracesBatch.userName())));
+            // Important to set the workspaceId for logging purposes
+            try (MDC.MDCCloseable logScope = MDC.putCloseable(UserLog.MARKER, UserLog.AUTOMATION_RULE_EVALUATOR.name());
+                    MDC.MDCCloseable scope = MDC.putCloseable("workspace_id", tracesBatch.workspaceId())) {
+
+                // for each rule, sample traces and score them
+                evaluators.forEach(evaluator -> traces.stream()
+                        .filter(trace -> {
+                            boolean sampled = random.nextFloat() < evaluator.getSamplingRate();
+
+                            if (!sampled) {
+                                MDC.put("rule_id", evaluator.getId().toString());
+                                MDC.put("trace_id", trace.id().toString());
+
+                                userFacingLogger.info(
+                                        "The traceId '{}' was skipped for rule: '{}' and per the sampling rate '{}'",
+                                        trace.id(), evaluator.getName(), evaluator.getSamplingRate());
+                            }
+
+                            return sampled;
+                        })
+                        .forEach(trace -> score(trace, evaluator, tracesBatch.workspaceId(),
+                                tracesBatch.userName())));
+            }
         });
     }
 
@@ -85,31 +116,66 @@ public void onTracesCreated(TracesCreated tracesBatch) {
      * If the evaluator has multiple score definitions, it calls the LLM once per score definition.
      *
      * @param trace the trace to score
-     * @param evaluatorCode the automation rule to score the trace
+     * @param evaluator the automation rule to score the trace
      * @param workspaceId the workspace the trace belongs
      */
-    private void score(Trace trace, LlmAsJudgeCode evaluatorCode, String workspaceId,
+    private void score(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId,
             String userName) {
-        var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluatorCode, trace);
+        // This is crucial for logging purposes to identify the rule and trace
+        try (var ruleScope = MDC.putCloseable("rule_id", evaluator.getId().toString());
+                var traceScope = MDC.putCloseable("trace_id", trace.id().toString())) {
 
-        var chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluatorCode.model(), workspaceId);
-
-        var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream()
-                .map(item -> item.toBuilder()
-                        .id(trace.id())
-                        .projectId(trace.projectId())
-                        .projectName(trace.projectName())
-                        .build())
-                .toList();
-
-        log.info("Received {} scores for traceId '{}' in workspace '{}'. Storing them.", scores.size(), trace.id(),
-                workspaceId);
+            processScores(trace, evaluator, workspaceId, userName);
+        }
+    }
 
-        feedbackScoreService.scoreBatchOfTraces(scores)
-                .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName)
-                        .put(RequestContext.WORKSPACE_ID, workspaceId))
-                .block();
+    private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId,
+            String userName) {
+        userFacingLogger.info("Evaluating traceId '{}' sampled by rule '{}'", trace.id(), evaluator.getName());
+
+        var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluator.getCode(), trace);
+
+        userFacingLogger.info("Sending traceId '{}' to LLM using the following input:\n\n{}", trace.id(), scoreRequest);
+
+        final ChatResponse chatResponse;
+
+        try {
+            chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluator.getCode().model(), workspaceId);
+            userFacingLogger.info("Received response for traceId '{}':\n\n{}", trace.id(), chatResponse);
+        } catch (Exception e) {
+            userFacingLogger.error("Unexpected error while scoring traceId '{}' with rule '{}'", trace.id(),
+                    evaluator.getName());
+            throw e;
+        }
+
+        try {
+            var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream()
+                    .map(item -> item.toBuilder()
+                            .id(trace.id())
+                            .projectId(trace.projectId())
+                            .projectName(trace.projectName())
+                            .build())
+                    .toList();
+
+            log.info("Received {} scores for traceId '{}' in workspace '{}'. Storing them.", scores.size(), trace.id(),
+                    workspaceId);
+
+            feedbackScoreService.scoreBatchOfTraces(scores)
+                    .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName)
+                            .put(RequestContext.WORKSPACE_ID, workspaceId))
+                    .block();
+
+            Map<String, List<BigDecimal>> loggedScores = scores
+                    .stream()
+                    .collect(
+                            groupingBy(FeedbackScoreBatchItem::name, mapping(FeedbackScoreBatchItem::value, toList())));
+
+            userFacingLogger.info("Scores for traceId '{}' stored successfully:\n\n{}", trace.id(), loggedScores);
+        } catch (Exception e) {
+            userFacingLogger.error("Unexpected error while storing scores for traceId '{}'", trace.id());
+            throw e;
+        }
     }
 }
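
The loggedScores map in processScores is built with the groupingBy/mapping/toList collector combination imported above. A minimal standalone sketch of that collector shape, with a local record standing in for FeedbackScoreBatchItem:

    import java.math.BigDecimal;
    import java.util.List;
    import java.util.Map;

    import static java.util.stream.Collectors.groupingBy;
    import static java.util.stream.Collectors.mapping;
    import static java.util.stream.Collectors.toList;

    class CollectorSketch {
        record Score(String name, BigDecimal value) {} // stand-in for FeedbackScoreBatchItem

        public static void main(String[] args) {
            List<Score> scores = List.of(
                    new Score("Relevance", new BigDecimal("0.9")),
                    new Score("Relevance", new BigDecimal("0.7")),
                    new Score("Hallucination", new BigDecimal("0.1")));

            // Same collector shape the listener uses for its user-facing summary:
            // one entry per score name, with every value for that name in a list.
            Map<String, List<BigDecimal>> byName = scores.stream()
                    .collect(groupingBy(Score::name, mapping(Score::value, toList())));

            // e.g. {Hallucination=[0.1], Relevance=[0.9, 0.7]} (map order not guaranteed)
            System.out.println(byName);
        }
    }
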
Storing them.", scores.size(), trace.id(), - workspaceId); + processScores(trace, evaluator, workspaceId, userName); + } + } - feedbackScoreService.scoreBatchOfTraces(scores) - .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName) - .put(RequestContext.WORKSPACE_ID, workspaceId)) - .block(); + private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId, + String userName) { + userFacingLogger.info("Evaluating traceId '{}' sampled by rule '{}'", trace.id(), evaluator.getName()); + + var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluator.getCode(), trace); + + userFacingLogger.info("Sending traceId '{}' to LLM using the following input:\n\n{}", trace.id(), scoreRequest); + + final ChatResponse chatResponse; + + try { + chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluator.getCode().model(), workspaceId); + userFacingLogger.info("Received response for traceId '{}':\n\n{}", trace.id(), chatResponse); + } catch (Exception e) { + userFacingLogger.error("Unexpected error while scoring traceId '{}' with rule '{}'", trace.id(), + evaluator.getName()); + throw e; + } + + try { + var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream() + .map(item -> item.toBuilder() + .id(trace.id()) + .projectId(trace.projectId()) + .projectName(trace.projectName()) + .build()) + .toList(); + + log.info("Received {} scores for traceId '{}' in workspace '{}'. Storing them.", scores.size(), trace.id(), + workspaceId); + + feedbackScoreService.scoreBatchOfTraces(scores) + .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName) + .put(RequestContext.WORKSPACE_ID, workspaceId)) + .block(); + + Map> loggedScores = scores + .stream() + .collect( + groupingBy(FeedbackScoreBatchItem::name, mapping(FeedbackScoreBatchItem::value, toList()))); + + userFacingLogger.info("Scores for traceId '{}' stored successfully:\n\n{}", trace.id(), loggedScores); + } catch (Exception e) { + userFacingLogger.error("Unexpected error while storing scores for traceId '{}'", trace.id()); + throw e; + } } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java new file mode 100644 index 0000000000..f4c7c9878d --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/UserLog.java @@ -0,0 +1,9 @@ +package com.comet.opik.domain; + +public enum UserLog { + AUTOMATION_RULE_EVALUATOR + + ; + + public static final String MARKER = "user_log"; +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java new file mode 100644 index 0000000000..9f571f38ea --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/ClickHouseLogAppenderConfig.java @@ -0,0 +1,18 @@ +package com.comet.opik.infrastructure; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.time.Duration; + +@Data +public class ClickHouseLogAppenderConfig { + + @Valid @JsonProperty + private int batchSize = 1000; + + @Valid @JsonProperty + @NotNull private Duration flushIntervalDuration; +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java index d7ec4d7427..e98ee0c2c7 
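
OpikConfiguration exposes the new block under the clickHouseLogAppender key, matching the YAML. A sketch of how the YAML binds to the config class, assuming Jackson's YAML and JSR-310 modules are on the classpath (Dropwizard's own ObjectMapper setup may differ in detail); note that batchSize has a coded default of 1000 while flushIntervalDuration is @NotNull with no initializer, so it must come from the YAML:

    import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

    class ConfigBindingSketch {
        public static void main(String[] args) throws Exception {
            // JavaTimeModule teaches Jackson to parse "PT0.500S" into java.time.Duration
            ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
                    .registerModule(new JavaTimeModule());

            String yaml = """
                    batchSize: 1000
                    flushIntervalDuration: PT0.500S
                    """;

            ClickHouseLogAppenderConfig config = mapper.readValue(yaml, ClickHouseLogAppenderConfig.class);
            System.out.println(config.getFlushIntervalDuration().toMillis()); // 500
        }
    }
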
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
index 5afa853170..3a4bccf61b 100644
--- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java
@@ -1,7 +1,9 @@
 package com.comet.opik.infrastructure.db;
 
+import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig;
 import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
 import com.comet.opik.infrastructure.OpikConfiguration;
+import com.comet.opik.infrastructure.log.UserFacingLoggingFactory;
 import com.google.inject.Provides;
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry;
@@ -19,14 +21,20 @@ public class DatabaseAnalyticsModule extends DropwizardAwareModule<OpikConfiguration> {
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java
new file mode 100644
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java
@@ -0,0 +1,143 @@
+package com.comet.opik.infrastructure.log;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import com.comet.opik.domain.UserLog;
+import com.comet.opik.infrastructure.log.tables.UserLogTableFactory;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.groupingBy;
+
+@RequiredArgsConstructor
+@Slf4j
+class ClickHouseAppender extends AppenderBase<ILoggingEvent> {
+
+    private static ClickHouseAppender instance;
+
+    public static synchronized void init(@NonNull UserLogTableFactory userLogTableFactory, int batchSize,
+            @NonNull Duration flushIntervalDuration) {
+
+        if (instance == null) {
+            setInstance(new ClickHouseAppender(userLogTableFactory, flushIntervalDuration, batchSize));
+            instance.start();
+        }
+    }
+
+    public static synchronized ClickHouseAppender getInstance() {
+        if (instance == null) {
+            throw new IllegalStateException("ClickHouseAppender is not initialized");
+        }
+        return instance;
+    }
+
+    private static synchronized void setInstance(ClickHouseAppender instance) {
+        ClickHouseAppender.instance = instance;
+    }
+
+    private final @NonNull UserLogTableFactory userLogTableFactory;
+    private final @NonNull Duration flushIntervalDuration;
+    private final int batchSize;
+    private volatile boolean running = true;
+
+    private BlockingQueue<ILoggingEvent> logQueue;
+    private ScheduledExecutorService scheduler;
+
+    @Override
+    public void start() {
+
+        logQueue = new LinkedBlockingQueue<>();
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+
+        // Background flush thread
+        scheduler.scheduleAtFixedRate(this::flushLogs, flushIntervalDuration.toMillis(),
+                flushIntervalDuration.toMillis(), TimeUnit.MILLISECONDS);
+
+        super.start();
+    }
+
+    private void flushLogs() {
+        if (logQueue.isEmpty()) return;
+
+        List<ILoggingEvent> batch = new ArrayList<>(logQueue.size());
+        logQueue.drainTo(batch, logQueue.size());
+
+        if (batch.isEmpty()) return;
+
+        Map<String, List<ILoggingEvent>> eventsPerTable = batch.stream()
+                .collect(groupingBy(event -> event.getMDCPropertyMap().getOrDefault(UserLog.MARKER, "")));
+
+        eventsPerTable
+                .forEach((userLog, events) -> {
+
+                    if (userLog.isBlank()) {
+                        log.error("UserLog marker is not set for events: {}", events.stream()
+                                .map(ILoggingEvent::getFormattedMessage)
+                                .collect(Collectors.joining(", ")));
+                    } else {
+                        UserLogTableFactory.UserLogTableDAO tableDAO = userLogTableFactory
+                                .getDAO(UserLog.valueOf(userLog));
+
+                        tableDAO
+                                .saveAll(events)
+                                .subscribe(
+                                        noop -> {
+                                        },
+                                        e -> log.error("Failed to insert logs", e));
+                    }
+                });
+    }
+
+    @Override
+    protected void append(ILoggingEvent event) {
+        if (!running) {
+            log.debug("ClickHouseAppender is stopped, dropping log: {}", event.getFormattedMessage());
+            return;
+        }
+
+        boolean added = logQueue.offer(event);
+        if (!added) {
+            log.warn("Log queue is full, dropping log: {}", event.getFormattedMessage());
+        }
+
+        if (logQueue.size() >= batchSize) {
+            scheduler.execute(this::flushLogs);
+        }
+    }
+
+    @Override
+    public void stop() {
+        running = false;
+        super.stop();
+        flushLogs();
+        setInstance(null);
+        scheduler.shutdown();
+        awaitTermination();
+    }
+
+    private void awaitTermination() {
+        try {
+            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow();
+                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt
+                    log.error("ClickHouseAppender did not terminate");
+                }
+            }
+        } catch (InterruptedException ex) {
+            scheduler.shutdownNow();
+            log.warn("ClickHouseAppender interrupted while waiting for termination", ex);
+        }
+    }
+}
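
The appender's core mechanism is offer-then-drain batching: append() never blocks the logging thread, and flushes happen either on the timer or as soon as the queue reaches batchSize. The same pattern in isolation (a sketch, not the production class):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class BatchingQueueSketch {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        void start(long flushIntervalMillis) {
            // Same shape as ClickHouseAppender.start(): flush on a fixed cadence,
            // so no message waits longer than one interval.
            scheduler.scheduleAtFixedRate(this::flush, flushIntervalMillis, flushIntervalMillis,
                    TimeUnit.MILLISECONDS);
        }

        void offer(String message, int batchSize) {
            queue.offer(message); // never blocks the caller
            if (queue.size() >= batchSize) {
                scheduler.execute(this::flush); // size-triggered early flush, as in append()
            }
        }

        private void flush() {
            List<String> batch = new ArrayList<>(queue.size());
            queue.drainTo(batch); // non-blocking: takes whatever is queued right now
            if (!batch.isEmpty()) {
                System.out.println("would insert " + batch.size() + " rows");
            }
        }
    }

Since LinkedBlockingQueue is effectively unbounded here, offer() never actually fails in the production class; the "queue is full" branch only becomes relevant if a bounded queue is swapped in.
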
logs", e)); + } + }); + } + + @Override + protected void append(ILoggingEvent event) { + if (!running) { + log.debug("ClickHouseAppender is stopped, dropping log: {}", event.getFormattedMessage()); + return; + } + + boolean added = logQueue.offer(event); + if (!added) { + log.warn("Log queue is full, dropping log: {}", event.getFormattedMessage()); + } + + if (logQueue.size() >= batchSize) { + scheduler.execute(this::flushLogs); + } + } + + @Override + public void stop() { + running = false; + super.stop(); + flushLogs(); + setInstance(null); + scheduler.shutdown(); + awaitTermination(); + } + + private void awaitTermination() { + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt + log.error("ClickHouseAppender did not terminate"); + } + } + } catch (InterruptedException ex) { + scheduler.shutdownNow(); + log.warn("ClickHouseAppender interrupted while waiting for termination", ex); + } + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java new file mode 100644 index 0000000000..57c23e0f93 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java @@ -0,0 +1,50 @@ +package com.comet.opik.infrastructure.log; + +import ch.qos.logback.classic.AsyncAppender; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import com.comet.opik.infrastructure.log.tables.UserLogTableFactory; +import io.r2dbc.spi.ConnectionFactory; +import lombok.NonNull; +import lombok.experimental.UtilityClass; +import org.slf4j.LoggerFactory; + +import java.time.Duration; + +@UtilityClass +public class UserFacingLoggingFactory { + + private static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory(); + private static AsyncAppender asyncAppender; + + public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize, + @NonNull Duration flushIntervalSeconds) { + + UserLogTableFactory tableFactory = UserLogTableFactory.getInstance(connectionFactory); + ClickHouseAppender.init(tableFactory, batchSize, flushIntervalSeconds); + + ClickHouseAppender clickHouseAppender = ClickHouseAppender.getInstance(); + clickHouseAppender.setContext(CONTEXT); + + asyncAppender = new AsyncAppender(); + asyncAppender.setContext(CONTEXT); + asyncAppender.setNeverBlock(true); + asyncAppender.setIncludeCallerData(true); + asyncAppender.addAppender(clickHouseAppender); + asyncAppender.start(); + + addShutdownHook(); + } + + private static void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(CONTEXT::stop)); + } + + public static org.slf4j.Logger getLogger(@NonNull Class clazz) { + Logger logger = CONTEXT.getLogger("%s.UserFacingLog".formatted(clazz.getName())); + logger.addAppender(asyncAppender); + logger.setAdditive(false); + return logger; + } + +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java new file mode 100644 index 0000000000..83f2a33307 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java @@ -0,0 +1,83 @@ +package 
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java
new file mode 100644
index 0000000000..83f2a33307
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java
@@ -0,0 +1,83 @@
+package com.comet.opik.infrastructure.log.tables;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.comet.opik.utils.TemplateUtils;
+import io.r2dbc.spi.ConnectionFactory;
+import io.r2dbc.spi.Statement;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.stringtemplate.v4.ST;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO;
+import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder;
+
+@RequiredArgsConstructor
+@Slf4j
+class AutomationRuleEvaluatorLogDAO implements UserLogTableDAO {
+
+    private final ConnectionFactory factory;
+
+    private static final String INSERT_STATEMENT = """
+            INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers)
+            VALUES <items:{item |
+                (
+                    parseDateTime64BestEffort(:timestamp<item.index>, 9),
+                    :level<item.index>,
+                    :workspace_id<item.index>,
+                    :rule_id<item.index>,
+                    :message<item.index>,
+                    mapFromArrays(:marker_keys<item.index>, :marker_values<item.index>)
+                )
+                <if(item.hasNext)>,<endif>
+            }>
+            ;
+            """;
+
+    @Override
+    public Mono<Void> saveAll(List<ILoggingEvent> events) {
+        return Mono.from(factory.create())
+                .flatMapMany(connection -> {
+                    var template = new ST(INSERT_STATEMENT);
+                    List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(events.size());
+
+                    template.add("items", queryItems);
+
+                    Statement statement = connection.createStatement(template.render());
+
+                    for (int i = 0; i < events.size(); i++) {
+                        ILoggingEvent event = events.get(i);
+
+                        String logLevel = event.getLevel().toString();
+                        String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id"))
+                                .orElseThrow(() -> failWithMessage("workspace_id is not set"));
+                        String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id"))
+                                .orElseThrow(() -> failWithMessage("trace_id is not set"));
+                        String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id"))
+                                .orElseThrow(() -> failWithMessage("rule_id is not set"));
+
+                        statement
+                                .bind("timestamp" + i, event.getInstant().toString())
+                                .bind("level" + i, logLevel)
+                                .bind("workspace_id" + i, workspaceId)
+                                .bind("rule_id" + i, ruleId)
+                                .bind("message" + i, event.getFormattedMessage())
+                                .bind("marker_keys" + i, new String[]{"trace_id"})
+                                .bind("marker_values" + i, new String[]{traceId});
+                    }
+
+                    return statement.execute();
+                })
+                .collectList()
+                .then();
+    }
+
+    private IllegalStateException failWithMessage(String message) {
+        log.error(message);
+        return new IllegalStateException(message);
+    }
+
+}
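
INSERT_STATEMENT above is a StringTemplate (ST4) template expanded once per batch: getQueryItemPlaceHolder supplies one item per event, so each bind name gets a numeric suffix that the loop then fills via :timestamp0, :level0, and so on. A reduced sketch of the expansion; the QueryItem class here is a stand-in for TemplateUtils.QueryItem, whose exact shape is assumed:

    import org.stringtemplate.v4.ST;

    import java.util.ArrayList;
    import java.util.List;

    class InsertTemplateSketch {
        // Stand-in for TemplateUtils.QueryItem: public fields are readable by
        // StringTemplate as <item.index> and <item.hasNext>.
        static class QueryItem {
            public final int index;
            public final boolean hasNext;

            QueryItem(int index, boolean hasNext) {
                this.index = index;
                this.hasNext = hasNext;
            }
        }

        public static void main(String[] args) {
            // Reduced version of INSERT_STATEMENT, same placeholder scheme
            String template = "INSERT INTO t (ts, msg) VALUES "
                    + "<items:{item |(:ts<item.index>, :msg<item.index>)<if(item.hasNext)>,<endif>}>;";

            List<QueryItem> items = new ArrayList<>();
            items.add(new QueryItem(0, true));
            items.add(new QueryItem(1, false));

            ST st = new ST(template);
            st.add("items", items);

            // Prints something like: INSERT INTO t (ts, msg) VALUES (:ts0, :msg0),(:ts1, :msg1);
            // The DAO then binds :ts0/:msg0 for events.get(0), :ts1/:msg1 for events.get(1), etc.
            System.out.println(st.render());
        }
    }
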
diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java
new file mode 100644
index 0000000000..9bce54ce50
--- /dev/null
+++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java
@@ -0,0 +1,37 @@
+package com.comet.opik.infrastructure.log.tables;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import com.comet.opik.domain.UserLog;
+import io.r2dbc.spi.ConnectionFactory;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+public interface UserLogTableFactory {
+
+    static UserLogTableFactory getInstance(ConnectionFactory factory) {
+        return new UserLogTableFactoryImpl(factory);
+    }
+
+    interface UserLogTableDAO {
+        Mono<Void> saveAll(List<ILoggingEvent> events);
+    }
+
+    UserLogTableDAO getDAO(UserLog userLog);
+
+}
+
+@RequiredArgsConstructor
+class UserLogTableFactoryImpl implements UserLogTableFactory {
+
+    private final ConnectionFactory factory;
+
+    @Override
+    public UserLogTableDAO getDAO(@NonNull UserLog userLog) {
+        return switch (userLog) {
+            case AUTOMATION_RULE_EVALUATOR -> new AutomationRuleEvaluatorLogDAO(factory);
+        };
+    }
+}
diff --git a/apps/opik-backend/src/test/resources/config-test.yml b/apps/opik-backend/src/test/resources/config-test.yml
index e13a6540ea..76a00f89b0 100644
--- a/apps/opik-backend/src/test/resources/config-test.yml
+++ b/apps/opik-backend/src/test/resources/config-test.yml
@@ -218,3 +218,13 @@ cacheManager:
   # Default: {}
   # Description: Dynamically created caches with their respective time to live in seconds
   testCache: PT1S
+
+# Configuration for clickhouse log appender
+clickHouseLogAppender:
+  # Default: 1000
+  # Description: Number of log messages to be batched before sending to ClickHouse
+  batchSize: 1000
+
+  # Default: PT0.500S or 500ms
+  # Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached
+  flushIntervalDuration: PT0.500S