diff --git a/core/src/main/java/org/opensearch/sql/calcite/profile/PlanProfileBuilder.java b/core/src/main/java/org/opensearch/sql/calcite/profile/PlanProfileBuilder.java new file mode 100644 index 0000000000..98b696f6c3 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/profile/PlanProfileBuilder.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.profile; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.rel.RelNode; +import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.monitor.profile.ProfilePlanNode; + +/** Builds a profiled EnumerableRel plan tree and matching plan node structure. */ +public final class PlanProfileBuilder { + + private PlanProfileBuilder() {} + + public static ProfilePlan profile(RelNode root) { + Objects.requireNonNull(root, "root"); + return profileRel(root); + } + + private static ProfilePlan profileRel(RelNode rel) { + List childPlans = new ArrayList<>(); + for (RelNode input : rel.getInputs()) { + childPlans.add(profileRel(input)); + } + + List newInputs = + childPlans.stream().map(ProfilePlan::rel).collect(Collectors.toList()); + List childNodes = + childPlans.stream().map(ProfilePlan::planRoot).collect(Collectors.toList()); + + ProfilePlanNode planNode = new ProfilePlanNode(nodeName(rel), childNodes); + RelNode wrappedRel = wrap(rel, newInputs, planNode); + return new ProfilePlan(wrappedRel, planNode); + } + + private static RelNode wrap(RelNode rel, List inputs, ProfilePlanNode planNode) { + if (!(rel instanceof EnumerableRel)) { + try { + return rel.copy(rel.getTraitSet(), inputs); + } catch (UnsupportedOperationException e) { + return rel; + } + } + if (rel instanceof Scannable) { + return new ProfileScannableRel((EnumerableRel) rel, inputs, planNode); + } + return new ProfileEnumerableRel((EnumerableRel) rel, inputs, planNode); + } + + private static String nodeName(RelNode rel) { + return rel.getRelTypeName(); + } + + /** Pair of the profiled RelNode tree and its root plan node. */ + public static final class ProfilePlan { + private final RelNode rel; + private final ProfilePlanNode planRoot; + + public ProfilePlan(RelNode rel, ProfilePlanNode planRoot) { + this.rel = rel; + this.planRoot = planRoot; + } + + public RelNode rel() { + return rel; + } + + public ProfilePlanNode planRoot() { + return planRoot; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRel.java b/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRel.java new file mode 100644 index 0000000000..c7e1a76fcb --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRel.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.profile; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.BlockStatement; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.GotoExpressionKind; +import org.apache.calcite.linq4j.tree.GotoStatement; +import org.apache.calcite.linq4j.tree.Statement; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.type.RelDataType; +import org.opensearch.sql.monitor.profile.ProfilePlanNode; +import org.opensearch.sql.monitor.profile.ProfilePlanNodeMetrics; + +/** EnumerableRel wrapper that records inclusive time and row counts. */ +public class ProfileEnumerableRel extends AbstractRelNode implements EnumerableRel { + + private final EnumerableRel delegate; + private final List inputs; + protected final ProfilePlanNode planNode; + + public ProfileEnumerableRel( + EnumerableRel delegate, List inputs, ProfilePlanNode planNode) { + super( + Objects.requireNonNull(delegate, "delegate").getCluster(), + Objects.requireNonNull(delegate, "delegate").getTraitSet()); + this.delegate = delegate; + this.inputs = new ArrayList<>(Objects.requireNonNull(inputs, "inputs")); + this.planNode = Objects.requireNonNull(planNode, "planNode"); + } + + @Override + public List getInputs() { + return inputs; + } + + @Override + public void replaceInput(int ordinalInParent, RelNode p) { + inputs.set(ordinalInParent, p); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new ProfileEnumerableRel(delegate, inputs, planNode); + } + + @Override + protected RelDataType deriveRowType() { + return delegate.getRowType(); + } + + @Override + public String getRelTypeName() { + return delegate.getRelTypeName(); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + RelWriter writer = pw; + for (RelNode input : inputs) { + writer = writer.input("input", input); + } + return writer; + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + EnumerableRel rewritten = (EnumerableRel) delegate.copy(delegate.getTraitSet(), inputs); + Result result = rewritten.implement(implementor, pref); + return new Result( + wrapBlock( + result.block, implementor.stash(planNode.metrics(), ProfilePlanNodeMetrics.class)), + result.physType, + result.format); + } + + /** + * Rewrite the generated block by wrapping the returned Enumerable with profiling. + * + *

Example (simplified): + * + *

+   * // Before:
+   * {
+   *   final Enumerable input = ...;
+   *   return input;
+   * }
+   *
+   * // After:
+   * {
+   *   final Enumerable input = ...;
+   *   return ProfileEnumerableRel.profile(input, metrics);
+   * }
+   * 
+ */ + private static BlockStatement wrapBlock(BlockStatement block, Expression metricsExpression) { + List statements = block.statements; + if (statements.isEmpty()) { + return block; + } + Statement last = statements.get(statements.size() - 1); + // Expect Calcite to end blocks with a return GotoStatement; skip profiling if it doesn't. + if (!(last instanceof GotoStatement)) { + return block; + } + GotoStatement gotoStatement = (GotoStatement) last; + // Only rewrite blocks that return an expression; otherwise keep the original block. + if (gotoStatement.kind != GotoExpressionKind.Return || gotoStatement.expression == null) { + return block; + } + Expression profiled = + Expressions.call( + ProfileEnumerableRel.class, "profile", gotoStatement.expression, metricsExpression); + BlockBuilder builder = new BlockBuilder(); + for (int i = 0; i < statements.size() - 1; i++) { + builder.add(statements.get(i)); + } + builder.add(Expressions.return_(gotoStatement.labelTarget, profiled)); + return builder.toBlock(); + } + + public static Enumerable profile( + Enumerable enumerable, ProfilePlanNodeMetrics metrics) { + if (metrics == null) { + return enumerable; + } + return new AbstractEnumerable<>() { + @Override + public Enumerator enumerator() { + long start = System.nanoTime(); + Enumerator delegate = enumerable.enumerator(); + metrics.addTimeNanos(System.nanoTime() - start); + return new ProfileEnumerator<>(delegate, metrics); + } + }; + } + + private static final class ProfileEnumerator implements Enumerator { + private final Enumerator delegate; + private final ProfilePlanNodeMetrics metrics; + + private ProfileEnumerator(Enumerator delegate, ProfilePlanNodeMetrics metrics) { + this.delegate = delegate; + this.metrics = metrics; + } + + @Override + public T current() { + return delegate.current(); + } + + @Override + public boolean moveNext() { + long start = System.nanoTime(); + try { + boolean hasNext = delegate.moveNext(); + if (hasNext) { + metrics.incrementRows(); + } + return hasNext; + } finally { + metrics.addTimeNanos(System.nanoTime() - start); + } + } + + @Override + public void reset() { + delegate.reset(); + } + + @Override + public void close() { + long start = System.nanoTime(); + try { + delegate.close(); + } finally { + metrics.addTimeNanos(System.nanoTime() - start); + } + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileScannableRel.java b/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileScannableRel.java new file mode 100644 index 0000000000..1276e356d1 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/profile/ProfileScannableRel.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.profile; + +import java.util.List; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.RelNode; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.monitor.profile.ProfilePlanNode; + +/** EnumerableRel wrapper that also supports Scannable plans. */ +public final class ProfileScannableRel extends ProfileEnumerableRel implements Scannable { + + private final Scannable scannable; + + public ProfileScannableRel( + EnumerableRel delegate, List inputs, ProfilePlanNode planNode) { + super(delegate, inputs, planNode); + this.scannable = (Scannable) delegate; + } + + @Override + public Enumerable<@Nullable Object> scan() { + return profile(scannable.scan(), planNode.metrics()); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index 50b935ac8a..a6d57ea01f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -103,7 +103,9 @@ import org.opensearch.sql.calcite.plan.Scannable; import org.opensearch.sql.calcite.plan.rule.OpenSearchRules; import org.opensearch.sql.calcite.plan.rule.PPLSimplifyDedupRule; +import org.opensearch.sql.calcite.profile.PlanProfileBuilder; import org.opensearch.sql.expression.function.PPLBuiltinOperators; +import org.opensearch.sql.monitor.profile.ProfileContext; import org.opensearch.sql.monitor.profile.ProfileMetric; import org.opensearch.sql.monitor.profile.QueryProfiling; @@ -318,6 +320,12 @@ public OpenSearchCalcitePreparingStmt( @Override protected PreparedResult implement(RelRoot root) { + ProfileContext profileContext = QueryProfiling.current(); + if (profileContext.isEnabled()) { + PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile(root.rel); + profileContext.setPlanRoot(plan.planRoot()); + root = root.withRel(plan.rel()); + } if (root.rel instanceof Scannable scannable) { Hook.PLAN_BEFORE_IMPLEMENTATION.run(root); RelDataType resultType = root.rel.getRowType(); diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java index c69a4e38a2..acc5b8521b 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java @@ -16,10 +16,16 @@ public class DefaultProfileContext implements ProfileContext { private final long startNanos = System.nanoTime(); private boolean finished; private final Map metrics = new ConcurrentHashMap<>(); + private ProfilePlanNode planRoot; private QueryProfile profile; public DefaultProfileContext() {} + @Override + public boolean isEnabled() { + return true; + } + /** {@inheritDoc} */ @Override public ProfileMetric getOrCreateMetric(MetricName name) { @@ -27,6 +33,13 @@ public ProfileMetric getOrCreateMetric(MetricName name) { return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name())); } + @Override + public synchronized void setPlanRoot(ProfilePlanNode planRoot) { + if (this.planRoot == null) { + this.planRoot = planRoot; + } + } + /** {@inheritDoc} */ @Override public synchronized QueryProfile finish() { @@ -38,15 +51,12 @@ public synchronized QueryProfile finish() { Map snapshot = new LinkedHashMap<>(MetricName.values().length); for (MetricName metricName : MetricName.values()) { DefaultMetricImpl metric = metrics.get(metricName); - double millis = metric == null ? 0d : roundToMillis(metric.value()); + double millis = metric == null ? 0d : ProfileUtils.roundToMillis(metric.value()); snapshot.put(metricName, millis); } - double totalMillis = roundToMillis(endNanos - startNanos); - profile = new QueryProfile(totalMillis, snapshot); + double totalMillis = ProfileUtils.roundToMillis(endNanos - startNanos); + QueryProfile.PlanNode planSnapshot = planRoot == null ? null : planRoot.snapshot(); + profile = new QueryProfile(totalMillis, snapshot, planSnapshot); return profile; } - - private double roundToMillis(long nanos) { - return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d; - } } diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java index b3f2a362e3..dcc164718e 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java @@ -14,6 +14,11 @@ public final class NoopProfileContext implements ProfileContext { private NoopProfileContext() {} + @Override + public boolean isEnabled() { + return false; + } + /** {@inheritDoc} */ @Override public ProfileMetric getOrCreateMetric(MetricName name) { @@ -21,6 +26,9 @@ public ProfileMetric getOrCreateMetric(MetricName name) { return NoopProfileMetric.INSTANCE; } + @Override + public void setPlanRoot(ProfilePlanNode planRoot) {} + /** {@inheritDoc} */ @Override public QueryProfile finish() { diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java index 044a71c7d4..0c8048cef3 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java @@ -7,6 +7,11 @@ /** Context for collecting profiling metrics during query execution. */ public interface ProfileContext { + /** + * @return whether profiling is enabled for this context. + */ + boolean isEnabled(); + /** * Obtain or create a metric with the provided name. * @@ -15,6 +20,13 @@ public interface ProfileContext { */ ProfileMetric getOrCreateMetric(MetricName name); + /** + * Register the root plan node for profiling. + * + * @param planRoot root plan node + */ + void setPlanRoot(ProfilePlanNode planRoot); + /** * Finalize profiling and return a snapshot. * diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNode.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNode.java new file mode 100644 index 0000000000..f27c07f08f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNode.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** Mutable plan node used while profiling a query. */ +public final class ProfilePlanNode { + + private final String nodeName; + private final List children; + private final ProfilePlanNodeMetrics metrics = new ProfilePlanNodeMetrics(); + + public ProfilePlanNode(String nodeName, List children) { + this.nodeName = Objects.requireNonNull(nodeName, "nodeName"); + this.children = List.copyOf(Objects.requireNonNull(children, "children")); + } + + public ProfilePlanNodeMetrics metrics() { + return metrics; + } + + public QueryProfile.PlanNode snapshot() { + List snapshotChildren = new ArrayList<>(); + for (ProfilePlanNode child : children) { + snapshotChildren.add(child.snapshot()); + } + List outputChildren = + snapshotChildren.isEmpty() ? null : List.copyOf(snapshotChildren); + return new QueryProfile.PlanNode( + nodeName, ProfileUtils.roundToMillis(metrics.timeNanos()), metrics.rows(), outputChildren); + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNodeMetrics.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNodeMetrics.java new file mode 100644 index 0000000000..74edcfa8e7 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfilePlanNodeMetrics.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.concurrent.atomic.LongAdder; + +/** Metrics captured for a single plan node. */ +public final class ProfilePlanNodeMetrics { + + private final LongAdder timeNanos = new LongAdder(); + private final LongAdder rows = new LongAdder(); + + public ProfilePlanNodeMetrics() {} + + public void addTimeNanos(long nanos) { + if (nanos > 0) { + timeNanos.add(nanos); + } + } + + public void incrementRows() { + rows.increment(); + } + + public long timeNanos() { + return timeNanos.sum(); + } + + public long rows() { + return rows.sum(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileUtils.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileUtils.java new file mode 100644 index 0000000000..b85c7f198b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileUtils.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +/** Utility helpers for profiling metrics. */ +public final class ProfileUtils { + + private ProfileUtils() {} + + /** + * Convert nanoseconds to milliseconds, rounded to two decimals. + * + * @param nanos duration in nanoseconds + * @return rounded milliseconds + */ + public static double roundToMillis(long nanos) { + return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d; + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java index c766308a12..15177ead1c 100644 --- a/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java @@ -7,6 +7,7 @@ import com.google.gson.annotations.SerializedName; import java.util.LinkedHashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -20,6 +21,8 @@ public final class QueryProfile { private final Map phases; + private final PlanNode plan; + /** * Create a new query profile snapshot. * @@ -27,8 +30,20 @@ public final class QueryProfile { * @param phases metric values keyed by {@link MetricName} */ public QueryProfile(double totalTimeMillis, Map phases) { + this(totalTimeMillis, phases, null); + } + + /** + * Create a new query profile snapshot. + * + * @param totalTimeMillis total elapsed milliseconds for the query (rounded to two decimals) + * @param phases metric values keyed by {@link MetricName} + * @param plan plan tree profiling output + */ + public QueryProfile(double totalTimeMillis, Map phases, PlanNode plan) { this.summary = new Summary(totalTimeMillis); this.phases = buildPhases(phases); + this.plan = plan; } private Map buildPhases(Map phases) { @@ -62,4 +77,24 @@ private Phase(double timeMillis) { this.timeMillis = timeMillis; } } + + @Getter + public static final class PlanNode { + + private final String node; + + @SerializedName("time_ms") + private final double timeMillis; + + private final long rows; + + private final List children; + + public PlanNode(String node, double timeMillis, long rows, List children) { + this.node = node; + this.timeMillis = timeMillis; + this.rows = rows; + this.children = children; + } + } } diff --git a/core/src/test/java/org/opensearch/sql/calcite/profile/PlanProfileBuilderTest.java b/core/src/test/java/org/opensearch/sql/calcite/profile/PlanProfileBuilderTest.java new file mode 100644 index 0000000000..fa951168ef --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/profile/PlanProfileBuilderTest.java @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.profile; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.calcite.plan.Scannable; + +@ExtendWith(MockitoExtension.class) +class PlanProfileBuilderTest { + + @Mock private RelOptCluster cluster; + private RelTraitSet traitSet; + + @BeforeEach + void setUp() { + traitSet = RelTraitSet.createEmpty(); + } + + @Test + void wrapsEnumerableRel() { + EnumerableRel rel = mockEnumerableRel("EnumerableCalc", List.of()); + + PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile((RelNode) rel); + + assertInstanceOf(ProfileEnumerableRel.class, plan.rel()); + assertEquals("EnumerableCalc", plan.planRoot().snapshot().getNode()); + } + + @Test + void wrapsScannableRel() { + EnumerableRel rel = mockScannableRel("CalciteEnumerableIndexScan", List.of()); + + PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile((RelNode) rel); + + assertInstanceOf(ProfileScannableRel.class, plan.rel()); + assertEquals("CalciteEnumerableIndexScan", plan.planRoot().snapshot().getNode()); + } + + @Test + void rebuildsInputsForNonEnumerableRel() { + EnumerableRel child = mockEnumerableRel("EnumerableCalc", List.of()); + RelNode parent = mock(RelNode.class); + when(parent.getInputs()).thenReturn(List.of(child)); + when(parent.getRelTypeName()).thenReturn("Parent"); + when(parent.getTraitSet()).thenReturn(traitSet); + when(parent.copy(any(), anyList())).thenReturn(parent); + + PlanProfileBuilder.ProfilePlan plan = PlanProfileBuilder.profile(parent); + + ArgumentCaptor> inputsCaptor = ArgumentCaptor.forClass(List.class); + verify(parent).copy(any(), inputsCaptor.capture()); + List copiedInputs = inputsCaptor.getValue(); + assertNotNull(copiedInputs); + assertInstanceOf(ProfileEnumerableRel.class, copiedInputs.getFirst()); + assertEquals("Parent", plan.planRoot().snapshot().getNode()); + } + + private EnumerableRel mockEnumerableRel(String name, List inputs) { + EnumerableRel rel = mock(EnumerableRel.class); + when(rel.getRelTypeName()).thenReturn(name); + when(rel.getInputs()).thenReturn(inputs); + when(rel.getCluster()).thenReturn(cluster); + when(rel.getTraitSet()).thenReturn(traitSet); + return rel; + } + + private EnumerableRel mockScannableRel(String name, List inputs) { + EnumerableRel rel = + mock( + EnumerableRel.class, + org.mockito.Mockito.withSettings().extraInterfaces(Scannable.class)); + when(rel.getRelTypeName()).thenReturn(name); + when(rel.getInputs()).thenReturn(inputs); + when(rel.getCluster()).thenReturn(cluster); + when(rel.getTraitSet()).thenReturn(traitSet); + return rel; + } +} diff --git a/core/src/test/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRelTest.java b/core/src/test/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRelTest.java new file mode 100644 index 0000000000..53d7aee7c8 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/calcite/profile/ProfileEnumerableRelTest.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.profile; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.monitor.profile.ProfilePlanNodeMetrics; + +class ProfileEnumerableRelTest { + + @Test + void profileReturnsSameEnumerableWhenMetricsMissing() { + Enumerable enumerable = new TestEnumerable(2); + + Enumerable result = ProfileEnumerableRel.profile(enumerable, null); + + assertSame(enumerable, result); + } + + @Test + void profileTracksRowsAndTime() { + ProfilePlanNodeMetrics metrics = new ProfilePlanNodeMetrics(); + Enumerable enumerable = new TestEnumerable(2); + + Enumerator enumerator = ProfileEnumerableRel.profile(enumerable, metrics).enumerator(); + while (enumerator.moveNext()) { + enumerator.current(); + } + enumerator.close(); + + assertEquals(2, metrics.rows()); + assertTrue(metrics.timeNanos() > 0); + } + + private static final class TestEnumerable extends AbstractEnumerable { + private final int size; + + private TestEnumerable(int size) { + this.size = size; + } + + @Override + public Enumerator enumerator() { + return new Enumerator<>() { + private int index = -1; + + @Override + public Integer current() { + return index; + } + + @Override + public boolean moveNext() { + sleepMillis(1); + index++; + return index < size; + } + + @Override + public void reset() { + index = -1; + } + + @Override + public void close() { + sleepMillis(1); + } + }; + } + } + + private static void sleepMillis(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/docs/user/ppl/interfaces/endpoint.md b/docs/user/ppl/interfaces/endpoint.md index 981d20e658..61edf94c13 100644 --- a/docs/user/ppl/interfaces/endpoint.md +++ b/docs/user/ppl/interfaces/endpoint.md @@ -180,6 +180,14 @@ Expected output (trimmed): "optimize": { "time_ms": 18.2 }, "execute": { "time_ms": 4.87 }, "format": { "time_ms": 0.05 } + }, + "plan": { + "node": "EnumerableCalc", + "time_ms": 4.82, + "rows": 2, + "children": [ + { "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 } + ] } } } @@ -189,3 +197,6 @@ Expected output (trimmed): - Profile output is only returned when the query finishes successfully. - Profiling runs only when Calcite is enabled. +- Plan node names use Calcite physical operator names (for example, `EnumerableCalc` or `CalciteEnumerableIndexScan`). +- Plan `time_ms` is inclusive of child operators and represents wall-clock time; overlapping work can make summed plan times exceed `summary.total_time_ms`. +- Scan nodes reflect operator wall-clock time; background prefetch can make scan time smaller than total request latency. diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml index 8a9ff9b23a..882757fd8b 100644 --- a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml @@ -54,6 +54,8 @@ teardown: - gt: {profile.phases.optimize.time_ms: 0.0} - gt: {profile.phases.execute.time_ms: 0.0} - gt: {profile.phases.format.time_ms: 0.0} + - gt: {profile.plan.time_ms: 0.0} + - match: {profile.plan.rows: 2} --- "Profile ignored for explain api": diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index fd852d7f89..2868559e36 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -210,8 +210,14 @@ public void execute( client.schedule( () -> { try (PreparedStatement statement = OpenSearchRelRunners.run(context, rel)) { + ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + long execTime = System.nanoTime(); ResultSet result = statement.executeQuery(); - buildResultSet(result, rel.getRowType(), context.sysLimit.querySizeLimit(), listener); + QueryResponse response = + buildResultSet(result, rel.getRowType(), context.sysLimit.querySizeLimit()); + metric.add(System.nanoTime() - execTime); + listener.onResponse(response); + } catch (SQLException e) { throw new RuntimeException(e); } @@ -250,14 +256,8 @@ private static Object processValue(Object value) { return value; } - private void buildResultSet( - ResultSet resultSet, - RelDataType rowTypes, - Integer querySizeLimit, - ResponseListener listener) - throws SQLException { - ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); - long execTime = System.nanoTime(); + private QueryResponse buildResultSet( + ResultSet resultSet, RelDataType rowTypes, Integer querySizeLimit) throws SQLException { // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); @@ -300,8 +300,7 @@ private void buildResultSet( } Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); - metric.add(System.nanoTime() - execTime); - listener.onResponse(response); + return response; } /** Registers opensearch-dependent functions */ diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 1d05f82fc2..a3a6954cad 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -41,9 +41,6 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.ShardDocSortBuilder; import org.opensearch.search.sort.SortBuilders; -import org.opensearch.sql.monitor.profile.MetricName; -import org.opensearch.sql.monitor.profile.ProfileMetric; -import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; @@ -209,8 +206,6 @@ private OpenSearchResponse search(Function search new OpenSearchResponse( SearchHits.empty(), exprValueFactory, includes, isCountAggRequest()); } else { - ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); - long executionStartTime = System.nanoTime(); // Set afterKey to request, null for first round (afterKey is null in the beginning). if (this.sourceBuilder.aggregations() != null) { this.sourceBuilder.aggregations().getAggregatorFactories().stream() @@ -243,7 +238,6 @@ private OpenSearchResponse search(Function search searchDone = true; } needClean = searchDone; - metric.add(System.nanoTime() - executionStartTime); } return openSearchResponse; } @@ -255,8 +249,6 @@ public OpenSearchResponse searchWithPIT(Function new OpenSearchResponse( SearchHits.empty(), exprValueFactory, includes, isCountAggRequest()); } else { - ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); - long executionStartTime = System.nanoTime(); this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId)); this.sourceBuilder.timeout(cursorKeepAlive); // check for search after @@ -299,7 +291,6 @@ public OpenSearchResponse searchWithPIT(Function LOG.debug(sourceBuilder); } } - metric.add(System.nanoTime() - executionStartTime); } return openSearchResponse; }