Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.rel.RelNode;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;

/** Builds a profiled EnumerableRel plan tree and matching plan node structure. */
public final class PlanProfileBuilder {

private PlanProfileBuilder() {}

public static ProfilePlan profile(RelNode root) {
Objects.requireNonNull(root, "root");
return profileRel(root);
}

private static ProfilePlan profileRel(RelNode rel) {
List<ProfilePlan> childPlans = new ArrayList<>();
for (RelNode input : rel.getInputs()) {
childPlans.add(profileRel(input));
}

List<RelNode> newInputs =
childPlans.stream().map(ProfilePlan::rel).collect(Collectors.toList());
List<ProfilePlanNode> childNodes =
childPlans.stream().map(ProfilePlan::planRoot).collect(Collectors.toList());

ProfilePlanNode planNode = new ProfilePlanNode(nodeName(rel), childNodes);
RelNode wrappedRel = wrap(rel, newInputs, planNode);
return new ProfilePlan(wrappedRel, planNode);
}

private static RelNode wrap(RelNode rel, List<RelNode> inputs, ProfilePlanNode planNode) {
if (!(rel instanceof EnumerableRel)) {
try {
return rel.copy(rel.getTraitSet(), inputs);
} catch (UnsupportedOperationException e) {
return rel;
}
}
Comment on lines +43 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential profiling chain break when copy() fails.

If rel.copy() throws UnsupportedOperationException, the method returns the original rel which still references its original unwrapped children. The wrapped inputs (with profiling instrumentation) are discarded, breaking the profiling chain for that subtree.

Consider logging a warning when this fallback occurs so operators can identify which node types don't support profiling.

Proposed fix with logging
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 public final class PlanProfileBuilder {
+  private static final Logger LOG = LogManager.getLogger(PlanProfileBuilder.class);

   // ...

   private static RelNode wrap(RelNode rel, List<RelNode> inputs, ProfilePlanNode planNode) {
     if (!(rel instanceof EnumerableRel)) {
       try {
         return rel.copy(rel.getTraitSet(), inputs);
       } catch (UnsupportedOperationException e) {
+        LOG.debug("Cannot wrap {} for profiling: copy not supported", rel.getRelTypeName());
         return rel;
       }
     }

if (rel instanceof Scannable) {
return new ProfileScannableRel((EnumerableRel) rel, inputs, planNode);
}
return new ProfileEnumerableRel((EnumerableRel) rel, inputs, planNode);
}

private static String nodeName(RelNode rel) {
return rel.getRelTypeName();
}

/** Pair of the profiled RelNode tree and its root plan node. */
public static final class ProfilePlan {
private final RelNode rel;
private final ProfilePlanNode planRoot;

public ProfilePlan(RelNode rel, ProfilePlanNode planRoot) {
this.rel = rel;
this.planRoot = planRoot;
}

public RelNode rel() {
return rel;
}

public ProfilePlanNode planRoot() {
return planRoot;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.BlockStatement;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.GotoExpressionKind;
import org.apache.calcite.linq4j.tree.GotoStatement;
import org.apache.calcite.linq4j.tree.Statement;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;
import org.opensearch.sql.monitor.profile.ProfilePlanNodeMetrics;

/** EnumerableRel wrapper that records inclusive time and row counts. */
public class ProfileEnumerableRel extends AbstractRelNode implements EnumerableRel {

private final EnumerableRel delegate;
private final List<RelNode> inputs;
protected final ProfilePlanNode planNode;

public ProfileEnumerableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(
Objects.requireNonNull(delegate, "delegate").getCluster(),
Objects.requireNonNull(delegate, "delegate").getTraitSet());
this.delegate = delegate;
this.inputs = new ArrayList<>(Objects.requireNonNull(inputs, "inputs"));
this.planNode = Objects.requireNonNull(planNode, "planNode");
}

@Override
public List<RelNode> getInputs() {
return inputs;
}

@Override
public void replaceInput(int ordinalInParent, RelNode p) {
inputs.set(ordinalInParent, p);
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new ProfileEnumerableRel(delegate, inputs, planNode);
}

@Override
protected RelDataType deriveRowType() {
return delegate.getRowType();
}

@Override
public String getRelTypeName() {
return delegate.getRelTypeName();
}

@Override
public RelWriter explainTerms(RelWriter pw) {
RelWriter writer = pw;
for (RelNode input : inputs) {
writer = writer.input("input", input);
}
return writer;
}

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
EnumerableRel rewritten = (EnumerableRel) delegate.copy(delegate.getTraitSet(), inputs);
Result result = rewritten.implement(implementor, pref);
return new Result(
wrapBlock(
result.block, implementor.stash(planNode.metrics(), ProfilePlanNodeMetrics.class)),
result.physType,
result.format);
}

/**
* Rewrite the generated block by wrapping the returned Enumerable with profiling.
*
* <p>Example (simplified):
*
* <pre>
* // Before:
* {
* final Enumerable<Object[]> input = ...;
* return input;
* }
*
* // After:
* {
* final Enumerable<Object[]> input = ...;
* return ProfileEnumerableRel.profile(input, metrics);
* }
* </pre>
*/
private static BlockStatement wrapBlock(BlockStatement block, Expression metricsExpression) {
List<Statement> statements = block.statements;
if (statements.isEmpty()) {
return block;
}
Statement last = statements.get(statements.size() - 1);
// Expect Calcite to end blocks with a return GotoStatement; skip profiling if it doesn't.
if (!(last instanceof GotoStatement)) {
return block;
}
GotoStatement gotoStatement = (GotoStatement) last;
// Only rewrite blocks that return an expression; otherwise keep the original block.
if (gotoStatement.kind != GotoExpressionKind.Return || gotoStatement.expression == null) {
return block;
}
Expression profiled =
Expressions.call(
ProfileEnumerableRel.class, "profile", gotoStatement.expression, metricsExpression);
BlockBuilder builder = new BlockBuilder();
for (int i = 0; i < statements.size() - 1; i++) {
builder.add(statements.get(i));
}
builder.add(Expressions.return_(gotoStatement.labelTarget, profiled));
return builder.toBlock();
}

public static <T> Enumerable<T> profile(
Enumerable<T> enumerable, ProfilePlanNodeMetrics metrics) {
if (metrics == null) {
return enumerable;
}
return new AbstractEnumerable<>() {
@Override
public Enumerator<T> enumerator() {
long start = System.nanoTime();
Enumerator<T> delegate = enumerable.enumerator();
metrics.addTimeNanos(System.nanoTime() - start);
return new ProfileEnumerator<>(delegate, metrics);
}
};
}

private static final class ProfileEnumerator<T> implements Enumerator<T> {
private final Enumerator<T> delegate;
private final ProfilePlanNodeMetrics metrics;

private ProfileEnumerator(Enumerator<T> delegate, ProfilePlanNodeMetrics metrics) {
this.delegate = delegate;
this.metrics = metrics;
}

@Override
public T current() {
return delegate.current();
}

@Override
public boolean moveNext() {
long start = System.nanoTime();
try {
boolean hasNext = delegate.moveNext();
if (hasNext) {
metrics.incrementRows();
}
return hasNext;
} finally {
metrics.addTimeNanos(System.nanoTime() - start);
}
}

@Override
public void reset() {
delegate.reset();
}

@Override
public void close() {
long start = System.nanoTime();
try {
delegate.close();
} finally {
metrics.addTimeNanos(System.nanoTime() - start);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.RelNode;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;

/** EnumerableRel wrapper that also supports Scannable plans. */
public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {

private final Scannable scannable;

public ProfileScannableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(delegate, inputs, planNode);
this.scannable = (Scannable) delegate;
}

@Override
public Enumerable<@Nullable Object> scan() {
return profile(scannable.scan(), planNode.metrics());
}
Comment on lines +17 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Override copy() to preserve ProfileScannableRel type.

Based on learnings from this repository, custom Calcite RelNode classes that extend other RelNode types must override the copy method. Without this override, ProfileScannableRel will downgrade to ProfileEnumerableRel during copy operations, losing the Scannable interface implementation.

🔧 Proposed fix
 public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {
 
   private final Scannable scannable;
 
   public ProfileScannableRel(
       EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
     super(delegate, inputs, planNode);
     this.scannable = (Scannable) delegate;
   }
 
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new ProfileScannableRel((EnumerableRel) scannable, inputs, planNode);
+  }
+
   @Override
   public Enumerable<@Nullable Object> scan() {
     return profile(scannable.scan(), planNode.metrics());
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {
private final Scannable scannable;
public ProfileScannableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(delegate, inputs, planNode);
this.scannable = (Scannable) delegate;
}
@Override
public Enumerable<@Nullable Object> scan() {
return profile(scannable.scan(), planNode.metrics());
}
public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {
private final Scannable scannable;
public ProfileScannableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(delegate, inputs, planNode);
this.scannable = (Scannable) delegate;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new ProfileScannableRel((EnumerableRel) scannable, inputs, planNode);
}
@Override
public Enumerable<@Nullable Object> scan() {
return profile(scannable.scan(), planNode.metrics());
}
}
🤖 Prompt for AI Agents
In
@core/src/main/java/org/opensearch/sql/calcite/profile/ProfileScannableRel.java
around lines 17 - 30, ProfileScannableRel is missing an override of the
copy(...) method so copy operations can recreate a ProfileScannableRel (and
retain the Scannable behavior) instead of falling back to ProfileEnumerableRel;
add a public ProfileScannableRel copy(RelTraitSet traitSet, List<RelNode>
inputs) override that calls the superclass constructor with the incoming
delegate (cast to EnumerableRel), passes inputs and planNode, and sets the
scannable field by casting the delegate to Scannable so the new instance
preserves the Scannable implementation and metrics.

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;
import org.opensearch.sql.calcite.profile.PlanProfileBuilder;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfiling;

Expand Down Expand Up @@ -318,6 +320,12 @@ public OpenSearchCalcitePreparingStmt(

@Override
protected PreparedResult implement(RelRoot root) {
ProfileContext profileContext = QueryProfiling.current();
if (profileContext.isEnabled()) {
PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile(root.rel);
profileContext.setPlanRoot(plan.planRoot());
root = root.withRel(plan.rel());
}
if (root.rel instanceof Scannable scannable) {
Hook.PLAN_BEFORE_IMPLEMENTATION.run(root);
RelDataType resultType = root.rel.getRowType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,30 @@ public class DefaultProfileContext implements ProfileContext {
private final long startNanos = System.nanoTime();
private boolean finished;
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
private ProfilePlanNode planRoot;
private QueryProfile profile;

public DefaultProfileContext() {}

@Override
public boolean isEnabled() {
return true;
}

/** {@inheritDoc} */
@Override
public ProfileMetric getOrCreateMetric(MetricName name) {
Objects.requireNonNull(name, "name");
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
}

@Override
public synchronized void setPlanRoot(ProfilePlanNode planRoot) {
if (this.planRoot == null) {
this.planRoot = planRoot;
}
}

/** {@inheritDoc} */
@Override
public synchronized QueryProfile finish() {
Expand All @@ -38,15 +51,12 @@ public synchronized QueryProfile finish() {
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
for (MetricName metricName : MetricName.values()) {
DefaultMetricImpl metric = metrics.get(metricName);
double millis = metric == null ? 0d : roundToMillis(metric.value());
double millis = metric == null ? 0d : ProfileUtils.roundToMillis(metric.value());
snapshot.put(metricName, millis);
}
double totalMillis = roundToMillis(endNanos - startNanos);
profile = new QueryProfile(totalMillis, snapshot);
double totalMillis = ProfileUtils.roundToMillis(endNanos - startNanos);
QueryProfile.PlanNode planSnapshot = planRoot == null ? null : planRoot.snapshot();
profile = new QueryProfile(totalMillis, snapshot, planSnapshot);
return profile;
}

private double roundToMillis(long nanos) {
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
}
}
Loading
Loading