Skip to content
Draft

Draft #26699

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ public final class SystemSessionProperties
public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns";
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
public static final String ENABLE_EMPTY_CONNECTOR_OPTIMIZER = "enable_empty_connector_optimizer";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
public static final String BROADCAST_SEMI_JOIN_FOR_DELETE = "broadcast_semi_join_for_delete";
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
Expand Down Expand Up @@ -1961,6 +1962,10 @@ public SystemSessionProperties(
"Include values node for connector optimizer",
featuresConfig.isIncludeValuesNodeInConnectorOptimizer(),
false),
booleanProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER,
"Run optimizers which optimize queries with values node",
false,
false),
booleanProperty(
INNER_JOIN_PUSHDOWN_ENABLED,
"Enable Join Predicate Pushdown",
Expand Down Expand Up @@ -3383,6 +3388,11 @@ public static boolean isIncludeValuesNodeInConnectorOptimizer(Session session)
return session.getSystemProperty(INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER, Boolean.class);
}

public static boolean isEmptyConnectorOptimizerEnabled(Session session)
{
return session.getSystemProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER, Boolean.class);
}

public static Boolean isInnerJoinPushdownEnabled(Session session)
{
return session.getSystemProperty(INNER_JOIN_PUSHDOWN_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Queue;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isEmptyConnectorOptimizerEnabled;
import static com.facebook.presto.SystemSessionProperties.isIncludeValuesNodeInConnectorOptimizer;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.sql.OptimizerRuntimeTrackUtil.getOptimizerNameForLog;
Expand Down Expand Up @@ -103,7 +104,7 @@ public class ApplyConnectorOptimization
DeleteNode.class);

// for a leaf node that does not belong to any connector (e.g., ValuesNode)
private static final ConnectorId EMPTY_CONNECTOR_ID = new ConnectorId("$internal$" + ApplyConnectorOptimization.class + "_CONNECTOR");
private static final ConnectorId EMPTY_CONNECTOR_ID = new ConnectorId("$internal$ApplyConnectorOptimization_EMPTY_CONNECTOR");

private final Supplier<Map<ConnectorId, Set<ConnectorPlanOptimizer>>> connectorOptimizersSupplier;

Expand All @@ -130,15 +131,28 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
// retrieve all the connectors
ImmutableSet.Builder<ConnectorId> connectorIds = ImmutableSet.builder();
getAllConnectorIds(plan, connectorIds);
Set<ConnectorId> connectorIdSet = connectorIds.build();

// for each connector, retrieve the set of subplans to optimize
// TODO: what if a new connector is added by an existing one
// There are cases (e.g., query federation) where a connector C1 needs to
// create a UNION_ALL to federate data sources from both C1 and C2 (regardless of the classloader issue).
// For such case, it is dangerous to re-calculate the "max closure" given the fixpoint property will be broken.
// In order to preserve the fixpoint, we will "pretend" the newly added C2 table scan is part of C1's job to maintain.
for (ConnectorId connectorId : connectorIds.build()) {
Set<ConnectorPlanOptimizer> optimizers = connectorOptimizers.get(connectorId);
for (ConnectorId connectorId : connectorIdSet) {
Set<ConnectorPlanOptimizer> optimizers;
if (isEmptyConnectorOptimizerEnabled(session) && connectorIdSet.stream()
.allMatch(x -> x.equals(EMPTY_CONNECTOR_ID)) && session.getCatalog().isPresent()) {
ConnectorId queryConnectorId = new ConnectorId(session.getCatalog().get());
optimizers = connectorOptimizers.get(queryConnectorId) == null ? null
: connectorOptimizers.get(queryConnectorId).stream()
.filter(x -> x.getSupportedConnectorIds().size() == 1
&& x.getSupportedConnectorIds().get(0).equals(EMPTY_CONNECTOR_ID))
.collect(
toImmutableSet());
} else {
optimizers = connectorOptimizers.get(connectorId);
}
if (optimizers == null || optimizers.isEmpty()) {
continue;
}
Expand Down Expand Up @@ -188,6 +202,9 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
for (ConnectorPlanOptimizer optimizer : entry.getValue()) {
long start = System.nanoTime();
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
if (isEmptyConnectorOptimizerEnabled(session) && connectorId.equals(EMPTY_CONNECTOR_ID) && session.getCatalog().isPresent()) {
connectorSession = session.toConnectorSession(new ConnectorId(session.getCatalog().get()));
}
checkState(connectorSession.getConnectorId().isPresent());
newNode = optimizer.optimize(newNode, connectorSession, variableAllocator, idAllocator);
if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) {
Expand Down Expand Up @@ -337,6 +354,9 @@ public Set<Class<? extends PlanNode>> getReachablePlanNodeTypes()

boolean isClosure(ConnectorId connectorId, Session session, List<ConnectorId> supportedConnectorId)
{
if (isEmptyConnectorOptimizerEnabled(session) && reachableConnectors.stream().allMatch(x -> x.equals(EMPTY_CONNECTOR_ID)) && supportedConnectorId.size() == 1 && supportedConnectorId.get(0).equals(EMPTY_CONNECTOR_ID)) {
return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes);
}
// check if all children can reach the only connector
boolean includeValuesNode = isIncludeValuesNodeInConnectorOptimizer(session);
Set<ConnectorId> connectorIds = includeValuesNode ? reachableConnectors.stream().filter(x -> !x.equals(EMPTY_CONNECTOR_ID)).collect(toImmutableSet()) : reachableConnectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Set;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.SystemSessionProperties.ENABLE_EMPTY_CONNECTOR_OPTIMIZER;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.and;
Expand Down Expand Up @@ -223,6 +224,107 @@ public void testAddFilterToTableScan()
TypeProvider.viewOf(ImmutableMap.of("a", BIGINT, "b", BIGINT)));
}

@Test
public void testEmptyConnectorOptimization()
{
PlanNode plan = output(values("a", "b"), "a");
ConnectorId emptyConnectorId = new ConnectorId("$internal$ApplyConnectorOptimization_EMPTY_CONNECTOR");
ConnectorPlanOptimizer emptyConnectorOptimizer = createEmptyConnectorOptimizer(emptyConnectorId);
Session session = Session.builder(TEST_SESSION).setSystemProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER, "true").build();

PlanNode actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session);

assertPlanMatch(
actual,
PlanMatchPattern.output(
PlanMatchPattern.filter(
"true",
PlanMatchPattern.values("a", "b"))));

plan = output(
union(
values("a", "b"),
values("a", "b"),
values("a", "b")),
"a");

actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session);

assertPlanMatch(
actual,
PlanMatchPattern.output(
PlanMatchPattern.filter(
"true",
PlanMatchPattern.union(
PlanMatchPattern.values("a", "b"),
PlanMatchPattern.values("a", "b"),
PlanMatchPattern.values("a", "b")))));

plan = output(
union(
values("a", "b"),
tableScan("cat1", "a", "b")),
"a");

actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer),
new ConnectorId("cat1"), ImmutableSet.of(noop())), session);

assertEquals(actual, plan);

plan = output(
filter(values("a", "b"), TRUE_CONSTANT),
"a");

actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session);

assertPlanMatch(
actual,
PlanMatchPattern.output(
PlanMatchPattern.filter(
"true",
PlanMatchPattern.filter(
"true",
PlanMatchPattern.values("a", "b")))));

plan = output(
union(
filter(values("a", "b"), TRUE_CONSTANT),
union(
values("a", "b"),
filter(values("a", "b"), TRUE_CONSTANT))),
"a");

actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session);

assertPlanMatch(
actual,
PlanMatchPattern.output(
PlanMatchPattern.filter(
"true",
PlanMatchPattern.union(
PlanMatchPattern.filter(
"true",
PlanMatchPattern.values("a", "b")),
PlanMatchPattern.union(
PlanMatchPattern.values("a", "b"),
PlanMatchPattern.filter(
"true",
PlanMatchPattern.values("a", "b")))))));

plan = output(tableScan("cat1", "a", "b"), "a");

actual = optimize(plan, ImmutableMap.of(
new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer),
new ConnectorId("cat1"), ImmutableSet.of(noop())), session);

assertEquals(actual, plan);
}

@Test
public void testMultipleConnectorOptimization()
{
Expand Down Expand Up @@ -527,6 +629,12 @@ private static PlanNode optimize(PlanNode plan, Map<ConnectorId, Set<ConnectorPl
return optimizer.optimize(plan, TEST_SESSION, TypeProvider.empty(), new VariableAllocator(), new PlanNodeIdAllocator(), WarningCollector.NOOP).getPlanNode();
}

private static PlanNode optimize(PlanNode plan, Map<ConnectorId, Set<ConnectorPlanOptimizer>> optimizers, Session session)
{
ApplyConnectorOptimization optimizer = new ApplyConnectorOptimization(() -> optimizers);
return optimizer.optimize(plan, session, TypeProvider.empty(), new VariableAllocator(), new PlanNodeIdAllocator(), WarningCollector.NOOP).getPlanNode();
}

private static ConnectorPlanOptimizer filterPushdown()
{
return (maxSubplan, session, variableAllocator, idAllocator) -> maxSubplan.accept(new TestFilterPushdownVisitor(), null);
Expand Down Expand Up @@ -578,6 +686,24 @@ public java.util.List<ConnectorId> getSupportedConnectorIds()
};
}

private static ConnectorPlanOptimizer createEmptyConnectorOptimizer(ConnectorId emptyConnectorId)
{
return new ConnectorPlanOptimizer()
{
@Override
public PlanNode optimize(PlanNode maxSubplan, com.facebook.presto.spi.ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
{
return new FilterNode(Optional.empty(), idAllocator.getNextId(), maxSubplan, TRUE_CONSTANT);
}

@Override
public java.util.List<ConnectorId> getSupportedConnectorIds()
{
return ImmutableList.of(emptyConnectorId);
}
};
}

private static class TestPlanOptimizationVisitor
extends PlanVisitor<PlanNode, Void>
{
Expand Down
Loading