Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.3</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
*/
package io.trino.gateway.ha.router;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ClusterStats;
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
Expand All @@ -36,7 +35,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -123,7 +121,7 @@ public String findBackendForQueryId(String queryId)
try {
backendAddress = queryIdBackendCache.get(queryId);
}
catch (ExecutionException e) {
catch (RuntimeException e) {
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
}
return backendAddress;
Expand All @@ -137,7 +135,7 @@ public String findExternalUrlForQueryId(String queryId)
try {
externalUrl = queryIdExternalUrlCache.get(queryId);
}
catch (ExecutionException e) {
catch (RuntimeException e) {
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
}
return externalUrl;
Expand All @@ -155,7 +153,7 @@ public String findRoutingGroupForQueryId(String queryId)
try {
routingGroup = queryIdRoutingGroupCache.get(queryId);
}
catch (ExecutionException e) {
catch (RuntimeException e) {
log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
}
return routingGroup;
Expand Down Expand Up @@ -245,35 +243,23 @@ private String searchAllBackendForQuery(String queryId)
*/
private String findRoutingGroupForUnknownQueryId(String queryId)
{
String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
setRoutingGroupForQueryId(queryId, routingGroup);
return routingGroup;
return queryHistoryManager.getRoutingGroupForQueryId(queryId);
}

/**
* Attempts to look up the external url associated with the query id from query history table
*/
private String findExternalUrlForUnknownQueryId(String queryId)
{
String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId);
setExternalUrlForQueryId(queryId, externalUrl);
return externalUrl;
return queryHistoryManager.getExternalUrlForQueryId(queryId);
}

private LoadingCache<String, String> buildCache(Function<String, String> loader)
{
return CacheBuilder.newBuilder()
return Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build(
new CacheLoader<>()
{
@Override
public String load(String queryId)
{
return loader.apply(queryId);
}
});
.build(loader::apply);
}

private boolean isBackendHealthy(String backendId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.nimbusds.oauth2.sdk.ParseException;
import com.nimbusds.oauth2.sdk.Request;
import com.nimbusds.oauth2.sdk.token.BearerAccessToken;
Expand All @@ -42,7 +41,6 @@
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.nimbusds.openid.connect.sdk.UserInfoResponse.parse;
Expand Down Expand Up @@ -198,7 +196,7 @@ private Optional<String> extractUserFromBearerAuth(String header, String userFie
userInfo = Optional.of(userInfoCache.orElseThrow().get(token));
return Optional.of(userInfo.orElseThrow().getSubject().toString());
}
catch (ExecutionException e) {
catch (RuntimeException e) {
log.error(e, "Could not get userInfo");
}
}
Expand All @@ -216,10 +214,10 @@ public TrinoRequestUserProvider(RequestAnalyzerConfig config)
userField = config.getTokenUserField();
if (config.getOauthTokenInfoUrl() != null) {
oauthUserInfoUrl = Optional.of(URI.create(config.getOauthTokenInfoUrl()));
userInfoCache = Optional.of(CacheBuilder.newBuilder()
userInfoCache = Optional.of(Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build(CacheLoader.from(this::getUserInfo)));
.build(this::getUserInfo));
}
else {
oauthUserInfoUrl = Optional.empty();
Expand Down