Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OPIK-796: Implement ClickHouse user facing logs #1066

Merged
merged 17 commits into from
Jan 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
@@ -256,3 +256,13 @@ cacheManager:
# Default: {}
# Description: Dynamically created caches with their respective time to live in seconds
automationRules: ${CACHE_MANAGER_AUTOMATION_RULES_DURATION:-PT1S}

# Configuration for clickhouse log appender
clickHouseLogAppender:
# Default: 1000
# Description: Number of log messages to be batched before sending to ClickHouse
batchSize: ${CLICKHOUSE_LOG_APPENDER_BATCH_SIZE:-1000}

# Default: PT0.500S or 500ms
# Description: Time interval after which the log messages are sent to ClickHouse if the batch size is not reached
flushIntervalDuration: ${CLICKHOUSE_LOG_APPENDER_FLUSH_INTERVAL_DURATION:-PT0.500S}
Original file line number Diff line number Diff line change
@@ -2,25 +2,35 @@

import com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge;
import com.comet.opik.api.AutomationRuleEvaluatorType;
import com.comet.opik.api.FeedbackScoreBatchItem;
import com.comet.opik.api.Trace;
import com.comet.opik.api.events.TracesCreated;
import com.comet.opik.domain.AutomationRuleEvaluatorService;
import com.comet.opik.domain.ChatCompletionService;
import com.comet.opik.domain.FeedbackScoreService;
import com.comet.opik.domain.UserLog;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.log.UserFacingLoggingFactory;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import dev.langchain4j.model.chat.response.ChatResponse;
import jakarta.inject.Inject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.MDC;
import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge.LlmAsJudgeCode;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

@EagerSingleton
@Slf4j
@@ -29,16 +39,18 @@ public class OnlineScoringEventListener {
private final AutomationRuleEvaluatorService ruleEvaluatorService;
private final ChatCompletionService aiProxyService;
private final FeedbackScoreService feedbackScoreService;
private final @NonNull Logger userFacingLogger;

@Inject
public OnlineScoringEventListener(EventBus eventBus,
AutomationRuleEvaluatorService ruleEvaluatorService,
ChatCompletionService aiProxyService,
FeedbackScoreService feedbackScoreService) {
public OnlineScoringEventListener(@NonNull EventBus eventBus,
@NonNull AutomationRuleEvaluatorService ruleEvaluatorService,
@NonNull ChatCompletionService aiProxyService,
@NonNull FeedbackScoreService feedbackScoreService) {
this.ruleEvaluatorService = ruleEvaluatorService;
this.aiProxyService = aiProxyService;
this.feedbackScoreService = feedbackScoreService;
eventBus.register(this);
userFacingLogger = UserFacingLoggingFactory.getLogger(OnlineScoringEventListener.class);
}

/**
@@ -53,10 +65,10 @@ public void onTracesCreated(TracesCreated tracesBatch) {
log.debug(tracesBatch.traces().toString());

Map<UUID, List<Trace>> tracesByProject = tracesBatch.traces().stream()
.collect(Collectors.groupingBy(Trace::projectId));
.collect(groupingBy(Trace::projectId));

Map<String, Integer> countMap = tracesByProject.entrySet().stream()
.collect(Collectors.toMap(entry -> "projectId: " + entry.getKey(),
.collect(toMap(entry -> "projectId: " + entry.getKey(),
entry -> entry.getValue().size()));

log.debug("Received traces for workspace '{}': {}", tracesBatch.workspaceId(), countMap);
@@ -72,11 +84,30 @@ public void onTracesCreated(TracesCreated tracesBatch) {
log.info("Found {} evaluators for project '{}' on workspace '{}'", evaluators.size(),
projectId, tracesBatch.workspaceId());

// for each rule, sample traces and score them
evaluators.forEach(evaluator -> traces.stream()
.filter(e -> random.nextFloat() < evaluator.getSamplingRate())
.forEach(trace -> score(trace, evaluator.getCode(), tracesBatch.workspaceId(),
tracesBatch.userName())));
// Important to set the workspaceId for logging purposes
try (MDC.MDCCloseable logScope = MDC.putCloseable(UserLog.MARKER, UserLog.AUTOMATION_RULE_EVALUATOR.name());
MDC.MDCCloseable scope = MDC.putCloseable("workspace_id", tracesBatch.workspaceId())) {

// for each rule, sample traces and score them
evaluators.forEach(evaluator -> traces.stream()
.filter(trace -> {
boolean sampled = random.nextFloat() < evaluator.getSamplingRate();

if (!sampled) {
MDC.put("rule_id", evaluator.getId().toString());
MDC.put("trace_id", trace.id().toString());

userFacingLogger.info(
"The traceId '{}' was skipped for rule: '{}' and per the sampling rate '{}'",
trace.id(), evaluator.getName(), evaluator.getSamplingRate());
}

return sampled;
})
.forEach(trace -> score(trace, evaluator, tracesBatch.workspaceId(),
tracesBatch.userName())));
}

});
}

@@ -85,31 +116,66 @@ public void onTracesCreated(TracesCreated tracesBatch) {
* If the evaluator has multiple score definitions, it calls the LLM once per score definition.
*
* @param trace the trace to score
* @param evaluatorCode the automation rule to score the trace
* @param evaluator the automation rule to score the trace
* @param workspaceId the workspace the trace belongs
*/
private void score(Trace trace, LlmAsJudgeCode evaluatorCode, String workspaceId,
private void score(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId,
String userName) {

var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluatorCode, trace);
//This is crucial for logging purposes to identify the rule and trace
try (var ruleScope = MDC.putCloseable("rule_id", evaluator.getId().toString());
var traceScope = MDC.putCloseable("trace_id", trace.id().toString())) {

var chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluatorCode.model(), workspaceId);

var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream()
.map(item -> item.toBuilder()
.id(trace.id())
.projectId(trace.projectId())
.projectName(trace.projectName())
.build())
.toList();

log.info("Received {} scores for traceId '{}' in workspace '{}'. Storing them.", scores.size(), trace.id(),
workspaceId);
processScores(trace, evaluator, workspaceId, userName);
}
}

feedbackScoreService.scoreBatchOfTraces(scores)
.contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName)
.put(RequestContext.WORKSPACE_ID, workspaceId))
.block();
/**
 * Scores a single sampled trace with the given LLM-as-judge rule and persists the
 * resulting feedback scores, emitting user-facing log entries at each stage.
 * Callers are expected to have already set the MDC entries (workspace_id, rule_id,
 * trace_id) used by {@code userFacingLogger} — see the enclosing score() method.
 *
 * @param trace       the trace to score
 * @param evaluator   the automation rule whose code/model drive the scoring
 * @param workspaceId the workspace the trace belongs to
 * @param userName    user recorded in the reactive request context when storing scores
 */
private void processScores(Trace trace, AutomationRuleEvaluatorLlmAsJudge evaluator, String workspaceId,
        String userName) {
    userFacingLogger.info("Evaluating traceId '{}' sampled by rule '{}'", trace.id(), evaluator.getName());

    var scoreRequest = OnlineScoringEngine.prepareLlmRequest(evaluator.getCode(), trace);

    userFacingLogger.info("Sending traceId '{}' to LLM using the following input:\n\n{}", trace.id(), scoreRequest);

    final ChatResponse chatResponse;

    // Stage 1: call the LLM. Failures are surfaced to the user log (without the stack
    // trace, which stays in the service logs) and then rethrown to the caller.
    try {
        chatResponse = aiProxyService.scoreTrace(scoreRequest, evaluator.getCode().model(), workspaceId);
        userFacingLogger.info("Received response for traceId '{}':\n\n{}", trace.id(), chatResponse);
    } catch (Exception e) {
        userFacingLogger.error("Unexpected error while scoring traceId '{}' with rule '{}'", trace.id(),
                evaluator.getName());
        throw e;
    }

    // Stage 2: convert the LLM response into feedback scores and store them.
    try {
        // Re-key each parsed score onto this trace/project before persisting.
        var scores = OnlineScoringEngine.toFeedbackScores(chatResponse).stream()
                .map(item -> item.toBuilder()
                        .id(trace.id())
                        .projectId(trace.projectId())
                        .projectName(trace.projectName())
                        .build())
                .toList();

        log.info("Received {} scores for traceId '{}' in workspace '{}'. Storing them.", scores.size(), trace.id(),
                workspaceId);

        // block(): this listener runs on the event-bus thread, so the reactive write
        // is awaited synchronously with the user/workspace propagated via the context.
        feedbackScoreService.scoreBatchOfTraces(scores)
                .contextWrite(ctx -> ctx.put(RequestContext.USER_NAME, userName)
                        .put(RequestContext.WORKSPACE_ID, workspaceId))
                .block();

        // Group score values by score name purely for the user-facing summary line.
        Map<String, List<BigDecimal>> loggedScores = scores
                .stream()
                .collect(
                        groupingBy(FeedbackScoreBatchItem::name, mapping(FeedbackScoreBatchItem::value, toList())));

        userFacingLogger.info("Scores for traceId '{}' stored successfully:\n\n{}", trace.id(), loggedScores);
    } catch (Exception e) {
        userFacingLogger.error("Unexpected error while storing scores for traceId '{}'", trace.id());
        throw e;
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.comet.opik.domain;

/**
 * Categories of user-facing logs emitted by the backend.
 * The enum constant name is stored in the MDC under {@link #MARKER} so the
 * ClickHouse appender can tell user-facing entries apart from internal ones.
 */
public enum UserLog {
    AUTOMATION_RULE_EVALUATOR;

    /** MDC key whose presence marks a log statement as user-facing. */
    public static final String MARKER = "user_log";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.infrastructure;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.Data;

import java.time.Duration;

/**
 * Configuration for the ClickHouse user-facing log appender, bound from the
 * {@code clickHouseLogAppender} section of config.yml.
 */
@Data
public class ClickHouseLogAppenderConfig {

    // Number of log messages batched before flushing to ClickHouse.
    // @Valid on a primitive is a no-op; @Min(1) actually rejects a zero or
    // negative batch size, which would otherwise break the appender's flushing.
    @Min(1) @JsonProperty
    private int batchSize = 1000;

    // Maximum time to wait before flushing a partially filled batch.
    // No default here: config.yml supplies one (PT0.500S) via env-var fallback.
    @Valid @JsonProperty
    @NotNull private Duration flushIntervalDuration;
}
Original file line number Diff line number Diff line change
@@ -53,4 +53,7 @@ public class OpikConfiguration extends JobConfiguration {

@Valid @NotNull @JsonProperty
private CacheConfiguration cacheManager = new CacheConfiguration();

@Valid @NotNull @JsonProperty
private ClickHouseLogAppenderConfig clickHouseLogAppender = new ClickHouseLogAppenderConfig();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.comet.opik.infrastructure.db;

import com.comet.opik.infrastructure.ClickHouseLogAppenderConfig;
import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.log.UserFacingLoggingFactory;
import com.google.inject.Provides;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.r2dbc.v1_0.R2dbcTelemetry;
@@ -19,14 +21,20 @@ public class DatabaseAnalyticsModule extends DropwizardAwareModule<OpikConfigura
@Override
protected void configure() {
databaseAnalyticsFactory = configuration(DatabaseAnalyticsFactory.class);
connectionFactory = databaseAnalyticsFactory.build();
connectionFactory = R2dbcTelemetry.create(GlobalOpenTelemetry.get())
.wrapConnectionFactory(databaseAnalyticsFactory.build(), ConnectionFactoryOptions.builder().build());

ClickHouseLogAppenderConfig clickHouseLogAppenderConfig = configuration(ClickHouseLogAppenderConfig.class);

        // Initialize the UserFacingLoggingFactory
UserFacingLoggingFactory.init(connectionFactory, clickHouseLogAppenderConfig.getBatchSize(),
clickHouseLogAppenderConfig.getFlushIntervalDuration());
}

@Provides
@Singleton
public ConnectionFactory getConnectionFactory() {
return R2dbcTelemetry.create(GlobalOpenTelemetry.get())
.wrapConnectionFactory(connectionFactory, ConnectionFactoryOptions.builder().build());
return connectionFactory;
}

@Provides
Loading