Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.rel.RelNode;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;

/** Builds a profiled EnumerableRel plan tree and matching plan node structure. */
public final class PlanProfileBuilder {

private PlanProfileBuilder() {}

public static ProfilePlan profile(RelNode root) {
Objects.requireNonNull(root, "root");
return profileRel(root);
}

private static ProfilePlan profileRel(RelNode rel) {
List<ProfilePlan> childPlans = new ArrayList<>();
for (RelNode input : rel.getInputs()) {
childPlans.add(profileRel(input));
}

List<RelNode> newInputs =
childPlans.stream().map(ProfilePlan::rel).collect(Collectors.toList());
List<ProfilePlanNode> childNodes =
childPlans.stream().map(ProfilePlan::planRoot).collect(Collectors.toList());

ProfilePlanNode planNode = new ProfilePlanNode(nodeName(rel), childNodes);
RelNode wrappedRel = wrap(rel, newInputs, planNode);
return new ProfilePlan(wrappedRel, planNode);
}

private static RelNode wrap(RelNode rel, List<RelNode> inputs, ProfilePlanNode planNode) {
if (!(rel instanceof EnumerableRel)) {
try {
return rel.copy(rel.getTraitSet(), inputs);
} catch (UnsupportedOperationException e) {
return rel;
}
}
if (rel instanceof Scannable) {
return new ProfileScannableRel((EnumerableRel) rel, inputs, planNode);
}
return new ProfileEnumerableRel((EnumerableRel) rel, inputs, planNode);
}

private static String nodeName(RelNode rel) {
return rel.getRelTypeName();
}

/** Pair of the profiled RelNode tree and its root plan node. */
public static final class ProfilePlan {
private final RelNode rel;
private final ProfilePlanNode planRoot;

public ProfilePlan(RelNode rel, ProfilePlanNode planRoot) {
this.rel = rel;
this.planRoot = planRoot;
}

public RelNode rel() {
return rel;
}

public ProfilePlanNode planRoot() {
return planRoot;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.BlockStatement;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.GotoExpressionKind;
import org.apache.calcite.linq4j.tree.GotoStatement;
import org.apache.calcite.linq4j.tree.Statement;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;
import org.opensearch.sql.monitor.profile.ProfilePlanNodeMetrics;

/** EnumerableRel wrapper that records inclusive time and row counts. */
public class ProfileEnumerableRel extends AbstractRelNode implements EnumerableRel {

private final EnumerableRel delegate;
private final List<RelNode> inputs;
protected final ProfilePlanNode planNode;

public ProfileEnumerableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(
Objects.requireNonNull(delegate, "delegate").getCluster(),
Objects.requireNonNull(delegate, "delegate").getTraitSet());
this.delegate = delegate;
this.inputs = new ArrayList<>(Objects.requireNonNull(inputs, "inputs"));
this.planNode = Objects.requireNonNull(planNode, "planNode");
}

@Override
public List<RelNode> getInputs() {
return inputs;
}

@Override
public void replaceInput(int ordinalInParent, RelNode p) {
inputs.set(ordinalInParent, p);
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new ProfileEnumerableRel(delegate, inputs, planNode);
}

@Override
protected RelDataType deriveRowType() {
return delegate.getRowType();
}

@Override
public String getRelTypeName() {
return delegate.getRelTypeName();
}

@Override
public RelWriter explainTerms(RelWriter pw) {
RelWriter writer = pw;
for (RelNode input : inputs) {
writer = writer.input("input", input);
}
return writer;
}

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
EnumerableRel rewritten = (EnumerableRel) delegate.copy(delegate.getTraitSet(), inputs);
Result result = rewritten.implement(implementor, pref);
return new Result(
wrapBlock(
result.block, implementor.stash(planNode.metrics(), ProfilePlanNodeMetrics.class)),
result.physType,
result.format);
}

/**
* Rewrite the generated block by wrapping the returned Enumerable with profiling.
*
* <p>Example (simplified):
*
* <pre>
* // Before:
* {
* final Enumerable<Object[]> input = ...;
* return input;
* }
*
* // After:
* {
* final Enumerable<Object[]> input = ...;
* return ProfileEnumerableRel.profile(input, metrics);
* }
* </pre>
*/
private static BlockStatement wrapBlock(BlockStatement block, Expression metricsExpression) {
List<Statement> statements = block.statements;
if (statements.isEmpty()) {
return block;
}
Statement last = statements.get(statements.size() - 1);
// Expect Calcite to end blocks with a return GotoStatement; skip profiling if it doesn't.
if (!(last instanceof GotoStatement)) {
return block;
}
GotoStatement gotoStatement = (GotoStatement) last;
// Only rewrite blocks that return an expression; otherwise keep the original block.
if (gotoStatement.kind != GotoExpressionKind.Return || gotoStatement.expression == null) {
return block;
}
Expression profiled =
Expressions.call(
ProfileEnumerableRel.class, "profile", gotoStatement.expression, metricsExpression);
BlockBuilder builder = new BlockBuilder();
for (int i = 0; i < statements.size() - 1; i++) {
builder.add(statements.get(i));
}
builder.add(Expressions.return_(gotoStatement.labelTarget, profiled));
return builder.toBlock();
}

public static <T> Enumerable<T> profile(
Enumerable<T> enumerable, ProfilePlanNodeMetrics metrics) {
if (metrics == null) {
return enumerable;
}
return new AbstractEnumerable<>() {
@Override
public Enumerator<T> enumerator() {
long start = System.nanoTime();
Enumerator<T> delegate = enumerable.enumerator();
metrics.addTimeNanos(System.nanoTime() - start);
return new ProfileEnumerator<>(delegate, metrics);
}
};
}

private static final class ProfileEnumerator<T> implements Enumerator<T> {
private final Enumerator<T> delegate;
private final ProfilePlanNodeMetrics metrics;

private ProfileEnumerator(Enumerator<T> delegate, ProfilePlanNodeMetrics metrics) {
this.delegate = delegate;
this.metrics = metrics;
}

@Override
public T current() {
return delegate.current();
}

@Override
public boolean moveNext() {
long start = System.nanoTime();
try {
boolean hasNext = delegate.moveNext();
if (hasNext) {
metrics.incrementRows();
}
return hasNext;
} finally {
metrics.addTimeNanos(System.nanoTime() - start);
}
}

@Override
public void reset() {
delegate.reset();
}

@Override
public void close() {
long start = System.nanoTime();
try {
delegate.close();
} finally {
metrics.addTimeNanos(System.nanoTime() - start);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.profile;

import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.RelNode;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.monitor.profile.ProfilePlanNode;

/** EnumerableRel wrapper that also supports Scannable plans. */
public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable {

private final Scannable scannable;

public ProfileScannableRel(
EnumerableRel delegate, List<RelNode> inputs, ProfilePlanNode planNode) {
super(delegate, inputs, planNode);
this.scannable = (Scannable) delegate;
}

@Override
public Enumerable<@Nullable Object> scan() {
return profile(scannable.scan(), planNode.metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;
import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule;
import org.opensearch.sql.calcite.profile.PlanProfileBuilder;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfiling;

Expand Down Expand Up @@ -318,11 +320,18 @@ public OpenSearchCalcitePreparingStmt(

@Override
protected PreparedResult implement(RelRoot root) {
ProfileContext profileContext = QueryProfiling.current();
if (profileContext.isEnabled()) {
PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile(root.rel);
profileContext.setPlanRoot(plan.planRoot());
root = root.withRel(plan.rel());
}
if (root.rel instanceof Scannable) {
Scannable scannable = (Scannable) root.rel;
Hook.PLAN_BEFORE_IMPLEMENTATION.run(root);
RelDataType resultType = root.rel.getRowType();
boolean isDml = root.kind.belongsTo(SqlKind.DML);
final Bindable bindable = dataContext -> ((Scannable) root.rel).scan();
final Bindable bindable = dataContext -> scannable.scan();

return new PreparedResultImpl(
resultType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,30 @@ public class DefaultProfileContext implements ProfileContext {
private final long startNanos = System.nanoTime();
private boolean finished;
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
private ProfilePlanNode planRoot;
private QueryProfile profile;

public DefaultProfileContext() {}

@Override
public boolean isEnabled() {
return true;
}

/** {@inheritDoc} */
@Override
public ProfileMetric getOrCreateMetric(MetricName name) {
Objects.requireNonNull(name, "name");
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
}

@Override
public synchronized void setPlanRoot(ProfilePlanNode planRoot) {
if (this.planRoot == null) {
this.planRoot = planRoot;
}
}

/** {@inheritDoc} */
@Override
public synchronized QueryProfile finish() {
Expand All @@ -38,15 +51,12 @@ public synchronized QueryProfile finish() {
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
for (MetricName metricName : MetricName.values()) {
DefaultMetricImpl metric = metrics.get(metricName);
double millis = metric == null ? 0d : roundToMillis(metric.value());
double millis = metric == null ? 0d : ProfileUtils.roundToMillis(metric.value());
snapshot.put(metricName, millis);
}
double totalMillis = roundToMillis(endNanos - startNanos);
profile = new QueryProfile(totalMillis, snapshot);
double totalMillis = ProfileUtils.roundToMillis(endNanos - startNanos);
QueryProfile.PlanNode planSnapshot = planRoot == null ? null : planRoot.snapshot();
profile = new QueryProfile(totalMillis, snapshot, planSnapshot);
return profile;
}

private double roundToMillis(long nanos) {
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
}
}
Loading
Loading