From b66e1db32cac7c050907340539f64cf27add8f28 Mon Sep 17 00:00:00 2001 From: yangshangqing95 Date: Fri, 12 Sep 2025 14:24:19 -0400 Subject: [PATCH] Support configurable ExchangeManager Add support for specifying exchange manager config file location in config. --- .../trino/exchange/ExchangeManagerConfig.java | 37 ++++++++++++++ .../trino/exchange/ExchangeManagerModule.java | 3 ++ .../exchange/ExchangeManagerRegistry.java | 13 +++-- .../server/testing/TestingTrinoServer.java | 2 + .../java/io/trino/testing/PlanTester.java | 3 +- .../io/trino/exchange/TestExchangeConfig.java | 50 +++++++++++++++++++ .../exchange/TestLazyExchangeDataSource.java | 2 +- .../execution/BaseTestSqlTaskManager.java | 3 +- .../execution/MockRemoteTaskFactory.java | 3 +- .../io/trino/execution/TaskTestUtils.java | 3 +- .../TestMemoryRevokingScheduler.java | 3 +- .../java/io/trino/execution/TestSqlTask.java | 3 +- ...estSqlTaskManagerRaceWithCatalogPrune.java | 3 +- .../TestTaskExecutorStuckSplits.java | 3 +- ...TestDeduplicatingDirectExchangeBuffer.java | 7 +-- .../operator/TestDirectExchangeClient.java | 5 +- .../trino/operator/TestExchangeOperator.java | 3 +- .../io/trino/operator/TestMergeOperator.java | 3 +- .../sphinx/admin/fault-tolerant-execution.md | 5 ++ 19 files changed, 134 insertions(+), 20 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerConfig.java create mode 100644 core/trino-main/src/test/java/io/trino/exchange/TestExchangeConfig.java diff --git a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerConfig.java b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerConfig.java new file mode 100644 index 000000000000..b8b13f60afd5 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerConfig.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.exchange; + +import io.airlift.configuration.Config; +import io.airlift.configuration.validation.FileExists; + +import java.io.File; +import java.util.Optional; + +public class ExchangeManagerConfig +{ + private Optional exchangeManagerConfigFile = Optional.empty(); + + public Optional<@FileExists File> getExchangeManagerConfigFile() + { + return exchangeManagerConfigFile; + } + + @Config("exchange-manager.config-file") + public ExchangeManagerConfig setExchangeManagerConfigFile(File exchangeManagerConfigFile) + { + this.exchangeManagerConfigFile = Optional.ofNullable(exchangeManagerConfigFile); + return this; + } +} diff --git a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java index e05fe95d2de9..79bcdd537e83 100644 --- a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java +++ b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerModule.java @@ -17,12 +17,15 @@ import com.google.inject.Module; import com.google.inject.Scopes; +import static io.airlift.configuration.ConfigBinder.configBinder; + public class ExchangeManagerModule implements Module { @Override public void configure(Binder binder) { + configBinder(binder).bindConfig(ExchangeManagerConfig.class); binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON); } } diff --git a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java index ed66b2149930..241978fe74cc 100644 --- a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java +++ b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java @@ -29,6 +29,7 @@ import java.io.UncheckedIOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import static com.google.common.base.Preconditions.checkArgument; @@ -52,16 +53,19 @@ public class ExchangeManagerRegistry private volatile ExchangeManager exchangeManager; private final SecretsResolver secretsResolver; + private final Optional configFile; @Inject public ExchangeManagerRegistry( OpenTelemetry openTelemetry, Tracer tracer, - SecretsResolver secretsResolver) + SecretsResolver secretsResolver, + ExchangeManagerConfig config) { this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); this.tracer = requireNonNull(tracer, "tracer is null"); this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null"); + this.configFile = config.getExchangeManagerConfigFile(); } public void addExchangeManagerFactory(ExchangeManagerFactory factory) @@ -74,13 +78,14 @@ public void addExchangeManagerFactory(ExchangeManagerFactory factory) public void loadExchangeManager() { - if (!CONFIG_FILE.exists()) { + File configFile = this.configFile.orElse(CONFIG_FILE); + if (!configFile.exists()) { return; } - Map properties = loadProperties(CONFIG_FILE); + Map properties = loadProperties(configFile); String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY); - checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY); + checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", configFile, EXCHANGE_MANAGER_NAME_PROPERTY); loadExchangeManager(name, properties); } diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java index e9900035f91b..fdbdb366fab0 100644 --- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java +++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java @@ -50,6 +50,7 @@ import io.trino.dispatcher.DispatchManager; import io.trino.eventlistener.EventListenerConfig; import io.trino.eventlistener.EventListenerManager; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.FailureInjector; import io.trino.execution.FailureInjector.InjectedFailureType; @@ -312,6 +313,7 @@ private TestingTrinoServer( .addBinding() .to(TracingServletFilter.class); binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON); + binder.bind(ExchangeManagerConfig.class).in(Scopes.SINGLETON); binder.bind(AccessControlConfig.class).in(Scopes.SINGLETON); binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON); binder.bind(TestingGroupProvider.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java index 99d6f8266a14..d07b00b448ea 100644 --- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java +++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java @@ -60,6 +60,7 @@ import io.trino.cost.TaskCountEstimator; import io.trino.eventlistener.EventListenerConfig; import io.trino.eventlistener.EventListenerManager; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.DynamicFilterConfig; import io.trino.execution.NodeTaskMap; @@ -473,7 +474,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats) ImmutableSet.of(), ImmutableSet.of(new ExcludeColumnsFunction())); - exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver); + exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig()); spoolingManagerRegistry = new SpoolingManagerRegistry( new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false), new ServerConfig(), diff --git a/core/trino-main/src/test/java/io/trino/exchange/TestExchangeConfig.java b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeConfig.java new file mode 100644 index 000000000000..70f83fe0e45b --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/exchange/TestExchangeConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.exchange; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +final class TestExchangeConfig +{ + @Test + void testDefaults() + { + assertRecordedDefaults(recordDefaults(ExchangeManagerConfig.class) + .setExchangeManagerConfigFile(null)); + } + + @Test + void testExplicitPropertyMappings() + throws IOException + { + Path exchangeConfig = Files.createTempFile(null, null); + + Map properties = ImmutableMap.of("exchange-manager.config-file", exchangeConfig.toString()); + + ExchangeManagerConfig expected = new ExchangeManagerConfig() + .setExchangeManagerConfigFile(exchangeConfig.toFile()); + + assertFullMapping(properties, expected); + } +} diff --git a/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java b/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java index 39a8823e7c5b..cfd8fa3099c8 100644 --- a/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java +++ b/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java @@ -45,7 +45,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase() throw new UnsupportedOperationException(); }, RetryPolicy.NONE, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())))) { + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()))) { ListenableFuture first = source.isBlocked(); ListenableFuture second = source.isBlocked(); assertThat(first) diff --git a/core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java index 6326556b3519..dc7fc5208bc9 100644 --- a/core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/BaseTestSqlTaskManager.java @@ -29,6 +29,7 @@ import io.trino.connector.CatalogHandle; import io.trino.connector.ConnectorServices; import io.trino.connector.ConnectorServicesProvider; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.buffer.BufferResult; import io.trino.execution.buffer.BufferState; @@ -337,7 +338,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig, new NodeSpillConfig(), new TestingGcMonitor(), noopTracer(), - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()))); + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig())); } private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Set splits, OutputBuffers outputBuffers) diff --git a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java index 017cccf5bbc6..c5f40c3449ea 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java @@ -31,6 +31,7 @@ import io.opentelemetry.api.trace.Span; import io.trino.Session; import io.trino.cost.StatsAndCosts; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.NodeTaskMap.PartitionedSplitCountTracker; import io.trino.execution.buffer.LazyOutputBuffer; @@ -235,7 +236,7 @@ public MockRemoteTask( DataSize.ofBytes(1), () -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"), () -> {}, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()))); + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig())); this.fragment = requireNonNull(fragment, "fragment is null"); this.nodeId = requireNonNull(nodeId, "nodeId is null"); diff --git a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java index 481f408c4bf3..be09531acc26 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java @@ -22,6 +22,7 @@ import io.trino.connector.CatalogHandle; import io.trino.connector.CatalogServiceProvider; import io.trino.cost.StatsAndCosts; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.BaseTestSqlTaskManager.MockDirectExchangeClientSupplier; import io.trino.execution.buffer.OutputBuffers; @@ -180,7 +181,7 @@ public static LocalExecutionPlanner createTestingPlanner() blockTypeOperators, PLANNER_CONTEXT.getTypeOperators(), new TableExecuteContextManager(), - new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new NodeVersion("test"), new CompilerConfig()); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java index 883f337e857a..0b3f81b5e4f2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java @@ -25,6 +25,7 @@ import io.airlift.tracing.Tracing; import io.airlift.units.DataSize; import io.opentelemetry.api.OpenTelemetry; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.buffer.PipelinedOutputBuffers; import io.trino.execution.executor.TaskExecutor; @@ -282,7 +283,7 @@ private SqlTask newSqlTask(QueryId queryId) sqlTask -> {}, DataSize.of(32, MEGABYTE), DataSize.of(200, MEGABYTE), - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new CounterStat()); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java index 793c0f3f36ab..0fb3248bf145 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java @@ -27,6 +27,7 @@ import io.airlift.units.Duration; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.buffer.BufferResult; import io.trino.execution.buffer.BufferState; @@ -454,7 +455,7 @@ private SqlTask createInitialTask() sqlTask -> {}, DataSize.of(32, MEGABYTE), DataSize.of(200, MEGABYTE), - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new CounterStat()); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java index 012c4798eb96..952582a5904d 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java @@ -36,6 +36,7 @@ import io.trino.connector.MockConnectorFactory; import io.trino.connector.TestingLocalCatalogPruneTask; import io.trino.connector.WorkerDynamicCatalogManager; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.buffer.PipelinedOutputBuffers; import io.trino.execution.executor.RunningSplitInfo; @@ -270,7 +271,7 @@ private static SqlTaskManager getWorkerTaskManagerWithConnectorServiceProvider(C new NodeSpillConfig(), new TestingGcMonitor(), noopTracer(), - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), ignore -> true); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskExecutorStuckSplits.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskExecutorStuckSplits.java index 585bb9c33fdd..65d3e752926d 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskExecutorStuckSplits.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskExecutorStuckSplits.java @@ -30,6 +30,7 @@ import io.trino.connector.CatalogHandle; import io.trino.connector.ConnectorServices; import io.trino.connector.ConnectorServicesProvider; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.executor.TaskExecutor; import io.trino.execution.executor.TaskHandle; @@ -138,7 +139,7 @@ private SqlTaskManager createSqlTaskManager( new NodeSpillConfig(), new TestingGcMonitor(), noopTracer(), - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), stuckSplitStackTracePredicate); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java index f0fa2746fa61..f18e8edb9b40 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java @@ -26,6 +26,7 @@ import io.airlift.units.DataSize; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -68,7 +69,7 @@ public class TestDeduplicatingDirectExchangeBuffer @BeforeAll public void beforeClass() { - exchangeManagerRegistry = new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())); + exchangeManagerRegistry = new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()); exchangeManagerRegistry.addExchangeManagerFactory(new FileSystemExchangeManagerFactory()); exchangeManagerRegistry.loadExchangeManager("filesystem", ImmutableMap.of( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); @@ -449,7 +450,7 @@ public void testExchangeManagerNotConfigured() directExecutor(), DataSize.of(100, BYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new QueryId("query"), Span.getInvalid(), createRandomExchangeId())) { @@ -473,7 +474,7 @@ public void testExchangeManagerNotConfigured() directExecutor(), DataSize.of(100, BYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new QueryId("query"), Span.getInvalid(), createRandomExchangeId())) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java index bd818f42136c..902154b74010 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java @@ -34,6 +34,7 @@ import io.opentelemetry.api.trace.Span; import io.trino.FeaturesConfig.DataIntegrityVerification; import io.trino.block.BlockAssertions; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -492,7 +493,7 @@ public void testDeduplicationTaskFailure() scheduler, DataSize.of(1, Unit.MEGABYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new QueryId("query"), Span.getInvalid(), createRandomExchangeId()); @@ -553,7 +554,7 @@ public void testDeduplication() scheduler, DataSize.of(1, Unit.KILOBYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())), + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()), new QueryId("query"), Span.getInvalid(), createRandomExchangeId()), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java index f0b1f1d24796..b875542800b7 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java @@ -27,6 +27,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.trino.FeaturesConfig.DataIntegrityVerification; import io.trino.exchange.DirectExchangeInput; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -270,7 +271,7 @@ private SourceOperator createExchangeOperator() directExchangeClientSupplier, SERDE_FACTORY, RetryPolicy.NONE, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()))); + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig())); DriverContext driverContext = createTaskContext(scheduler, scheduledExecutor, TEST_SESSION) .addPipelineContext(0, true, true, false) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java index 5a5d8a922ea2..401caa0422f7 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java @@ -27,6 +27,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.trino.FeaturesConfig; import io.trino.exchange.DirectExchangeInput; +import io.trino.exchange.ExchangeManagerConfig; import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; @@ -101,7 +102,7 @@ public void setUp() httpClient, new HttpClientConfig(), executor, - new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()))); + new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig())); orderingCompiler = new OrderingCompiler(new TypeOperators()); } diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.md b/docs/src/main/sphinx/admin/fault-tolerant-execution.md index 4f80e97763e5..0a596c2dab5f 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.md +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.md @@ -392,6 +392,11 @@ all worker nodes. In this file, set the `exchange-manager.name` configuration property to `filesystem` or `hdfs`, and set additional configuration properties as needed for your storage solution. +You can also specify the location of the exchange manager configuration file +in `config.properties` with the `exchange-manager.config-file` property. +When this property is set, Trino loads the exchange manager configuration +from the specified path instead of the default `etc/exchange-manager.properties`. + The following table lists the available configuration properties for `exchange-manager.properties`, their default values, and which file systems the property may be configured for: