Skip to content

Commit bff0aae

Browse files
committed
Extract Streamable interface
1 parent 1b5ff22 commit bff0aae

File tree

3 files changed

+29
-8
lines changed

3 files changed

+29
-8
lines changed

core/trino-main/src/main/java/io/trino/memory/ClusterMemoryManager.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.trino.spi.QueryId;
4343
import io.trino.spi.TrinoException;
4444
import io.trino.spi.memory.MemoryPoolInfo;
45+
import io.trino.util.Streamable;
4546
import jakarta.annotation.PreDestroy;
4647
import org.weakref.jmx.JmxException;
4748
import org.weakref.jmx.MBeanExporter;
@@ -60,7 +61,6 @@
6061
import java.util.concurrent.atomic.AtomicLong;
6162
import java.util.function.Consumer;
6263
import java.util.function.Function;
63-
import java.util.function.Supplier;
6464
import java.util.stream.Stream;
6565

6666
import static com.google.common.base.Preconditions.checkState;
@@ -174,7 +174,7 @@ public synchronized void addChangeListener(Consumer<MemoryPoolInfo> listener)
174174
changeListeners.add(listener);
175175
}
176176

177-
public synchronized void process(Supplier<Stream<QueryExecution>> allQueries, Function<QueryId, Optional<QueryExecution>> executionInfoSupplier)
177+
public synchronized void process(Streamable<QueryExecution> allQueries, Function<QueryId, Optional<QueryExecution>> executionInfoSupplier)
178178
{
179179
// TODO revocable memory reservations can also leak and may need to be detected in the future
180180
// We are only concerned about the leaks in the memory pool.
@@ -186,7 +186,7 @@ public synchronized void process(Supplier<Stream<QueryExecution>> allQueries, Fu
186186
long totalUserMemoryBytes = 0L;
187187
long totalMemoryBytes = 0L;
188188
int queriesCount = 0;
189-
Iterator<QueryExecution> iterator = allQueries.get()
189+
Iterator<QueryExecution> iterator = allQueries.stream()
190190
.filter(query -> query.getState() == QueryState.RUNNING)
191191
.iterator();
192192

@@ -233,7 +233,7 @@ public synchronized void process(Supplier<Stream<QueryExecution>> allQueries, Fu
233233

234234
if (!lowMemoryKillers.isEmpty() && outOfMemory && !queryKilled) {
235235
if (isLastKillTargetGone()) {
236-
callOomKiller(allQueries.get(), executionInfoSupplier);
236+
callOomKiller(allQueries.stream(), executionInfoSupplier);
237237
}
238238
else {
239239
log.debug("Last killed target is still not gone: %s", lastKillTarget);

core/trino-main/src/main/java/io/trino/security/AccessControlUtil.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
*/
1414
package io.trino.security;
1515

16-
import com.google.common.base.Supplier;
1716
import io.trino.SessionRepresentation;
1817
import io.trino.server.BasicQueryInfo;
1918
import io.trino.spi.security.Identity;
19+
import io.trino.util.Streamable;
2020

2121
import java.util.Collection;
2222
import java.util.stream.Stream;
@@ -35,10 +35,10 @@ public static void checkCanViewQueryOwnedBy(Identity identity, Identity queryOwn
3535
accessControl.checkCanViewQueryOwnedBy(identity, queryOwner);
3636
}
3737

38-
public static Stream<BasicQueryInfo> filterQueries(Supplier<Stream<BasicQueryInfo>> queries, Identity identity, AccessControl accessControl)
38+
public static Stream<BasicQueryInfo> filterQueries(Streamable<BasicQueryInfo> queries, Identity identity, AccessControl accessControl)
3939
{
4040
Collection<Identity> owners = queries
41-
.get()
41+
.stream()
4242
.map(BasicQueryInfo::getSession)
4343
.map(SessionRepresentation::toIdentity)
4444
.filter(owner -> !owner.getUser().equals(identity.getUser()))
@@ -47,7 +47,7 @@ public static Stream<BasicQueryInfo> filterQueries(Supplier<Stream<BasicQueryInf
4747
Collection<Identity> allowedOwners = accessControl.filterQueriesOwnedBy(identity, owners);
4848

4949
return queries
50-
.get()
50+
.stream()
5151
.filter(queryInfo -> {
5252
Identity queryIdentity = queryInfo.getSession().toIdentity();
5353
return queryIdentity.getUser().equals(identity.getUser()) || allowedOwners.contains(queryIdentity);
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.util;
15+
16+
import java.util.stream.Stream;
17+
18+
public interface Streamable<T>
19+
{
20+
Stream<T> stream();
21+
}

0 commit comments

Comments
 (0)