-
Notifications
You must be signed in to change notification settings - Fork 124
Introduce query history write buffer for DB resiliency #794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Peiying Ye.
|
0703b78 to
0512583
Compare
|
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Peiying Ye.
|
0512583 to
8c52dd0
Compare
trigger build
85776ef to
caab5db
Compare
| { | ||
| dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class); | ||
| this.isOracleBackend = isOracleBackend; | ||
| if (writeBufferConfig != null && writeBufferConfig.isEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (writeBufferConfig != null && writeBufferConfig.isEnabled()) { | |
| if (writeBufferConfig.isEnabled()) { |
writeBufferConfig should never be null. If it's null for whatever reason, we should fail fast.
| public void buffer(T item) | ||
| { | ||
| if (!deque.offerLast(item)) { | ||
| deque.pollFirst(); | ||
| deque.offerLast(item); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs synchronized to be thread safe.
| int flushed = 0; | ||
| for (T next; (next = deque.pollFirst()) != null; ) { | ||
| try { | ||
| flusher.accept(next); | ||
| flushed++; | ||
| } | ||
| catch (RuntimeException e) { | ||
| deque.offerFirst(next); | ||
| break; // stop after first failure | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may fail to insert back to the queue when it's full.
Maybe peek() -> accept() -> remove() ?
| public class WriteBufferConfiguration | ||
| { | ||
| private boolean enabled; | ||
| private int maxCapacity = 10000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason for this number 10,000?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a default value, same for the flushInterval. You can always overwrite them in the config file. I'll also update the default value settings in the doc. Do you recommend to set different default values for them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just wondering what is the justification for this number 10,000. Maybe other projects use this number for buffer?
| { | ||
| private boolean enabled; | ||
| private int maxCapacity = 10000; | ||
| private Duration flushInterval = new Duration(2, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for 2 seconds. What is the reason for this number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See #794 (comment)
|
|
||
| public void setMaxCapacity(int maxCapacity) | ||
| { | ||
| this.maxCapacity = maxCapacity; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe check for negative/zero number?
| public WriteBufferConfiguration() {} | ||
|
|
||
| public WriteBufferConfiguration(boolean enabled, int maxCapacity, Duration flushInterval) | ||
| { | ||
| this.enabled = enabled; | ||
| this.maxCapacity = maxCapacity; | ||
| this.flushInterval = flushInterval; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would you need two constructors?
| } | ||
| catch (RuntimeException e) { | ||
| if (isConnectionIssue(e) && writeBuffer != null) { | ||
| writeBuffer.buffer(queryDetail); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if writer buffer is full?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we won't buffer this record and the query submission would fail - same behavior as the current code without this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an improvement to possible issue of loosing record, how about we at least log error that information is lost and increase buffer and/or change flush interval?
| writeBufferConfig.getFlushInterval().toMillis(), | ||
| writeBufferConfig.getFlushInterval().toMillis(), | ||
| TimeUnit.MILLISECONDS); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't there be a race condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please elaborate on the race condition here? The flusher scheduledExecutor is a single thread here, and the buffering thread safe issue is handled in the WriteBuffer class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The single-threaded scheduler doesn't eliminate the race condition—it just means only one thread calls flushAll(), but that thread still races with multiple threads calling buffer().
Even though the scheduledExecutor is single-threaded, the race condition occurs between:
- Multiple application threads calling buffer() (synchronized)
- Single scheduler thread calling flushAll() (NOT synchronized)
- Any thread calling size() (NOT synchronized)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public int flushAll(Consumer<T> flusher) | ||
| { | ||
| int flushed = 0; | ||
| for (T next; (next = deque.peekFirst()) != null; ) { | ||
| try { | ||
| flusher.accept(next); | ||
| } | ||
| catch (RuntimeException e) { | ||
| break; // stop after first failure | ||
| } | ||
| // Only remove after a successful flush | ||
| deque.pollFirst(); | ||
| flushed++; | ||
| } | ||
| return flushed; | ||
| } | ||
|
|
||
| public int size() | ||
| { | ||
| return deque.size(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public int flushAll(Consumer<T> flusher) | |
| { | |
| int flushed = 0; | |
| for (T next; (next = deque.peekFirst()) != null; ) { | |
| try { | |
| flusher.accept(next); | |
| } | |
| catch (RuntimeException e) { | |
| break; // stop after first failure | |
| } | |
| // Only remove after a successful flush | |
| deque.pollFirst(); | |
| flushed++; | |
| } | |
| return flushed; | |
| } | |
| public int size() | |
| { | |
| return deque.size(); | |
| } | |
| } | |
| public synchronized int flushAll(Consumer<T> flusher) // Add synchronized | |
| { | |
| int flushed = 0; | |
| for (T next; (next = deque.peekFirst()) != null; ) { | |
| try { | |
| flusher.accept(next); | |
| } | |
| catch (RuntimeException e) { | |
| break; // stop after first failure | |
| } | |
| // Only remove after a successful flush | |
| deque.pollFirst(); | |
| flushed++; | |
| } | |
| return flushed; | |
| } | |
| public synchronized int size() // Add synchronized | |
| { | |
| return deque.size(); | |
| } | |
| } |
Description
Following the work in #783 to introduce caching in
HaGatewayManager, this PR adds a write buffer mechanism toQueryHistoryManager. With this change, if the database becomes unavailable, Trino-Gateway can continue to route queries using the cached Trino cluster data while temporarily storing query history records in the write buffer. When the database is available again, the buffered history entries will be flushed. This allows Trino-Gateway to avoid using the database as a single point of failure and improves overall resiliency.This approach has already been implemented and validated in production at LinkedIn.
Testing
mvn clean install