Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.exchange;

import io.airlift.configuration.Config;
import io.airlift.configuration.validation.FileExists;

import java.io.File;
import java.util.Optional;

public class ExchangeManagerConfig
{
private Optional<File> exchangeManagerConfigFile = Optional.empty();

public Optional<@FileExists File> getExchangeManagerConfigFile()
{
return exchangeManagerConfigFile;
}

@Config("exchange-manager.config-file")
public ExchangeManagerConfig setExchangeManagerConfigFile(File exchangeManagerConfigFile)
{
this.exchangeManagerConfigFile = Optional.ofNullable(exchangeManagerConfigFile);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import com.google.inject.Module;
import com.google.inject.Scopes;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class ExchangeManagerModule
implements Module
{
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(ExchangeManagerConfig.class);
binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -52,16 +53,19 @@ public class ExchangeManagerRegistry

private volatile ExchangeManager exchangeManager;
private final SecretsResolver secretsResolver;
private final Optional<File> configFile;

@Inject
public ExchangeManagerRegistry(
OpenTelemetry openTelemetry,
Tracer tracer,
SecretsResolver secretsResolver)
SecretsResolver secretsResolver,
ExchangeManagerConfig config)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.configFile = config.getExchangeManagerConfigFile();
}

public void addExchangeManagerFactory(ExchangeManagerFactory factory)
Expand All @@ -74,13 +78,14 @@ public void addExchangeManagerFactory(ExchangeManagerFactory factory)

public void loadExchangeManager()
{
if (!CONFIG_FILE.exists()) {
File configFile = this.configFile.orElse(CONFIG_FILE);
if (!configFile.exists()) {
return;
}

Map<String, String> properties = loadProperties(CONFIG_FILE);
Map<String, String> properties = loadProperties(configFile);
String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);
checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s", configFile, EXCHANGE_MANAGER_NAME_PROPERTY);

loadExchangeManager(name, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.trino.dispatcher.DispatchManager;
import io.trino.eventlistener.EventListenerConfig;
import io.trino.eventlistener.EventListenerManager;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.FailureInjector;
import io.trino.execution.FailureInjector.InjectedFailureType;
Expand Down Expand Up @@ -312,6 +313,7 @@ private TestingTrinoServer(
.addBinding()
.to(TracingServletFilter.class);
binder.bind(EventListenerConfig.class).in(Scopes.SINGLETON);
binder.bind(ExchangeManagerConfig.class).in(Scopes.SINGLETON);
binder.bind(AccessControlConfig.class).in(Scopes.SINGLETON);
binder.bind(TestingAccessControlManager.class).in(Scopes.SINGLETON);
binder.bind(TestingGroupProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.trino.cost.TaskCountEstimator;
import io.trino.eventlistener.EventListenerConfig;
import io.trino.eventlistener.EventListenerManager;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.DynamicFilterConfig;
import io.trino.execution.NodeTaskMap;
Expand Down Expand Up @@ -473,7 +474,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
ImmutableSet.of(),
ImmutableSet.of(new ExcludeColumnsFunction()));

exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver);
exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver, new ExchangeManagerConfig());
spoolingManagerRegistry = new SpoolingManagerRegistry(
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
new ServerConfig(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.exchange;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;

final class TestExchangeConfig
{
@Test
void testDefaults()
{
assertRecordedDefaults(recordDefaults(ExchangeManagerConfig.class)
.setExchangeManagerConfigFile(null));
}

@Test
void testExplicitPropertyMappings()
throws IOException
{
Path exchangeConfig = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.of("exchange-manager.config-file", exchangeConfig.toString());

ExchangeManagerConfig expected = new ExchangeManagerConfig()
.setExchangeManagerConfigFile(exchangeConfig.toFile());

assertFullMapping(properties, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase()
throw new UnsupportedOperationException();
},
RetryPolicy.NONE,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())))) {
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()))) {
ListenableFuture<Void> first = source.isBlocked();
ListenableFuture<Void> second = source.isBlocked();
assertThat(first)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.connector.CatalogHandle;
import io.trino.connector.ConnectorServices;
import io.trino.connector.ConnectorServicesProvider;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
Expand Down Expand Up @@ -337,7 +338,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig,
new NodeSpillConfig(),
new TestingGcMonitor(),
noopTracer(),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));
}

private TaskInfo createTask(SqlTaskManager sqlTaskManager, TaskId taskId, Set<ScheduledSplit> splits, OutputBuffers outputBuffers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.opentelemetry.api.trace.Span;
import io.trino.Session;
import io.trino.cost.StatsAndCosts;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.NodeTaskMap.PartitionedSplitCountTracker;
import io.trino.execution.buffer.LazyOutputBuffer;
Expand Down Expand Up @@ -235,7 +236,7 @@ public MockRemoteTask(
DataSize.ofBytes(1),
() -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"),
() -> {},
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));

this.fragment = requireNonNull(fragment, "fragment is null");
this.nodeId = requireNonNull(nodeId, "nodeId is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.connector.CatalogHandle;
import io.trino.connector.CatalogServiceProvider;
import io.trino.cost.StatsAndCosts;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.BaseTestSqlTaskManager.MockDirectExchangeClientSupplier;
import io.trino.execution.buffer.OutputBuffers;
Expand Down Expand Up @@ -180,7 +181,7 @@ public static LocalExecutionPlanner createTestingPlanner()
blockTypeOperators,
PLANNER_CONTEXT.getTypeOperators(),
new TableExecuteContextManager(),
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(noop(), noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new NodeVersion("test"),
new CompilerConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airlift.tracing.Tracing;
import io.airlift.units.DataSize;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.executor.TaskExecutor;
Expand Down Expand Up @@ -282,7 +283,7 @@ private SqlTask newSqlTask(QueryId queryId)
sqlTask -> {},
DataSize.of(32, MEGABYTE),
DataSize.of(200, MEGABYTE),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new CounterStat());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.BufferState;
Expand Down Expand Up @@ -454,7 +455,7 @@ private SqlTask createInitialTask()
sqlTask -> {},
DataSize.of(32, MEGABYTE),
DataSize.of(200, MEGABYTE),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new CounterStat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.TestingLocalCatalogPruneTask;
import io.trino.connector.WorkerDynamicCatalogManager;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.executor.RunningSplitInfo;
Expand Down Expand Up @@ -270,7 +271,7 @@ private static SqlTaskManager getWorkerTaskManagerWithConnectorServiceProvider(C
new NodeSpillConfig(),
new TestingGcMonitor(),
noopTracer(),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
ignore -> true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.connector.CatalogHandle;
import io.trino.connector.ConnectorServices;
import io.trino.connector.ConnectorServicesProvider;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskHandle;
Expand Down Expand Up @@ -138,7 +139,7 @@ private SqlTaskManager createSqlTaskManager(
new NodeSpillConfig(),
new TestingGcMonitor(),
noopTracer(),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
stuckSplitStackTracePredicate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airlift.units.DataSize;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class TestDeduplicatingDirectExchangeBuffer
@BeforeAll
public void beforeClass()
{
exchangeManagerRegistry = new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()));
exchangeManagerRegistry = new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig());
exchangeManagerRegistry.addExchangeManagerFactory(new FileSystemExchangeManagerFactory());
exchangeManagerRegistry.loadExchangeManager("filesystem", ImmutableMap.of(
"exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager"));
Expand Down Expand Up @@ -449,7 +450,7 @@ public void testExchangeManagerNotConfigured()
directExecutor(),
DataSize.of(100, BYTE),
RetryPolicy.QUERY,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new QueryId("query"),
Span.getInvalid(),
createRandomExchangeId())) {
Expand All @@ -473,7 +474,7 @@ public void testExchangeManagerNotConfigured()
directExecutor(),
DataSize.of(100, BYTE),
RetryPolicy.QUERY,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new QueryId("query"),
Span.getInvalid(),
createRandomExchangeId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.opentelemetry.api.trace.Span;
import io.trino.FeaturesConfig.DataIntegrityVerification;
import io.trino.block.BlockAssertions;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
Expand Down Expand Up @@ -492,7 +493,7 @@ public void testDeduplicationTaskFailure()
scheduler,
DataSize.of(1, Unit.MEGABYTE),
RetryPolicy.QUERY,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new QueryId("query"),
Span.getInvalid(),
createRandomExchangeId());
Expand Down Expand Up @@ -553,7 +554,7 @@ public void testDeduplication()
scheduler,
DataSize.of(1, Unit.KILOBYTE),
RetryPolicy.QUERY,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())),
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()),
new QueryId("query"),
Span.getInvalid(),
createRandomExchangeId()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.trino.FeaturesConfig.DataIntegrityVerification;
import io.trino.exchange.DirectExchangeInput;
import io.trino.exchange.ExchangeManagerConfig;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
Expand Down Expand Up @@ -270,7 +271,7 @@ private SourceOperator createExchangeOperator()
directExchangeClientSupplier,
SERDE_FACTORY,
RetryPolicy.NONE,
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of())));
new ExchangeManagerRegistry(OpenTelemetry.noop(), Tracing.noopTracer(), new SecretsResolver(ImmutableMap.of()), new ExchangeManagerConfig()));

DriverContext driverContext = createTaskContext(scheduler, scheduledExecutor, TEST_SESSION)
.addPipelineContext(0, true, true, false)
Expand Down
Loading
Loading