Skip to content

Commit aa59d0d

Browse files
Support configurable ExchangeManager
Add support for specifying exchange manager config file location in config.
1 parent 59f6866 commit aa59d0d

19 files changed

+134
-20
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import io.airlift.configuration.Config;
17+
import org.assertj.core.util.Strings;
18+
19+
import java.io.File;
20+
21+
public class ExchangeManagerConfig
22+
{
23+
private File exchangeManagerConfigFile;
24+
25+
public File getExchangeManagerConfigFile()
26+
{
27+
return exchangeManagerConfigFile;
28+
}
29+
30+
@Config("exchange-manager.config-file")
31+
public ExchangeManagerConfig setExchangeManagerConfigFile(String exchangeManagerConfigFile)
32+
{
33+
if (!Strings.isNullOrEmpty(exchangeManagerConfigFile)) {
34+
this.exchangeManagerConfigFile = new File(exchangeManagerConfigFile);
35+
}
36+
return this;
37+
}
38+
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import com.google.inject.Module;
1818
import com.google.inject.Scopes;
1919

20+
import static io.airlift.configuration.ConfigBinder.configBinder;
21+
2022
public class ExchangeManagerModule
2123
implements Module
2224
{
2325
@Override
2426
public void configure(Binder binder)
2527
{
28+
configBinder(binder).bindConfig(ExchangeManagerConfig.class);
2629
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
2730
}
2831
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,19 @@ public class ExchangeManagerRegistry
5252

5353
private volatile ExchangeManager exchangeManager;
5454
private final SecretsResolver secretsResolver;
55+
private final File configFile;
5556

5657
@Inject
5758
public ExchangeManagerRegistry(
5859
OpenTelemetry openTelemetry,
5960
Tracer tracer,
60-
SecretsResolver secretsResolver)
61+
SecretsResolver secretsResolver,
62+
ExchangeManagerConfig config)
6163
{
6264
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
6365
this.tracer = requireNonNull(tracer, "tracer is null");
6466
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
67+
this.configFile = config.getExchangeManagerConfigFile();
6568
}
6669

6770
public void addExchangeManagerFactory(ExchangeManagerFactory factory)
@@ -74,13 +77,14 @@ public void addExchangeManagerFactory(ExchangeManagerFactory factory)
7477

7578
public void loadExchangeManager()
7679
{
77-
if (!CONFIG_FILE.exists()) {
80+
File configFile = (this.configFile != null) ? this.configFile : CONFIG_FILE;
81+
if (!configFile.exists()) {
7882
return;
7983
}
8084

81-
Map<String, String> properties = loadProperties(CONFIG_FILE);
85+
Map<String, String> properties = loadProperties(configFile);
8286
String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
83-
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);
87+
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", configFile, EXCHANGE_MANAGER_NAME_PROPERTY);
8488

8589
loadExchangeManager(name, properties);
8690
}

core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.trino.dispatcher.DispatchManager;
5151
import io.trino.eventlistener.EventListenerConfig;
5252
import io.trino.eventlistener.EventListenerManager;
53+
import io.trino.exchange.ExchangeManagerConfig;
5354
import io.trino.exchange.ExchangeManagerRegistry;
5455
import io.trino.execution.FailureInjector;
5556
import io.trino.execution.FailureInjector.InjectedFailureType;
@@ -312,6 +313,7 @@ private TestingTrinoServer(
312313
.addBinding()
313314
.to(TracingServletFilter.class);
314315
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
316+
binder.bind(ExchangeManagerConfig.class).in(Scopes.SINGLETON);
315317
binder.bind(AccessControlConfig.class).in(Scopes.SINGLETON);
316318
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
317319
binder.bind(TestingGroupProvider.class).in(Scopes.SINGLETON);

core/trino-main/src/main/java/io/trino/testing/PlanTester.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.trino.cost.TaskCountEstimator;
6161
import io.trino.eventlistener.EventListenerConfig;
6262
import io.trino.eventlistener.EventListenerManager;
63+
import io.trino.exchange.ExchangeManagerConfig;
6364
import io.trino.exchange.ExchangeManagerRegistry;
6465
import io.trino.execution.DynamicFilterConfig;
6566
import io.trino.execution.NodeTaskMap;
@@ -473,7 +474,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
473474
ImmutableSet.of(),
474475
ImmutableSet.of(new ExcludeColumnsFunction()));
475476

476-
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver);
477+
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig());
477478
spoolingManagerRegistry = new SpoolingManagerRegistry(
478479
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
479480
new ServerConfig(),
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import org.junit.jupiter.api.Test;
18+
19+
import java.io.IOException;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.Map;
23+
24+
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
25+
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
26+
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
27+
28+
public class TestExchangeConfig
29+
{
30+
@Test
31+
public void testDefaults()
32+
{
33+
assertRecordedDefaults(recordDefaults(ExchangeManagerConfig.class)
34+
.setExchangeManagerConfigFile(null));
35+
}
36+
37+
@Test
38+
public void testExplicitPropertyMappings()
39+
throws IOException
40+
{
41+
Path exchangeConfig = Files.createTempFile(null, null);
42+
43+
Map<String, String> properties = ImmutableMap.of("exchange-manager.config-file", exchangeConfig.toString());
44+
45+
ExchangeManagerConfig expected = new ExchangeManagerConfig()
46+
.setExchangeManagerConfigFile(exchangeConfig.toFile().getPath());
47+
48+
assertFullMapping(properties, expected);
49+
}
50+
}

core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase()
4545
throw new UnsupportedOperationException();
4646
},
4747
RetryPolicy.NONE,
48-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())))) {
48+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()))) {
4949
ListenableFuture<Void> first = source.isBlocked();
5050
ListenableFuture<Void> second = source.isBlocked();
5151
assertThat(first)

core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.trino.connector.CatalogHandle;
3030
import io.trino.connector.ConnectorServices;
3131
import io.trino.connector.ConnectorServicesProvider;
32+
import io.trino.exchange.ExchangeManagerConfig;
3233
import io.trino.exchange.ExchangeManagerRegistry;
3334
import io.trino.execution.buffer.BufferResult;
3435
import io.trino.execution.buffer.BufferState;
@@ -337,7 +338,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig,
337338
new NodeSpillConfig(),
338339
new TestingGcMonitor(),
339340
noopTracer(),
340-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
341+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
341342
}
342343

343344
private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Set<ScheduledSplit> splits, OutputBuffers outputBuffers)

core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.opentelemetry.api.trace.Span;
3232
import io.trino.Session;
3333
import io.trino.cost.StatsAndCosts;
34+
import io.trino.exchange.ExchangeManagerConfig;
3435
import io.trino.exchange.ExchangeManagerRegistry;
3536
import io.trino.execution.NodeTaskMap.PartitionedSplitCountTracker;
3637
import io.trino.execution.buffer.LazyOutputBuffer;
@@ -235,7 +236,7 @@ public MockRemoteTask(
235236
DataSize.ofBytes(1),
236237
() -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
237238
() -> {},
238-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
239+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
239240

240241
this.fragment = requireNonNull(fragment, "fragment is null");
241242
this.nodeId = requireNonNull(nodeId, "nodeId is null");

core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.connector.CatalogHandle;
2323
import io.trino.connector.CatalogServiceProvider;
2424
import io.trino.cost.StatsAndCosts;
25+
import io.trino.exchange.ExchangeManagerConfig;
2526
import io.trino.exchange.ExchangeManagerRegistry;
2627
import io.trino.execution.BaseTestSqlTaskManager.MockDirectExchangeClientSupplier;
2728
import io.trino.execution.buffer.OutputBuffers;
@@ -180,7 +181,7 @@ public static LocalExecutionPlanner createTestingPlanner()
180181
blockTypeOperators,
181182
PLANNER_CONTEXT.getTypeOperators(),
182183
new TableExecuteContextManager(),
183-
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())),
184+
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
184185
new NodeVersion("test"),
185186
new CompilerConfig());
186187
}

0 commit comments

Comments
 (0)