Skip to content

Commit a073be3

Browse files
Support configurable ExchangeManager
1 parent 59f6866 commit a073be3

18 files changed

+126
-21
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import io.airlift.configuration.Config;
17+
18+
import java.io.File;
19+
20+
public class ExchangeManagerConfig
21+
{
22+
private File exchangeManagerConfigFile = new File("etc/exchange-manager.properties");
23+
24+
public File getExchangeManagerConfigFile()
25+
{
26+
return exchangeManagerConfigFile;
27+
}
28+
29+
@Config("exchange-manager.config-file")
30+
public ExchangeManagerConfig setExchangeManagerConfigFile(String exchangeManagerConfigFile)
31+
{
32+
this.exchangeManagerConfigFile = new File(exchangeManagerConfigFile);
33+
return this;
34+
}
35+
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import com.google.inject.Module;
1818
import com.google.inject.Scopes;
1919

20+
import static io.airlift.configuration.ConfigBinder.configBinder;
21+
2022
public class ExchangeManagerModule
2123
implements Module
2224
{
2325
@Override
2426
public void configure(Binder binder)
2527
{
28+
configBinder(binder).bindConfig(ExchangeManagerConfig.class);
2629
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
2730
}
2831
}

core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class ExchangeManagerRegistry
4343
{
4444
private static final Logger log = Logger.get(ExchangeManagerRegistry.class);
4545

46-
private static final File CONFIG_FILE = new File("etc/exchange-manager.properties");
4746
private static final String EXCHANGE_MANAGER_NAME_PROPERTY = "exchange-manager.name";
4847

4948
private final OpenTelemetry openTelemetry;
@@ -52,16 +51,19 @@ public class ExchangeManagerRegistry
5251

5352
private volatile ExchangeManager exchangeManager;
5453
private final SecretsResolver secretsResolver;
54+
private final ExchangeManagerConfig config;
5555

5656
@Inject
5757
public ExchangeManagerRegistry(
5858
OpenTelemetry openTelemetry,
5959
Tracer tracer,
60-
SecretsResolver secretsResolver)
60+
SecretsResolver secretsResolver,
61+
ExchangeManagerConfig config)
6162
{
6263
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
6364
this.tracer = requireNonNull(tracer, "tracer is null");
6465
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
66+
this.config = requireNonNull(config, "config is null");
6567
}
6668

6769
public void addExchangeManagerFactory(ExchangeManagerFactory factory)
@@ -74,13 +76,14 @@ public void addExchangeManagerFactory(ExchangeManagerFactory factory)
7476

7577
public void loadExchangeManager()
7678
{
77-
if (!CONFIG_FILE.exists()) {
79+
File configFile = config.getExchangeManagerConfigFile();
80+
if (!configFile.exists()) {
7881
return;
7982
}
8083

81-
Map<String, String> properties = loadProperties(CONFIG_FILE);
84+
Map<String, String> properties = loadProperties(configFile);
8285
String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
83-
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);
86+
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", configFile, EXCHANGE_MANAGER_NAME_PROPERTY);
8487

8588
loadExchangeManager(name, properties);
8689
}

core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.trino.dispatcher.DispatchManager;
5151
import io.trino.eventlistener.EventListenerConfig;
5252
import io.trino.eventlistener.EventListenerManager;
53+
import io.trino.exchange.ExchangeManagerConfig;
5354
import io.trino.exchange.ExchangeManagerRegistry;
5455
import io.trino.execution.FailureInjector;
5556
import io.trino.execution.FailureInjector.InjectedFailureType;
@@ -312,6 +313,7 @@ private TestingTrinoServer(
312313
.addBinding()
313314
.to(TracingServletFilter.class);
314315
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
316+
binder.bind(ExchangeManagerConfig.class).in(Scopes.SINGLETON);
315317
binder.bind(AccessControlConfig.class).in(Scopes.SINGLETON);
316318
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
317319
binder.bind(TestingGroupProvider.class).in(Scopes.SINGLETON);

core/trino-main/src/main/java/io/trino/testing/PlanTester.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.trino.cost.TaskCountEstimator;
6161
import io.trino.eventlistener.EventListenerConfig;
6262
import io.trino.eventlistener.EventListenerManager;
63+
import io.trino.exchange.ExchangeManagerConfig;
6364
import io.trino.exchange.ExchangeManagerRegistry;
6465
import io.trino.execution.DynamicFilterConfig;
6566
import io.trino.execution.NodeTaskMap;
@@ -473,7 +474,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
473474
ImmutableSet.of(),
474475
ImmutableSet.of(new ExcludeColumnsFunction()));
475476

476-
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver);
477+
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig());
477478
spoolingManagerRegistry = new SpoolingManagerRegistry(
478479
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
479480
new ServerConfig(),
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.exchange;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import org.junit.jupiter.api.Test;
18+
19+
import java.io.IOException;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.Map;
23+
24+
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
25+
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
26+
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
27+
28+
public class TestExchangeConfig
29+
{
30+
@Test
31+
public void testDefaults()
32+
{
33+
assertRecordedDefaults(recordDefaults(ExchangeManagerConfig.class)
34+
.setExchangeManagerConfigFile("etc/exchange-manager.properties"));
35+
}
36+
37+
@Test
38+
public void testExplicitPropertyMappings()
39+
throws IOException
40+
{
41+
Path exchangeConfig = Files.createTempFile(null, null);
42+
43+
Map<String, String> properties = ImmutableMap.of("exchange-manager.config-file", exchangeConfig.toString());
44+
45+
ExchangeManagerConfig expected = new ExchangeManagerConfig()
46+
.setExchangeManagerConfigFile(exchangeConfig.toFile().getPath());
47+
48+
assertFullMapping(properties, expected);
49+
}
50+
}

core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase()
4545
throw new UnsupportedOperationException();
4646
},
4747
RetryPolicy.NONE,
48-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())))) {
48+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()))) {
4949
ListenableFuture<Void> first = source.isBlocked();
5050
ListenableFuture<Void> second = source.isBlocked();
5151
assertThat(first)

core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.trino.connector.CatalogHandle;
3030
import io.trino.connector.ConnectorServices;
3131
import io.trino.connector.ConnectorServicesProvider;
32+
import io.trino.exchange.ExchangeManagerConfig;
3233
import io.trino.exchange.ExchangeManagerRegistry;
3334
import io.trino.execution.buffer.BufferResult;
3435
import io.trino.execution.buffer.BufferState;
@@ -337,7 +338,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig,
337338
new NodeSpillConfig(),
338339
new TestingGcMonitor(),
339340
noopTracer(),
340-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
341+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
341342
}
342343

343344
private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Set<ScheduledSplit> splits, OutputBuffers outputBuffers)

core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.opentelemetry.api.trace.Span;
3232
import io.trino.Session;
3333
import io.trino.cost.StatsAndCosts;
34+
import io.trino.exchange.ExchangeManagerConfig;
3435
import io.trino.exchange.ExchangeManagerRegistry;
3536
import io.trino.execution.NodeTaskMap.PartitionedSplitCountTracker;
3637
import io.trino.execution.buffer.LazyOutputBuffer;
@@ -235,7 +236,7 @@ public MockRemoteTask(
235236
DataSize.ofBytes(1),
236237
() -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
237238
() -> {},
238-
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
239+
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
239240

240241
this.fragment = requireNonNull(fragment, "fragment is null");
241242
this.nodeId = requireNonNull(nodeId, "nodeId is null");

core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.connector.CatalogHandle;
2323
import io.trino.connector.CatalogServiceProvider;
2424
import io.trino.cost.StatsAndCosts;
25+
import io.trino.exchange.ExchangeManagerConfig;
2526
import io.trino.exchange.ExchangeManagerRegistry;
2627
import io.trino.execution.BaseTestSqlTaskManager.MockDirectExchangeClientSupplier;
2728
import io.trino.execution.buffer.OutputBuffers;
@@ -180,7 +181,7 @@ public static LocalExecutionPlanner createTestingPlanner()
180181
blockTypeOperators,
181182
PLANNER_CONTEXT.getTypeOperators(),
182183
new TableExecuteContextManager(),
183-
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())),
184+
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
184185
new NodeVersion("test"),
185186
new CompilerConfig());
186187
}

0 commit comments

Comments
 (0)