diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java index f5adf59cca75b..de115c56da024 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -340,6 +340,7 @@ public final class SystemSessionProperties public static final String WARN_ON_COMMON_NAN_PATTERNS = "warn_on_common_nan_patterns"; public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values"; public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer"; + public static final String ENABLE_EMPTY_CONNECTOR_OPTIMIZER = "enable_empty_connector_optimizer"; public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled"; public static final String BROADCAST_SEMI_JOIN_FOR_DELETE = "broadcast_semi_join_for_delete"; public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name"; @@ -1961,6 +1962,10 @@ public SystemSessionProperties( "Include values node for connector optimizer", featuresConfig.isIncludeValuesNodeInConnectorOptimizer(), false), + booleanProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER, + "Run optimizers which optimize queries with values node", + false, + false), booleanProperty( INNER_JOIN_PUSHDOWN_ENABLED, "Enable Join Predicate Pushdown", @@ -3383,6 +3388,11 @@ public static boolean isIncludeValuesNodeInConnectorOptimizer(Session session) return session.getSystemProperty(INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER, Boolean.class); } + public static boolean isEmptyConnectorOptimizerEnabled(Session session) + { + return session.getSystemProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER, Boolean.class); + } + public static Boolean isInnerJoinPushdownEnabled(Session session) { return session.getSystemProperty(INNER_JOIN_PUSHDOWN_ENABLED, Boolean.class); diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ApplyConnectorOptimization.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ApplyConnectorOptimization.java index 8b55d3e9330e8..87f00531409d9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ApplyConnectorOptimization.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ApplyConnectorOptimization.java @@ -63,6 +63,7 @@ import java.util.Queue; import java.util.Set; +import static com.facebook.presto.SystemSessionProperties.isEmptyConnectorOptimizerEnabled; import static com.facebook.presto.SystemSessionProperties.isIncludeValuesNodeInConnectorOptimizer; import static com.facebook.presto.common.RuntimeUnit.NANO; import static com.facebook.presto.sql.OptimizerRuntimeTrackUtil.getOptimizerNameForLog; @@ -103,7 +104,7 @@ public class ApplyConnectorOptimization DeleteNode.class); // for a leaf node that does not belong to any connector (e.g., ValuesNode) - private static final ConnectorId EMPTY_CONNECTOR_ID = new ConnectorId("$internal$" + ApplyConnectorOptimization.class + "_CONNECTOR"); + private static final ConnectorId EMPTY_CONNECTOR_ID = new ConnectorId("$internal$ApplyConnectorOptimization_EMPTY_CONNECTOR"); private final Supplier>> connectorOptimizersSupplier; @@ -130,6 +131,7 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider // retrieve all the connectors ImmutableSet.Builder connectorIds = ImmutableSet.builder(); getAllConnectorIds(plan, connectorIds); + Set connectorIdSet = connectorIds.build(); // for each connector, retrieve the set of subplans to optimize // TODO: what if a new connector is added by an existing one @@ -137,8 +139,20 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider // create a UNION_ALL to federate data sources from both C1 and C2 (regardless of the classloader issue). // For such case, it is dangerous to re-calculate the "max closure" given the fixpoint property will be broken. // In order to preserve the fixpoint, we will "pretend" the newly added C2 table scan is part of C1's job to maintain. - for (ConnectorId connectorId : connectorIds.build()) { - Set optimizers = connectorOptimizers.get(connectorId); + for (ConnectorId connectorId : connectorIdSet) { + Set optimizers; + if (isEmptyConnectorOptimizerEnabled(session) && connectorIdSet.stream() + .allMatch(x -> x.equals(EMPTY_CONNECTOR_ID)) && session.getCatalog().isPresent()) { + ConnectorId queryConnectorId = new ConnectorId(session.getCatalog().get()); + optimizers = connectorOptimizers.get(queryConnectorId) == null ? null + : connectorOptimizers.get(queryConnectorId).stream() + .filter(x -> x.getSupportedConnectorIds().size() == 1 + && x.getSupportedConnectorIds().get(0).equals(EMPTY_CONNECTOR_ID)) + .collect( + toImmutableSet()); + } else { + optimizers = connectorOptimizers.get(connectorId); + } if (optimizers == null || optimizers.isEmpty()) { continue; } @@ -188,6 +202,9 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider for (ConnectorPlanOptimizer optimizer : entry.getValue()) { long start = System.nanoTime(); ConnectorSession connectorSession = session.toConnectorSession(connectorId); + if (isEmptyConnectorOptimizerEnabled(session) && connectorId.equals(EMPTY_CONNECTOR_ID) && session.getCatalog().isPresent()) { + connectorSession = session.toConnectorSession(new ConnectorId(session.getCatalog().get())); + } checkState(connectorSession.getConnectorId().isPresent()); newNode = optimizer.optimize(newNode, connectorSession, variableAllocator, idAllocator); if (enableVerboseRuntimeStats || trackOptimizerRuntime(session, optimizer)) { @@ -337,6 +354,9 @@ public Set> getReachablePlanNodeTypes() boolean isClosure(ConnectorId connectorId, Session session, List supportedConnectorId) { + if (isEmptyConnectorOptimizerEnabled(session) && reachableConnectors.stream().allMatch(x -> x.equals(EMPTY_CONNECTOR_ID)) && supportedConnectorId.size() == 1 && supportedConnectorId.get(0).equals(EMPTY_CONNECTOR_ID)) { + return containsAll(CONNECTOR_ACCESSIBLE_PLAN_NODES, reachablePlanNodeTypes); + } // check if all children can reach the only connector boolean includeValuesNode = isIncludeValuesNodeInConnectorOptimizer(session); Set connectorIds = includeValuesNode ? reachableConnectors.stream().filter(x -> !x.equals(EMPTY_CONNECTOR_ID)).collect(toImmutableSet()) : reachableConnectors; diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestConnectorOptimization.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestConnectorOptimization.java index 2aea73d977986..1f6a7f46d539a 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestConnectorOptimization.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestConnectorOptimization.java @@ -60,6 +60,7 @@ import java.util.Set; import static com.facebook.presto.SessionTestUtils.TEST_SESSION; +import static com.facebook.presto.SystemSessionProperties.ENABLE_EMPTY_CONNECTOR_OPTIMIZER; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.expressions.LogicalRowExpressions.and; @@ -223,6 +224,107 @@ public void testAddFilterToTableScan() TypeProvider.viewOf(ImmutableMap.of("a", BIGINT, "b", BIGINT))); } + @Test + public void testEmptyConnectorOptimization() + { + PlanNode plan = output(values("a", "b"), "a"); + ConnectorId emptyConnectorId = new ConnectorId("$internal$ApplyConnectorOptimization_EMPTY_CONNECTOR"); + ConnectorPlanOptimizer emptyConnectorOptimizer = createEmptyConnectorOptimizer(emptyConnectorId); + Session session = Session.builder(TEST_SESSION).setSystemProperty(ENABLE_EMPTY_CONNECTOR_OPTIMIZER, "true").build(); + + PlanNode actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session); + + assertPlanMatch( + actual, + PlanMatchPattern.output( + PlanMatchPattern.filter( + "true", + PlanMatchPattern.values("a", "b")))); + + plan = output( + union( + values("a", "b"), + values("a", "b"), + values("a", "b")), + "a"); + + actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session); + + assertPlanMatch( + actual, + PlanMatchPattern.output( + PlanMatchPattern.filter( + "true", + PlanMatchPattern.union( + PlanMatchPattern.values("a", "b"), + PlanMatchPattern.values("a", "b"), + PlanMatchPattern.values("a", "b"))))); + + plan = output( + union( + values("a", "b"), + tableScan("cat1", "a", "b")), + "a"); + + actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer), + new ConnectorId("cat1"), ImmutableSet.of(noop())), session); + + assertEquals(actual, plan); + + plan = output( + filter(values("a", "b"), TRUE_CONSTANT), + "a"); + + actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session); + + assertPlanMatch( + actual, + PlanMatchPattern.output( + PlanMatchPattern.filter( + "true", + PlanMatchPattern.filter( + "true", + PlanMatchPattern.values("a", "b"))))); + + plan = output( + union( + filter(values("a", "b"), TRUE_CONSTANT), + union( + values("a", "b"), + filter(values("a", "b"), TRUE_CONSTANT))), + "a"); + + actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer)), session); + + assertPlanMatch( + actual, + PlanMatchPattern.output( + PlanMatchPattern.filter( + "true", + PlanMatchPattern.union( + PlanMatchPattern.filter( + "true", + PlanMatchPattern.values("a", "b")), + PlanMatchPattern.union( + PlanMatchPattern.values("a", "b"), + PlanMatchPattern.filter( + "true", + PlanMatchPattern.values("a", "b"))))))); + + plan = output(tableScan("cat1", "a", "b"), "a"); + + actual = optimize(plan, ImmutableMap.of( + new ConnectorId("tpch"), ImmutableSet.of(emptyConnectorOptimizer), + new ConnectorId("cat1"), ImmutableSet.of(noop())), session); + + assertEquals(actual, plan); + } + @Test public void testMultipleConnectorOptimization() { @@ -527,6 +629,12 @@ private static PlanNode optimize(PlanNode plan, Map> optimizers, Session session) + { + ApplyConnectorOptimization optimizer = new ApplyConnectorOptimization(() -> optimizers); + return optimizer.optimize(plan, session, TypeProvider.empty(), new VariableAllocator(), new PlanNodeIdAllocator(), WarningCollector.NOOP).getPlanNode(); + } + private static ConnectorPlanOptimizer filterPushdown() { return (maxSubplan, session, variableAllocator, idAllocator) -> maxSubplan.accept(new TestFilterPushdownVisitor(), null); @@ -578,6 +686,24 @@ public java.util.List getSupportedConnectorIds() }; } + private static ConnectorPlanOptimizer createEmptyConnectorOptimizer(ConnectorId emptyConnectorId) + { + return new ConnectorPlanOptimizer() + { + @Override + public PlanNode optimize(PlanNode maxSubplan, com.facebook.presto.spi.ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator) + { + return new FilterNode(Optional.empty(), idAllocator.getNextId(), maxSubplan, TRUE_CONSTANT); + } + + @Override + public java.util.List getSupportedConnectorIds() + { + return ImmutableList.of(emptyConnectorId); + } + }; + } + private static class TestPlanOptimizationVisitor extends PlanVisitor {