Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.calcite.rex.RexLambdaRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelBuilder;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.FunctionProperties;
Expand All @@ -32,7 +32,7 @@ public class CalcitePlanContext {

public FrameworkConfig config;
public final Connection connection;
public final RelBuilder relBuilder;
public final OpenSearchRelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;
public final FunctionProperties functionProperties;
public final QueryType queryType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_DESC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP;
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupNotNull;
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupOrNull;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_STREAMSTATS;
Expand Down Expand Up @@ -146,9 +146,9 @@
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.BinUtils;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
import org.opensearch.sql.calcite.utils.PPLHintUtils;
Expand Down Expand Up @@ -1330,7 +1330,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
: duplicatedFieldNames.stream()
.map(a -> (RexNode) context.relBuilder.field(a))
.toList();
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
}
// add LogicalSystemLimit after dedup
addSysLimitForJoinSubsearch(context);
Expand Down Expand Up @@ -1388,7 +1388,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
List<RexNode> dedupeFields =
getRightColumnsInJoinCriteria(context.relBuilder, joinCondition);

buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
}
// add LogicalSystemLimit after dedup
addSysLimitForJoinSubsearch(context);
Expand Down Expand Up @@ -1565,81 +1565,13 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
List<RexNode> dedupeFields =
node.getFields().stream().map(f -> rexVisitor.analyze(f, context)).toList();
if (keepEmpty) {
buildDedupOrNull(context, dedupeFields, allowedDuplication);
buildDedupOrNull(context.relBuilder, dedupeFields, allowedDuplication);
} else {
buildDedupNotNull(context, dedupeFields, allowedDuplication, false);
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
}
return context.relBuilder.peek();
}

private static void buildDedupOrNull(
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
/*
* | dedup 2 a, b keepempty=true
* LogicalProject(...)
* +- LogicalFilter(condition=[OR(IS NULL(a), IS NULL(b), <=(_row_number_dedup_, 1))])
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
* +- ...
*/
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.partitionBy(dedupeFields)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(ROW_NUMBER_COLUMN_FOR_DEDUP);
context.relBuilder.projectPlus(rowNumber);
RexNode _row_number_dedup_ = context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_DEDUP);
// Filter (isnull('a) OR isnull('b) OR '_row_number_dedup_ <= n)
context.relBuilder.filter(
context.relBuilder.or(
context.relBuilder.or(dedupeFields.stream().map(context.relBuilder::isNull).toList()),
context.relBuilder.lessThanOrEqual(
_row_number_dedup_, context.relBuilder.literal(allowedDuplication))));
// DropColumns('_row_number_dedup_)
context.relBuilder.projectExcept(_row_number_dedup_);
}

private static void buildDedupNotNull(
CalcitePlanContext context,
List<RexNode> dedupeFields,
Integer allowedDuplication,
boolean fromJoinMaxOption) {
/*
* | dedup 2 a, b keepempty=false
* LogicalProject(...)
* +- LogicalFilter(condition=[<=(_row_number_dedup_, n)]))
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
* +- LogicalFilter(condition=[AND(IS NOT NULL(a), IS NOT NULL(b))])
* +- ...
*/
// Filter (isnotnull('a) AND isnotnull('b))
String rowNumberAlias =
fromJoinMaxOption ? ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP : ROW_NUMBER_COLUMN_FOR_DEDUP;
context.relBuilder.filter(
context.relBuilder.and(dedupeFields.stream().map(context.relBuilder::isNotNull).toList()));
// Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
// specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a ASC
// NULLS FIRST, 'b ASC NULLS FIRST]
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.partitionBy(dedupeFields)
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(rowNumberAlias);
context.relBuilder.projectPlus(rowNumber);
RexNode rowNumberField = context.relBuilder.field(rowNumberAlias);
// Filter ('_row_number_dedup_ <= n)
context.relBuilder.filter(
context.relBuilder.lessThanOrEqual(
rowNumberField, context.relBuilder.literal(allowedDuplication)));
// DropColumns('_row_number_dedup_)
context.relBuilder.projectExcept(rowNumberField);
}

@Override
public RelNode visitWindow(Window node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.calcite.utils.SubsearchUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan.rel;

import java.util.List;
import lombok.Getter;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rex.RexNode;
import org.opensearch.sql.exception.CalciteUnsupportedException;

/** Relational expression representing a dedup command. */
@Getter
public abstract class Dedup extends SingleRel {
final List<RexNode> dedupeFields;
final Integer allowedDuplication;
final Boolean keepEmpty;
final Boolean consecutive;

/** */
protected Dedup(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
List<RexNode> dedupeFields,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive) {
super(cluster, traitSet, input);
if (allowedDuplication <= 0) {
throw new IllegalArgumentException("Number of duplicate events must be greater than 0");
}
if (consecutive) {
throw new CalciteUnsupportedException("Consecutive deduplication is unsupported in Calcite");
}
this.dedupeFields = dedupeFields;
this.allowedDuplication = allowedDuplication;
this.keepEmpty = keepEmpty;
this.consecutive = consecutive;
}

@Override
public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return copy(
traitSet,
sole(inputs),
this.dedupeFields,
this.allowedDuplication,
this.keepEmpty,
this.consecutive);
}

public abstract Dedup copy(
RelTraitSet traitSet,
RelNode input,
List<RexNode> dedupeFields,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive);

public Dedup copy(RelNode input, List<RexNode> dedupeFields) {
return this.copy(
this.getTraitSet(),
input,
dedupeFields,
this.allowedDuplication,
this.keepEmpty,
this.consecutive);
}

@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.item("dedup_fields", dedupeFields)
.item("allowed_dedup", allowedDuplication)
.item("keepEmpty", keepEmpty)
.item("consecutive", consecutive);
}

@Override
public void register(RelOptPlanner planner) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan.rel;

import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.DEDUP_CONVERT_RULE;

import java.util.List;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;

public class LogicalDedup extends Dedup {

protected LogicalDedup(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
List<RexNode> dedupeFields,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive) {
super(cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
}

@Override
public Dedup copy(
RelTraitSet traitSet,
RelNode input,
List<RexNode> dedupeFields,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive) {
assert traitSet.containsIfApplicable(Convention.NONE);
return new LogicalDedup(
getCluster(), traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
}

public static LogicalDedup create(
RelNode input,
List<RexNode> dedupeFields,
Integer allowedDuplication,
Boolean keepEmpty,
Boolean consecutive) {
final RelOptCluster cluster = input.getCluster();
RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
return new LogicalDedup(
cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
}

@Override
public void register(RelOptPlanner planner) {
planner.addRule(DEDUP_CONVERT_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rel;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rel;

import com.google.common.collect.ImmutableList;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
Expand All @@ -14,6 +14,7 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.CoreRules;
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;

/** Relational expression representing a scan of an OpenSearch type. */
public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rule;

import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelRule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rule;

import com.google.common.collect.ImmutableList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rule;

import java.util.ArrayList;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.plan;
package org.opensearch.sql.calcite.plan.rule;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
Expand Down
Loading
Loading