Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class HaGatewayConfiguration
private List<String> statementPaths = ImmutableList.of(V1_STATEMENT_PATH);
private boolean includeClusterHostInResponse;
private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration();
private WriteBufferConfiguration writeBuffer = new WriteBufferConfiguration();

private RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig();

Expand Down Expand Up @@ -268,6 +269,16 @@ public void setIncludeClusterHostInResponse(boolean includeClusterHostInResponse
this.includeClusterHostInResponse = includeClusterHostInResponse;
}

public WriteBufferConfiguration getWriteBuffer()
{
return writeBuffer;
}

public void setWriteBuffer(WriteBufferConfiguration writeBuffer)
{
this.writeBuffer = writeBuffer;
}

public ProxyResponseConfiguration getProxyResponseConfiguration()
{
return this.proxyResponseConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.config;

import io.airlift.units.Duration;

import java.util.concurrent.TimeUnit;

public class WriteBufferConfiguration
{
private boolean enabled;
private int maxCapacity = 10000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this number 10,000?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a default value, same for the flushInterval. You can always overwrite them in the config file. I'll also update the default value settings in the doc. Do you recommend to set different default values for them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering what is the justification for this number 10,000. Maybe other projects use this number for buffer?

private Duration flushInterval = new Duration(2, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for 2 seconds. What is the reason for this number?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


public boolean isEnabled()
{
return enabled;
}

public void setEnabled(boolean enabled)
{
this.enabled = enabled;
}

public int getMaxCapacity()
{
return maxCapacity;
}

public void setMaxCapacity(int maxCapacity)
{
if (maxCapacity <= 0) {
throw new IllegalArgumentException("maxCapacity must be positive");
}
this.maxCapacity = maxCapacity;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe check for negative/zero number?

}

public Duration getFlushInterval()
{
return flushInterval;
}

public void setFlushInterval(Duration flushInterval)
{
if (flushInterval.toMillis() <= 0) {
throw new IllegalArgumentException("flushInterval must be positive");
}
this.flushInterval = flushInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public HaGatewayProviderModule(HaGatewayConfiguration configuration)
JdbcConnectionManager connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore());
resourceGroupsManager = new HaResourceGroupsManager(connectionManager);
gatewayBackendManager = new HaGatewayManager(jdbi, configuration.getRouting());
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"));
queryHistoryManager = new HaQueryHistoryManager(jdbi, configuration.getDataStore().getJdbcUrl().startsWith("jdbc:oracle"), configuration.getWriteBuffer());
}

private LbOAuthManager getOAuthManager(HaGatewayConfiguration configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
package io.trino.gateway.ha.router;

import com.google.common.base.Strings;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.trino.gateway.ha.config.WriteBufferConfiguration;
import io.trino.gateway.ha.domain.TableData;
import io.trino.gateway.ha.domain.request.QueryHistoryRequest;
import io.trino.gateway.ha.domain.response.DistributionResponse;
import io.trino.gateway.ha.persistence.dao.QueryHistory;
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
import jakarta.annotation.PreDestroy;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Jdbi;

import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -29,21 +35,45 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class HaQueryHistoryManager
implements QueryHistoryManager
{
private static final int FIRST_PAGE_NO = 1;
private static final Logger log = Logger.get(HaQueryHistoryManager.class);

private final QueryHistoryDao dao;
private final boolean isOracleBackend;
private final WriteBuffer<QueryDetail> writeBuffer;
private final ScheduledExecutorService scheduledExecutor;

public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend)
@Inject
public HaQueryHistoryManager(Jdbi jdbi, boolean isOracleBackend, WriteBufferConfiguration writeBufferConfig)
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class);
this.isOracleBackend = isOracleBackend;
if (writeBufferConfig.isEnabled()) {
this.writeBuffer = new WriteBuffer<>(writeBufferConfig.getMaxCapacity());
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "query-history-write-buffer");
t.setDaemon(true);
return t;
});
scheduledExecutor.scheduleWithFixedDelay(
this::flushBufferedWrites,
writeBufferConfig.getFlushInterval().toMillis(),
writeBufferConfig.getFlushInterval().toMillis(),
TimeUnit.MILLISECONDS);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't there be a race condition

Copy link
Member Author

@Peiyingy Peiyingy Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate on the race condition here? The flusher scheduledExecutor is a single thread here, and the buffering thread safe issue is handled in the WriteBuffer class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single-threaded scheduler doesn't eliminate the race condition—it just means only one thread calls flushAll(), but that thread still races with multiple threads calling buffer().

Even though the scheduledExecutor is single-threaded, the race condition occurs between:

  • Multiple application threads calling buffer() (synchronized)
  • Single scheduler thread calling flushAll() (NOT synchronized)
  • Any thread calling size() (NOT synchronized)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else {
this.writeBuffer = null;
this.scheduledExecutor = null;
}
}

@Override
Expand All @@ -54,15 +84,27 @@ public void submitQueryDetail(QueryDetail queryDetail)
return;
}

dao.insertHistory(
queryDetail.getQueryId(),
queryDetail.getQueryText(),
queryDetail.getBackendUrl(),
queryDetail.getUser(),
queryDetail.getSource(),
queryDetail.getCaptureTime(),
queryDetail.getRoutingGroup(),
queryDetail.getExternalUrl());
try {
dao.insertHistory(
queryDetail.getQueryId(),
queryDetail.getQueryText(),
queryDetail.getBackendUrl(),
queryDetail.getUser(),
queryDetail.getSource(),
queryDetail.getCaptureTime(),
queryDetail.getRoutingGroup(),
queryDetail.getExternalUrl());
}
catch (RuntimeException e) {
if (isConnectionIssue(e) && writeBuffer != null) {
writeBuffer.buffer(queryDetail);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if writer buffer is full?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we won't buffer this record and the query submission would fail - same behavior as the current code without this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an improvement to possible issue of loosing record, how about we at least log error that information is lost and increase buffer and/or change flush interval?

log.warn(e, "DB unavailable; buffered query_history entry. queryId=%s, bufferSize=%s",
queryDetail.getQueryId(), writeBuffer.size());
}
else {
throw e;
}
}
}

@Override
Expand Down Expand Up @@ -166,4 +208,66 @@ private static int getStart(int pageNo, int pageSize)
}
return (pageNo - FIRST_PAGE_NO) * pageSize;
}

private static boolean isConnectionIssue(Throwable t)
{
// SQL State codes starting with "08" indicate connection exceptions per ANSI/ISO SQL standard.
// See: https://en.wikipedia.org/wiki/SQLSTATE
// Examples: 08000 (connection exception), 08001 (cannot establish connection),
// 08003 (connection does not exist), 08006 (connection failure), etc.
for (Throwable cur = t; cur != null; cur = cur.getCause()) {
if (cur instanceof ConnectionException) {
return true;
}
if (cur instanceof SQLException sql) {
String sqlState = sql.getSQLState();
if (sqlState != null && sqlState.startsWith("08")) {
return true;
}
}
}
return false;
}

private void flushBufferedWrites()
{
if (writeBuffer == null) {
return;
}
int before = writeBuffer.size();
int flushed = writeBuffer.flushAll(r -> {
dao.insertHistory(
r.getQueryId(),
r.getQueryText(),
r.getBackendUrl(),
r.getUser(),
r.getSource(),
r.getCaptureTime(),
r.getRoutingGroup(),
r.getExternalUrl());
});
if (flushed > 0) {
log.info("Flushed %s buffered query_history entries", flushed);
}
else if (before > 0 && writeBuffer.size() == before) {
log.warn("Failed to flush buffered query_history entries; will retry. bufferSize=%s", before);
}
}

@PreDestroy
public void stop()
{
if (scheduledExecutor == null) {
return;
}
try {
flushBufferedWrites();
}
catch (RuntimeException t) {
log.warn(t, "Error while flushing buffered query_history entries during shutdown");
}
finally {
scheduledExecutor.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.router;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;

public final class WriteBuffer<T>
{
private final BlockingDeque<T> deque;

public WriteBuffer(int maxCapacity)
{
this.deque = new LinkedBlockingDeque<>(maxCapacity);
}

/** Buffer an item for later flush. Drops the oldest if full. */
public void buffer(T item)
{
synchronized (this) {
if (!deque.offerLast(item)) {
deque.pollFirst();
deque.offerLast(item);
}
}
}

/**
* Flushes items in insertion order by applying the provided flusher.
* Stops immediately if flush can't be performed on item and re-inserts
* the failed item at the head of the buffer.
*
* @param flusher consumer invoked for each buffered item
* @return number of items successfully flushed
*/
public int flushAll(Consumer<T> flusher)
{
int flushed = 0;
for (T next; (next = deque.peekFirst()) != null; ) {
try {
flusher.accept(next);
}
catch (RuntimeException e) {
break; // stop after first failure
}
// Only remove after a successful flush
deque.pollFirst();
flushed++;
}
return flushed;
}

public int size()
{
return deque.size();
}
}
Comment on lines +48 to +69
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public int flushAll(Consumer<T> flusher)
{
int flushed = 0;
for (T next; (next = deque.peekFirst()) != null; ) {
try {
flusher.accept(next);
}
catch (RuntimeException e) {
break; // stop after first failure
}
// Only remove after a successful flush
deque.pollFirst();
flushed++;
}
return flushed;
}
public int size()
{
return deque.size();
}
}
public synchronized int flushAll(Consumer<T> flusher) // Add synchronized
{
int flushed = 0;
for (T next; (next = deque.peekFirst()) != null; ) {
try {
flusher.accept(next);
}
catch (RuntimeException e) {
break; // stop after first failure
}
// Only remove after a successful flush
deque.pollFirst();
flushed++;
}
return flushed;
}
public synchronized int size() // Add synchronized
{
return deque.size();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.gateway.ha.router;

import io.trino.gateway.ha.config.DataStoreConfiguration;
import io.trino.gateway.ha.config.WriteBufferConfiguration;
import io.trino.gateway.ha.persistence.FlywayMigration;
import io.trino.gateway.ha.persistence.JdbcConnectionManager;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -49,7 +50,8 @@ protected BaseExternalUrlQueryHistoryTest(JdbcDatabaseContainer<?> container)
true);
FlywayMigration.migrate(config);
JdbcConnectionManager jdbcConnectionManager = createTestingJdbcConnectionManager(container, config);
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"));
WriteBufferConfiguration writeBufferConfiguration = new WriteBufferConfiguration();
queryHistoryManager = new HaQueryHistoryManager(jdbcConnectionManager.getJdbi(), container.getJdbcUrl().startsWith("jdbc:oracle"), writeBufferConfiguration);
}

@AfterAll
Expand Down
Loading