OPIK-796: Implement ClickHouse user facing logs #1066
LGTM, left a few minor comments.
Do you plan to add tests in a later PR? Have you tested locally?
4217cd5 to e28974e
You are right, sorry, I forgot to add the description. Yes, it was tested manually, since this feature still doesn't have automated tests due to the issue with API keys when running in our pipeline.
755d147 to e6863eb
…ora/OPIK-796_implement_clickhouse_user_logs
@thiagohora I'm reviewing your base PR (the table model) now. I'll get back to this in a sec. Please hold the merge. Better to push them separately.
Agreed, don't worry, I will not merge it.
df53b40 to 249d384
…ora/OPIK-796_implement_clickhouse_user_logs
…ttps://github.com/comet-ml/opik into thiagohora/OPIK-796_implement_clickhouse_user_logs
Let's polish the details of this approach and add test coverage.
@@ -29,6 +37,7 @@ public class OnlineScoringEventListener {
    private final AutomationRuleEvaluatorService ruleEvaluatorService;
    private final ChatCompletionService aiProxyService;
    private final FeedbackScoreService feedbackScoreService;
    private final Logger userFacingLogger;
Minor: let's add non-null validation to all these fields.
Minor: after looking at the constructor, the annotation that I suggested previously doesn't actually do anything. But thanks for adding the others to the constructor arguments; those do validate.
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
Is there any way to avoid relying on the particular logging framework implementation (Logback) and rely only on the facade (SLF4J)?
No, apparently, SLF4J doesn't have an abstraction for appenders.
apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java
        }
    }

    public static ClickHouseAppender getInstance() {
This needs to be synchronized as well. But I think it's better to merge the init and getInstance methods into a single one.
We probably need to think about this, because many configuration settings must be made statically. Doing this in getInstance is challenging because we must pass the connection factory. Another option would be to make the log factory a component.
In your current implementation you always call both in a row. I'd personally just return the instance from init and maybe rename it to getInstance, so you end up with a single method that both initializes and returns the instance.
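For illustration, a merged, synchronized init-and-get method could look roughly like the sketch below. This is not the PR's code: the class name SingletonAppender, the String placeholder for the connection factory, and the batchSize parameter are all assumptions made so the example is self-contained.

```java
import java.util.Objects;

// Hypothetical stand-in for the appender; all names here are illustrative only.
final class SingletonAppender {
    private static SingletonAppender instance;

    private final String connectionFactory; // placeholder for the real connection factory
    private final int batchSize;

    private SingletonAppender(String connectionFactory, int batchSize) {
        this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory");
        this.batchSize = batchSize;
    }

    // Initializes on the first call and returns the shared instance on every call.
    // The synchronized keyword keeps concurrent callers from creating two instances.
    static synchronized SingletonAppender init(String connectionFactory, int batchSize) {
        if (instance == null) {
            instance = new SingletonAppender(connectionFactory, batchSize);
        }
        return instance;
    }

    String connectionFactory() {
        return connectionFactory;
    }

    int batchSize() {
        return batchSize;
    }
}
```

With this shape, later calls ignore their arguments and simply return the instance created by the first call, which is one of the trade-offs of passing configuration through the accessor.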
apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java
    @Override
    protected void append(ILoggingEvent event) {
        if (!running) return;
Add some logs to these conditionals, so we can know that they were executed. Ok to be at debug level if you think they can be too noisy. Review others in this class.
You missed a few of these. But not a blocker.
Added debug log
        boolean added = logQueue.offer(event);
        if (!added) {
            log.warn("Log queue is full, dropping log: {}", event.getFormattedMessage());
        }
Consider flushing before, to reduce the chances of dropping logs when the queue is full.
The idea is for the logging to run async, so I would avoid blocking for long during append.
I see the call below is in the async executor:
        if (logQueue.size() >= batchSize) {
            scheduler.execute(this::flushLogs);
        }
I didn't notice that in my first review.
Moving boolean added = logQueue.offer(event); before it is therefore pointless, so ignore this comment.
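The pattern discussed in this thread (a non-blocking offer on the calling thread, with the flush handed to an executor once a batch is ready) can be sketched as follows. This is a simplified illustration, not the appender from the PR: strings stand in for ILoggingEvent, an in-memory list stands in for the ClickHouse insert, and the class name AsyncBatchingSink and the use of drainTo are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified sink used only to illustrate the non-blocking append.
final class AsyncBatchingSink {
    private final BlockingQueue<String> logQueue;
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private final List<List<String>> flushedBatches =
            Collections.synchronizedList(new ArrayList<>());
    private final int batchSize;

    AsyncBatchingSink(int queueCapacity, int batchSize) {
        this.logQueue = new LinkedBlockingQueue<>(queueCapacity);
        this.batchSize = batchSize;
    }

    // Runs on the logging thread: offer() returns immediately, so the caller never blocks.
    boolean append(String event) {
        boolean added = logQueue.offer(event);
        if (!added) {
            System.err.println("Log queue is full, dropping log: " + event);
        }
        if (logQueue.size() >= batchSize) {
            scheduler.execute(this::flushLogs); // the flush runs off the caller's thread
        }
        return added;
    }

    // Drains at most one batch from the queue and records it.
    private void flushLogs() {
        List<String> batch = new ArrayList<>();
        logQueue.drainTo(batch, batchSize);
        if (!batch.isEmpty()) {
            flushedBatches.add(batch);
        }
    }

    // Waits for already submitted flushes; items still in the queue are not flushed here.
    void close() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    List<List<String>> flushedBatches() {
        return flushedBatches;
    }
}
```

Because the flush is only scheduled, not executed, inside append, reordering the offer relative to the size check does not change how long the caller waits.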
apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java
@UtilityClass
public class UserFacingRuleLoggingFactory {

    private static final LoggerContext CONTEXT = (LoggerContext) LoggerFactory.getILoggerFactory();
Minor: avoid the cast if possible. Ignore otherwise.
This cast is needed
...ik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingRuleLoggingFactory.java
f3ef241 to 357be97
The test is in the next PR, since there were no tests for online scoring before.
I left some more comments, but this can go as most of them are either minor or can be done in the next PR.
The main one is to cache the DAOs so they're just retrieved and used, instead of instantiated each time.
The rest have less importance.
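As a rough idea of what caching the DAOs could mean, the sketch below creates each DAO once per key and returns the cached instance afterwards. It is only an assumption about the intended shape: the class name CachingDaoFactory, the generic key and DAO types, and the creator function are hypothetical and do not come from the PR.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: K identifies the log table, D is the DAO type.
final class CachingDaoFactory<K, D> {
    private final Map<K, D> cache = new ConcurrentHashMap<>();
    private final Function<K, D> creator;

    CachingDaoFactory(Function<K, D> creator) {
        this.creator = Objects.requireNonNull(creator, "creator");
    }

    // Creates the DAO on first request for a key and reuses it on later requests.
    D get(K key) {
        return cache.computeIfAbsent(key, creator);
    }
}
```

ConcurrentHashMap.computeIfAbsent applies the creator at most once per key, so concurrent callers asking for the same key receive the same DAO.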
        logQueue = new LinkedBlockingQueue<>();
        scheduler = Executors.newSingleThreadScheduledExecutor();
Not sure if I follow. If you move it here:
    private BlockingQueue<ILoggingEvent> logQueue = new LinkedBlockingQueue<>();
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
creation of those objects moves to ClickHouseAppender instantiation time, but you would still schedule only when invoking the start method.
Anyway, this is a minor improvement for separation of responsibilities.
    }

    private void flushLogs() {
        if (logQueue.isEmpty()) return;
I agree, but that's why you control the order in the stop method.
However, now that you have shutdownNow with a grace period after shutdown, this isn't critical anymore. I agree it's better to allow the scheduler to continue flushing during that grace period.
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt
Minor: grace period could be configurable (consider that you wait twice).
                    log.error("ClickHouseAppender did not terminate");
                }
            }
        } catch (InterruptedException ex) {
You're missing the Thread.currentThread().interrupt(); call.
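Putting the two shutdown comments together (a configurable grace period and restoring the interrupt flag), a stop routine might look like the sketch below. The helper class GracefulShutdown and its parameters are assumptions made for illustration; the PR's actual stop method may be structured differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the grace period is a parameter instead of a hard-coded 5 seconds.
final class GracefulShutdown {

    // Returns true if the executor terminated within the allowed time.
    // Note that the grace period may be waited twice: once after shutdown(),
    // once after shutdownNow().
    static boolean stop(ExecutorService scheduler, long gracePeriod, TimeUnit unit) {
        scheduler.shutdown(); // stop accepting new tasks, let queued ones finish
        try {
            if (scheduler.awaitTermination(gracePeriod, unit)) {
                return true;
            }
            scheduler.shutdownNow(); // interrupt running tasks, discard queued ones
            return scheduler.awaitTermination(gracePeriod, unit); // final attempt
        } catch (InterruptedException ex) {
            scheduler.shutdownNow();
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Catching InterruptedException clears the thread's interrupted status, which is why the flag is set again before returning.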
    }

    private IllegalStateException failWithMessage(String message) {
        log.error(message);
What about this log:
subscribe(
noop -> {
},
e -> log.error("Failed to insert logs", e));
        return new UserLogTableFactoryImpl(factory);
    }

    interface UserLogTableDAO {
As a small joke, this is probably the only DAO in our service that deserves an interface.
I agree; I will merge this one and address this in the following PR.
Details
Add a specialized logger to send logs to the user-facing rule logs table.
Issues
OPIK-796
Testing