Skip to content

Commit c59bb7d

Browse files
andythsuvishalya
authored andcommitted
Show RoutingGroup info in query history
1 parent 01e62e9 commit c59bb7d

25 files changed

+213
-91
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/handler/ProxyUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.io.InputStreamReader;
24+
import java.net.URI;
2425
import java.util.List;
2526
import java.util.Optional;
2627
import java.util.regex.Matcher;
@@ -118,8 +119,8 @@ else if (path.startsWith(TRINO_UI_PATH)) {
118119
return Optional.empty();
119120
}
120121

121-
public static String buildUriWithNewBackend(String backendHost, HttpServletRequest request)
122+
public static URI buildUriWithNewCluster(String backendHost, HttpServletRequest request)
122123
{
123-
return backendHost + request.getRequestURI() + (request.getQueryString() != null ? "?" + request.getQueryString() : "");
124+
return URI.create(backendHost + request.getRequestURI() + (request.getQueryString() != null ? "?" + request.getQueryString() : ""));
124125
}
125126
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.handler;
15+
16+
import java.net.URI;
17+
18+
public record RoutingDestination(String routingGroup, String clusterHost, URI clusterUri) {}

gateway-ha/src/main/java/io/trino/gateway/ha/handler/RoutingTargetHandler.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static io.trino.gateway.ha.handler.HttpUtils.V1_INFO_PATH;
3737
import static io.trino.gateway.ha.handler.HttpUtils.V1_NODE_PATH;
3838
import static io.trino.gateway.ha.handler.HttpUtils.V1_QUERY_PATH;
39-
import static io.trino.gateway.ha.handler.ProxyUtils.buildUriWithNewBackend;
39+
import static io.trino.gateway.ha.handler.ProxyUtils.buildUriWithNewCluster;
4040
import static io.trino.gateway.ha.handler.ProxyUtils.extractQueryIdIfPresent;
4141
import static java.util.Objects.requireNonNull;
4242

@@ -66,13 +66,27 @@ public RoutingTargetHandler(
6666
cookiesEnabled = GatewayCookieConfigurationPropertiesProvider.getInstance().isEnabled();
6767
}
6868

69-
public String getRoutingDestination(HttpServletRequest request)
69+
public RoutingDestination getRoutingDestination(HttpServletRequest request)
7070
{
71-
Optional<String> previousBackend = getPreviousBackend(request);
72-
String clusterHost = previousBackend.orElseGet(() -> getBackendFromRoutingGroup(request));
73-
logRewrite(clusterHost, request);
71+
Optional<String> queryId = extractQueryIdIfPresent(request, statementPaths, requestAnalyserClientsUseV2Format, requestAnalyserMaxBodySize);
72+
Optional<String> previousCluster = getPreviousCluster(queryId, request);
73+
RoutingDestination routingDestination = previousCluster.map(cluster -> {
74+
String routingGroup = queryId.map(routingManager::findRoutingGroupForQueryId)
75+
.orElse("adhoc");
76+
return new RoutingDestination(routingGroup, cluster, buildUriWithNewCluster(cluster, request));
77+
}).orElse(getClusterFromRoutingGroup(request));
78+
logRewrite(routingDestination.clusterHost(), request);
79+
return routingDestination;
80+
}
7481

75-
return buildUriWithNewBackend(clusterHost, request);
82+
private RoutingDestination getClusterFromRoutingGroup(HttpServletRequest request)
83+
{
84+
Optional<String> routingGroup = routingGroupSelector.findRoutingGroup(request);
85+
String user = request.getHeader(USER_HEADER);
86+
// This falls back on adhoc routing group if there is no cluster found for the routing group.
87+
String group = routingGroup.orElse("adhoc");
88+
String clusterHost = routingManager.provideClusterForRoutingGroup(group, user);
89+
return new RoutingDestination(group, clusterHost, buildUriWithNewCluster(clusterHost, request));
7690
}
7791

7892
public boolean isPathWhiteListed(String path)
@@ -87,20 +101,8 @@ public boolean isPathWhiteListed(String path)
87101
|| extraWhitelistPaths.stream().anyMatch(pattern -> pattern.matcher(path).matches());
88102
}
89103

90-
private String getBackendFromRoutingGroup(HttpServletRequest request)
91-
{
92-
String routingGroup = routingGroupSelector.findRoutingGroup(request);
93-
String user = request.getHeader(USER_HEADER);
94-
if (!isNullOrEmpty(routingGroup)) {
95-
// This falls back on adhoc backend if there is no cluster found for the routing group.
96-
return routingManager.provideBackendForRoutingGroup(routingGroup, user);
97-
}
98-
return routingManager.provideAdhocBackend(user);
99-
}
100-
101-
private Optional<String> getPreviousBackend(HttpServletRequest request)
104+
private Optional<String> getPreviousCluster(Optional<String> queryId, HttpServletRequest request)
102105
{
103-
Optional<String> queryId = extractQueryIdIfPresent(request, statementPaths, requestAnalyserClientsUseV2Format, requestAnalyserMaxBodySize);
104106
if (queryId.isPresent()) {
105107
return queryId.map(routingManager::findBackendForQueryId);
106108
}
@@ -129,6 +131,6 @@ private void logRewrite(String newBackend, HttpServletRequest request)
129131
request.getServerPort(),
130132
request.getRequestURI(),
131133
(request.getQueryString() != null ? "?" + request.getQueryString() : ""),
132-
buildUriWithNewBackend(newBackend, request));
134+
buildUriWithNewCluster(newBackend, request));
133135
}
134136
}

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/QueryHistory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ public record QueryHistory(
2424
@ColumnName("backend_url") String backendUrl,
2525
@ColumnName("user_name") @Nullable String userName,
2626
@ColumnName("source") @Nullable String source,
27-
@ColumnName("created") long created)
27+
@ColumnName("created") long created,
28+
@ColumnName("routing_group") String routingGroup)
2829
{
2930
public QueryHistory
3031
{

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/QueryHistoryDao.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ default List<QueryHistory> findRecentQueriesByUserName(String userName, boolean
7777
""")
7878
String findBackendUrlByQueryId(String queryId);
7979

80+
@SqlQuery("""
81+
SELECT routing_group FROM query_history
82+
WHERE query_id = :queryId
83+
""")
84+
String findRoutingGroupByQueryId(String queryId);
85+
8086
@SqlQuery("""
8187
SELECT * FROM query_history
8288
WHERE 1 = 1 <condition>
@@ -104,10 +110,10 @@ GROUP BY FLOOR(created / 1000 / 60), backend_url
104110
List<Map<String, Object>> findDistribution(long created);
105111

106112
@SqlUpdate("""
107-
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created)
108-
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created)
113+
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created, routing_group)
114+
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created, :routingGroup)
109115
""")
110-
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created);
116+
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created, String routingGroup);
111117

112118
@SqlUpdate("""
113119
DELETE FROM query_history

gateway-ha/src/main/java/io/trino/gateway/ha/router/ExternalRoutingGroupSelector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class ExternalRoutingGroupSelector
7878
}
7979

8080
@Override
81-
public String findRoutingGroup(HttpServletRequest servletRequest)
81+
public Optional<String> findRoutingGroup(HttpServletRequest servletRequest)
8282
{
8383
try {
8484
RoutingGroupExternalBody requestBody = createRequestBody(servletRequest);
@@ -100,13 +100,13 @@ public String findRoutingGroup(HttpServletRequest servletRequest)
100100
else if (response.errors() != null && !response.errors().isEmpty()) {
101101
throw new RuntimeException("Response with error: " + String.join(", ", response.errors()));
102102
}
103-
return response.routingGroup();
103+
return Optional.ofNullable(response.routingGroup());
104104
}
105105
catch (Exception e) {
106106
log.error(e, "Error occurred while retrieving routing group "
107107
+ "from external routing rules processing at " + uri);
108108
}
109-
return servletRequest.getHeader(ROUTING_GROUP_HEADER);
109+
return Optional.ofNullable(servletRequest.getHeader(ROUTING_GROUP_HEADER));
110110
}
111111

112112
private RoutingGroupExternalBody createRequestBody(HttpServletRequest request)

gateway-ha/src/main/java/io/trino/gateway/ha/router/FileBasedRoutingGroupSelector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashMap;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Optional;
3334

3435
import static com.google.common.base.Suppliers.memoizeWithExpiration;
3536
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -60,7 +61,7 @@ public FileBasedRoutingGroupSelector(String rulesPath, Duration rulesRefreshPeri
6061
}
6162

6263
@Override
63-
public String findRoutingGroup(HttpServletRequest request)
64+
public Optional<String> findRoutingGroup(HttpServletRequest request)
6465
{
6566
Map<String, String> result = new HashMap<>();
6667
Map<String, Object> state = new HashMap<>();
@@ -84,7 +85,7 @@ public String findRoutingGroup(HttpServletRequest request)
8485
rule.evaluateAction(result, data, state);
8586
}
8687
});
87-
return result.get(RESULTS_ROUTING_GROUP_KEY);
88+
return Optional.ofNullable(result.get(RESULTS_ROUTING_GROUP_KEY));
8889
}
8990

9091
public List<RoutingRule> readRulesFromPath(Path rulesPath)

gateway-ha/src/main/java/io/trino/gateway/ha/router/HaQueryHistoryManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public void submitQueryDetail(QueryDetail queryDetail)
6060
queryDetail.getBackendUrl(),
6161
queryDetail.getUser(),
6262
queryDetail.getSource(),
63-
queryDetail.getCaptureTime());
63+
queryDetail.getCaptureTime(),
64+
queryDetail.getRoutingGroup());
6465
}
6566

6667
@Override
@@ -87,6 +88,7 @@ private static List<QueryHistoryManager.QueryDetail> upcast(List<QueryHistory> q
8788
queryDetail.setBackendUrl(dao.backendUrl());
8889
queryDetail.setUser(dao.userName());
8990
queryDetail.setSource(dao.source());
91+
queryDetail.setRoutingGroup(dao.routingGroup());
9092
queryDetails.add(queryDetail);
9193
}
9294
return queryDetails;
@@ -98,6 +100,12 @@ public String getBackendForQueryId(String queryId)
98100
return dao.findBackendUrlByQueryId(queryId);
99101
}
100102

103+
@Override
104+
public String getRoutingGroupForQueryId(String queryId)
105+
{
106+
return dao.findRoutingGroupByQueryId(queryId);
107+
}
108+
101109
@Override
102110
public TableData<QueryDetail> findQueryHistory(QueryHistoryRequest query)
103111
{

gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,16 +222,16 @@ private synchronized Optional<String> getBackendForRoutingGroup(String routingGr
222222
}
223223

224224
@Override
225-
public String provideAdhocBackend(String user)
225+
public String provideAdhocCluster(String user)
226226
{
227227
return getBackendForRoutingGroup("adhoc", user).orElseThrow(() -> new RouterException("did not find any cluster for the adhoc routing group"));
228228
}
229229

230230
@Override
231-
public String provideBackendForRoutingGroup(String routingGroup, String user)
231+
public String provideClusterForRoutingGroup(String routingGroup, String user)
232232
{
233233
return getBackendForRoutingGroup(routingGroup, user)
234-
.orElseGet(() -> provideAdhocBackend(user));
234+
.orElse(provideAdhocCluster(user));
235235
}
236236

237237
@Override

gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryHistoryManager.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public interface QueryHistoryManager
3232

3333
String getBackendForQueryId(String queryId);
3434

35+
String getRoutingGroupForQueryId(String queryId);
36+
3537
TableData<QueryDetail> findQueryHistory(QueryHistoryRequest query);
3638

3739
List<DistributionResponse.LineChart> findDistribution(Long ts);
@@ -45,6 +47,7 @@ class QueryDetail
4547
private String source;
4648
private String backendUrl;
4749
private long captureTime;
50+
private String routingGroup;
4851

4952
public QueryDetail() {}
5053

@@ -125,6 +128,17 @@ public void setCaptureTime(long captureTime)
125128
this.captureTime = captureTime;
126129
}
127130

131+
@JsonProperty
132+
public String getRoutingGroup()
133+
{
134+
return this.routingGroup;
135+
}
136+
137+
public void setRoutingGroup(String routingGroup)
138+
{
139+
this.routingGroup = routingGroup;
140+
}
141+
128142
@Override
129143
public boolean equals(Object o)
130144
{
@@ -140,13 +154,14 @@ public boolean equals(Object o)
140154
Objects.equals(queryText, that.queryText) &&
141155
Objects.equals(user, that.user) &&
142156
Objects.equals(source, that.source) &&
143-
Objects.equals(backendUrl, that.backendUrl);
157+
Objects.equals(backendUrl, that.backendUrl) &&
158+
Objects.equals(routingGroup, that.routingGroup);
144159
}
145160

146161
@Override
147162
public int hashCode()
148163
{
149-
return Objects.hash(queryId, queryText, user, source, backendUrl, captureTime);
164+
return Objects.hash(queryId, queryText, user, source, backendUrl, captureTime, routingGroup);
150165
}
151166

152167
@Override
@@ -159,6 +174,7 @@ public String toString()
159174
.add("source", source)
160175
.add("backendUrl", backendUrl)
161176
.add("captureTime", captureTime)
177+
.add("routingGroup", routingGroup)
162178
.toString();
163179
}
164180
}

gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingGroupSelector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.trino.gateway.ha.config.RulesExternalConfiguration;
2020
import jakarta.servlet.http.HttpServletRequest;
2121

22+
import java.util.Optional;
23+
2224
/**
2325
* RoutingGroupSelector provides a way to match an HTTP request to a Gateway routing group.
2426
*/
@@ -32,7 +34,7 @@ public interface RoutingGroupSelector
3234
*/
3335
static RoutingGroupSelector byRoutingGroupHeader()
3436
{
35-
return request -> request.getHeader(ROUTING_GROUP_HEADER);
37+
return request -> Optional.ofNullable(request.getHeader(ROUTING_GROUP_HEADER));
3638
}
3739

3840
/**
@@ -60,5 +62,5 @@ static RoutingGroupSelector byRoutingExternal(
6062
* Given an HTTP request find a routing group to direct the request to. If a routing group cannot
6163
* be determined return null.
6264
*/
63-
String findRoutingGroup(HttpServletRequest request);
65+
Optional<String> findRoutingGroup(HttpServletRequest request);
6466
}

0 commit comments

Comments
 (0)