diff --git a/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java index 686263238aa..4d4301df473 100644 --- a/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java +++ b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java @@ -20,6 +20,8 @@ public class QueryContext { /** The key of the request id in the context map. */ private static final String REQUEST_ID_KEY = "request_id"; + private static final String PROFILE_KEY = "profile"; + /** * Generates a random UUID and adds to the {@link ThreadContext} as the request id. * @@ -66,4 +68,20 @@ private QueryContext() { throw new AssertionError( getClass().getCanonicalName() + " is a utility class and must not be initialized"); } + + /** + * Store the profile flag in thread context. + * + * @param profileEnabled whether profiling is enabled + */ + public static void setProfile(boolean profileEnabled) { + ThreadContext.put(PROFILE_KEY, Boolean.toString(profileEnabled)); + } + + /** + * @return true if profiling flag is set in the thread context. + */ + public static boolean isProfileEnabled() { + return Boolean.parseBoolean(ThreadContext.get(PROFILE_KEY)); + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a5cdf0f45f0..d7a02cd7f28 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -28,6 +28,7 @@ package org.opensearch.sql.calcite.utils; import static java.util.Objects.requireNonNull; +import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE; import com.google.common.collect.ImmutableList; import java.lang.reflect.Type; @@ -91,6 +92,8 @@ import org.opensearch.sql.calcite.plan.OpenSearchRules; import org.opensearch.sql.calcite.plan.Scannable; import org.opensearch.sql.expression.function.PPLBuiltinOperators; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfiling; /** * Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory @@ -340,6 +343,8 @@ public static class OpenSearchRelRunners { * org.apache.calcite.tools.RelRunners#run(RelNode)} */ public static PreparedStatement run(CalcitePlanContext context, RelNode rel) { + ProfileMetric optimizeTime = QueryProfiling.current().getOrCreateMetric(OPTIMIZE); + long startTime = System.nanoTime(); final RelShuttle shuttle = new RelHomogeneousShuttle() { @Override @@ -358,7 +363,9 @@ public RelNode visit(TableScan scan) { // the line we changed here try (Connection connection = context.connection) { final RelRunner runner = connection.unwrap(RelRunner.class); - return runner.prepareStatement(rel); + PreparedStatement preparedStatement = runner.prepareStatement(rel); + optimizeTime.set(System.nanoTime() - startTime); + return preparedStatement; } catch (SQLException e) { // Detect if error is due to window functions in unsupported context (bins on time fields) String errorMsg = e.getMessage(); diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index c85849df725..27539c0d46c 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -40,9 +40,14 @@ import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.utils.QueryContext; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.exception.CalciteUnsupportedException; import org.opensearch.sql.exception.NonFallbackCalciteException; +import org.opensearch.sql.monitor.profile.MetricName; +import org.opensearch.sql.monitor.profile.ProfileContext; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPaginate; @@ -98,6 +103,10 @@ public void executeWithCalcite( CalcitePlanContext.run( () -> { try { + ProfileContext profileContext = + QueryProfiling.activate(QueryContext.isProfileEnabled()); + ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE); + long analyzeStart = System.nanoTime(); CalcitePlanContext context = CalcitePlanContext.create( buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); @@ -105,6 +114,7 @@ public void executeWithCalcite( relNode = mergeAdjacentFilters(relNode); RelNode optimized = optimize(relNode, context); RelNode calcitePlan = convertToCalcitePlan(optimized); + analyzeMetric.set(System.nanoTime() - analyzeStart); executionEngine.execute(calcitePlan, context, listener); } catch (Throwable t) { if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) { @@ -137,6 +147,7 @@ public void explainWithCalcite( CalcitePlanContext.run( () -> { try { + QueryProfiling.noop(); CalcitePlanContext context = CalcitePlanContext.create( buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultMetricImpl.java b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultMetricImpl.java new file mode 100644 index 00000000000..511bfaac3a8 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultMetricImpl.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.concurrent.atomic.LongAdder; + +/** Concrete metric backed by {@link LongAdder}. */ +final class DefaultMetricImpl implements ProfileMetric { + + private final String name; + private final LongAdder value = new LongAdder(); + + /** + * Construct a metric with the provided name. + * + * @param name metric name + */ + DefaultMetricImpl(String name) { + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public long value() { + return value.sum(); + } + + @Override + public void add(long delta) { + value.add(delta); + } + + @Override + public void set(long value) { + this.value.reset(); + this.value.add(value); + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java new file mode 100644 index 00000000000..c69a4e38a2b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/DefaultProfileContext.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** Default implementation that records profiling metrics. */ +public class DefaultProfileContext implements ProfileContext { + + private final long startNanos = System.nanoTime(); + private boolean finished; + private final Map metrics = new ConcurrentHashMap<>(); + private QueryProfile profile; + + public DefaultProfileContext() {} + + /** {@inheritDoc} */ + @Override + public ProfileMetric getOrCreateMetric(MetricName name) { + Objects.requireNonNull(name, "name"); + return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name())); + } + + /** {@inheritDoc} */ + @Override + public synchronized QueryProfile finish() { + if (finished) { + return profile; + } + finished = true; + long endNanos = System.nanoTime(); + Map snapshot = new LinkedHashMap<>(MetricName.values().length); + for (MetricName metricName : MetricName.values()) { + DefaultMetricImpl metric = metrics.get(metricName); + double millis = metric == null ? 0d : roundToMillis(metric.value()); + snapshot.put(metricName, millis); + } + double totalMillis = roundToMillis(endNanos - startNanos); + profile = new QueryProfile(totalMillis, snapshot); + return profile; + } + + private double roundToMillis(long nanos) { + return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d; + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/MetricName.java b/core/src/main/java/org/opensearch/sql/monitor/profile/MetricName.java new file mode 100644 index 00000000000..987d7dfb647 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/MetricName.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +/** Named metrics used by query profiling. */ +public enum MetricName { + ANALYZE, + OPTIMIZE, + EXECUTE, + FORMAT +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java new file mode 100644 index 00000000000..b3f2a362e38 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileContext.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.Objects; + +/** Disabled profiling context. */ +public final class NoopProfileContext implements ProfileContext { + + public static final NoopProfileContext INSTANCE = new NoopProfileContext(); + + private NoopProfileContext() {} + + /** {@inheritDoc} */ + @Override + public ProfileMetric getOrCreateMetric(MetricName name) { + Objects.requireNonNull(name, "name"); + return NoopProfileMetric.INSTANCE; + } + + /** {@inheritDoc} */ + @Override + public QueryProfile finish() { + return null; + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileMetric.java b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileMetric.java new file mode 100644 index 00000000000..c75e2f8f2ad --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/NoopProfileMetric.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +/** No-op metric implementation. */ +final class NoopProfileMetric implements ProfileMetric { + + static final NoopProfileMetric INSTANCE = new NoopProfileMetric(); + + private NoopProfileMetric() {} + + /** {@inheritDoc} */ + @Override + public String name() { + return ""; + } + + /** {@inheritDoc} */ + @Override + public long value() { + return 0; + } + + /** {@inheritDoc} */ + @Override + public void add(long delta) {} + + /** {@inheritDoc} */ + @Override + public void set(long value) {} +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java new file mode 100644 index 00000000000..044a71c7d46 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileContext.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +/** Context for collecting profiling metrics during query execution. */ +public interface ProfileContext { + /** + * Obtain or create a metric with the provided name. + * + * @param name fully qualified metric name + * @return metric instance + */ + ProfileMetric getOrCreateMetric(MetricName name); + + /** + * Finalize profiling and return a snapshot. + * + * @return immutable query profile snapshot + */ + QueryProfile finish(); +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileMetric.java b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileMetric.java new file mode 100644 index 00000000000..ba3e85a0953 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/ProfileMetric.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +/** Metric for query profiling. */ +public interface ProfileMetric { + /** + * @return metric name. + */ + String name(); + + /** + * @return current metric value. + */ + long value(); + + /** + * Increment the metric by the given delta. + * + * @param delta amount to add + */ + void add(long delta); + + /** + * Set the metric to the provided value. + * + * @param value new metric value + */ + void set(long value); +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java new file mode 100644 index 00000000000..c766308a122 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfile.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import com.google.gson.annotations.SerializedName; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import lombok.Getter; + +/** Immutable snapshot of query profiling metrics. */ +@Getter +public final class QueryProfile { + + private final Summary summary; + + private final Map phases; + + /** + * Create a new query profile snapshot. + * + * @param totalTimeMillis total elapsed milliseconds for the query (rounded to two decimals) + * @param phases metric values keyed by {@link MetricName} + */ + public QueryProfile(double totalTimeMillis, Map phases) { + this.summary = new Summary(totalTimeMillis); + this.phases = buildPhases(phases); + } + + private Map buildPhases(Map phases) { + Objects.requireNonNull(phases, "phases"); + Map ordered = new LinkedHashMap<>(MetricName.values().length); + for (MetricName metricName : MetricName.values()) { + Double value = phases.getOrDefault(metricName, 0d); + ordered.put(metricName.name().toLowerCase(Locale.ROOT), new Phase(value)); + } + return ordered; + } + + @Getter + public static final class Summary { + + @SerializedName("total_time_ms") + private final double totalTimeMillis; + + private Summary(double totalTimeMillis) { + this.totalTimeMillis = totalTimeMillis; + } + } + + @Getter + public static final class Phase { + + @SerializedName("time_ms") + private final double timeMillis; + + private Phase(double timeMillis) { + this.timeMillis = timeMillis; + } + } +} diff --git a/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfiling.java b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfiling.java new file mode 100644 index 00000000000..3ef32dac748 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/monitor/profile/QueryProfiling.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.monitor.profile; + +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Thread-local holder for query profiling contexts. + * + *

Callers can enable or disable profiling per-thread via {@link #activate(boolean)} and obtain + * the active context with {@link #current()}. + */ +public final class QueryProfiling { + + private static final ThreadLocal CURRENT = new ThreadLocal<>(); + + private QueryProfiling() {} + + /** + * @return the profiling context bound to this thread, or a no-op context if not activated. + */ + public static ProfileContext current() { + ProfileContext ctx = CURRENT.get(); + return ctx == null ? NoopProfileContext.INSTANCE : ctx; + } + + /** + * Create noop profiling for the current thread. + * + * @return newly activated profiling context + */ + public static ProfileContext noop() { + return activate(false); + } + + /** + * Activate profiling for the current thread. + * + * @param profilingEnabled whether profiling should be enabled + * @return newly activated profiling context + */ + public static ProfileContext activate(boolean profilingEnabled) { + if (profilingEnabled) { + CURRENT.set(new DefaultProfileContext()); + } else { + CURRENT.set(NoopProfileContext.INSTANCE); + } + return CURRENT.get(); + } + + /** Clear any profiling context bound to the current thread. */ + public static void clear() { + CURRENT.remove(); + } + + /** + * Run a supplier with the provided profiling context bound to the current thread. + * + * @param action supplier to execute + * @return supplier result + */ + public static T withCurrentContext(ProfileContext ctx, Supplier action) { + CURRENT.set(Objects.requireNonNull(ctx, "ctx")); + try { + return action.get(); + } finally { + clear(); + } + } +} diff --git a/docs/user/ppl/interfaces/endpoint.md b/docs/user/ppl/interfaces/endpoint.md index e1e9cf705bf..8b4e19ea687 100644 --- a/docs/user/ppl/interfaces/endpoint.md +++ b/docs/user/ppl/interfaces/endpoint.md @@ -151,4 +151,41 @@ calcite: physical: | CalciteEnumerableIndexScan(table=[[OpenSearch, state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age], FILTER->>($5, 30), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","query":{"range":{"age":{"from":30,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"_source":{"includes":["name","country","state","month","year","age"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) ``` - \ No newline at end of file + +## Profile (Experimental) + +You can enable profiling on the PPL endpoint to capture per-stage timings in milliseconds. Profiling is returned only for regular query execution (not explain) and only when using the default `format=jdbc`. + +### Example + +```bash ppl ignore +curl -sS -H 'Content-Type: application/json' \ + -X POST localhost:9200/_plugins/_ppl \ + -d '{ + "profile": true, + "query" : "source=accounts | fields firstname, lastname" + }' +``` + +Expected output (trimmed): + +```json +{ + "profile": { + "summary": { + "total_time_ms": 33.34 + }, + "phases": { + "analyze": { "time_ms": 8.68 }, + "optimize": { "time_ms": 18.2 }, + "execute": { "time_ms": 4.87 }, + "format": { "time_ms": 0.05 } + } + } +} +``` + +### Notes + +- Profile output is only returned when the query finishes successfully. +- Profiling runs only when Calcite is enabled. diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/api/ppl.json b/integ-test/src/yamlRestTest/resources/rest-api-spec/api/ppl.json index 6b6e56307c4..61c5c139224 100644 --- a/integ-test/src/yamlRestTest/resources/rest-api-spec/api/ppl.json +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/api/ppl.json @@ -13,7 +13,12 @@ } ] }, - "params": {}, + "params": { + "format":{ + "type":"string", + "description":"response format: jdbc, csv, raw, viz" + } + }, "body": { "description": "PPL Query", "required":true diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml new file mode 100644 index 00000000000..8a9ff9b23a5 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/api/ppl.profile.yml @@ -0,0 +1,102 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + - do: + indices.create: + index: ppl_profile + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + mappings: + properties: + message: + type: keyword + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "ppl_profile", "_id": 1}}' + - '{"message": "hello"}' + - '{"index": {"_index": "ppl_profile", "_id": 2}}' + - '{"message": "world"}' + +--- +teardown: + - do: + indices.delete: + index: ppl_profile + ignore_unavailable: true + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"Profile metrics returned for ppl query": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'source=ppl_profile | fields message' + profile: true + - gt: {profile.summary.total_time_ms: 0.0} + - gt: {profile.phases.analyze.time_ms: 0.0} + - gt: {profile.phases.optimize.time_ms: 0.0} + - gt: {profile.phases.execute.time_ms: 0.0} + - gt: {profile.phases.format.time_ms: 0.0} + +--- +"Profile ignored for explain api": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl.explain: + body: + query: 'source=ppl_profile | fields message' + profile: true + - match: {profile: null} + +--- +"Profile ignored for explain query": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'explain source=ppl_profile | fields message' + profile: true + - match: {profile: null} + +--- +"Profile ignored for viz format": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'source=ppl_profile | stats count()' + profile: true + format: 'viz' + - match: {profile: null} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 71f0c8667ff..dead64899bc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -51,6 +51,9 @@ import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLFuncImpTable; +import org.opensearch.sql.monitor.profile.MetricName; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction; @@ -218,6 +221,8 @@ private void buildResultSet( Integer querySizeLimit, ResponseListener listener) throws SQLException { + ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + long execTime = System.nanoTime(); // Get the ResultSet metadata to know about columns ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); @@ -262,6 +267,7 @@ private void buildResultSet( } Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); + metric.add(System.nanoTime() - execTime); listener.onResponse(response); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java index 04af888cfdd..1d05f82fc2a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java @@ -21,7 +21,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.TestOnly; -import org.opensearch.action.search.*; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -39,6 +41,9 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.ShardDocSortBuilder; import org.opensearch.search.sort.SortBuilders; +import org.opensearch.sql.monitor.profile.MetricName; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; @@ -204,6 +209,8 @@ private OpenSearchResponse search(Function search new OpenSearchResponse( SearchHits.empty(), exprValueFactory, includes, isCountAggRequest()); } else { + ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + long executionStartTime = System.nanoTime(); // Set afterKey to request, null for first round (afterKey is null in the beginning). if (this.sourceBuilder.aggregations() != null) { this.sourceBuilder.aggregations().getAggregatorFactories().stream() @@ -236,6 +243,7 @@ private OpenSearchResponse search(Function search searchDone = true; } needClean = searchDone; + metric.add(System.nanoTime() - executionStartTime); } return openSearchResponse; } @@ -247,6 +255,8 @@ public OpenSearchResponse searchWithPIT(Function new OpenSearchResponse( SearchHits.empty(), exprValueFactory, includes, isCountAggRequest()); } else { + ProfileMetric metric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); + long executionStartTime = System.nanoTime(); this.sourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(this.pitId)); this.sourceBuilder.timeout(cursorKeepAlive); // check for search after @@ -289,6 +299,7 @@ public OpenSearchResponse searchWithPIT(Function LOG.debug(sourceBuilder); } } + metric.add(System.nanoTime() - executionStartTime); } return openSearchResponse; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java index a0b33f3b541..65ba189b0d9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java @@ -17,6 +17,8 @@ import org.opensearch.OpenSearchSecurityException; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.exception.NonFallbackCalciteException; +import org.opensearch.sql.monitor.profile.ProfileContext; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; @@ -102,8 +104,11 @@ public boolean isScanDone() { */ public void startScanning(OpenSearchRequest request) { if (isAsync()) { + ProfileContext ctx = QueryProfiling.current(); nextBatchFuture = - CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + CompletableFuture.supplyAsync( + () -> QueryProfiling.withCurrentContext(ctx, () -> client.search(request)), + backgroundExecutor); } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java index ec09d8b5bde..0908d27b491 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java @@ -23,6 +23,7 @@ public class PPLQueryRequestFactory { private static final String DEFAULT_RESPONSE_FORMAT = "jdbc"; private static final String DEFAULT_EXPLAIN_FORMAT = "standard"; private static final String QUERY_PARAMS_PRETTY = "pretty"; + private static final String QUERY_PARAMS_PROFILE = "profile"; /** * Build {@link PPLQueryRequest} from {@link RestRequest}. @@ -59,12 +60,17 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques boolean pretty = getPrettyOption(restRequest.params()); try { jsonContent = new JSONObject(content); + boolean profileRequested = jsonContent.optBoolean(QUERY_PARAMS_PROFILE, false); + String queryString = jsonContent.optString(PPL_FIELD_NAME, ""); + boolean enableProfile = + profileRequested && isProfileSupported(restRequest.path(), format, queryString); PPLQueryRequest pplRequest = new PPLQueryRequest( jsonContent.getString(PPL_FIELD_NAME), jsonContent, restRequest.path(), - format.getFormatName()); + format.getFormatName(), + enableProfile); // set sanitize option if csv format if (format.equals(Format.CSV)) { pplRequest.sanitize(getSanitizeOption(restRequest.params())); @@ -115,4 +121,12 @@ private static boolean getPrettyOption(Map requestParams) { } return false; } + + private static boolean isProfileSupported(String path, Format format, String query) { + boolean explainPath = isExplainRequest(path); + boolean explainQuery = query != null && query.trim().toLowerCase().startsWith("explain"); + boolean isJdbcFormat = + format != null && DEFAULT_RESPONSE_FORMAT.equalsIgnoreCase(format.getFormatName()); + return !explainPath && !explainQuery && isJdbcFormat; + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 83a419cca66..27bfe2084f7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -30,6 +30,7 @@ import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; import org.opensearch.sql.ppl.PPLService; @@ -108,15 +109,17 @@ protected void doExecute( TransportPPLQueryRequest transportRequest = TransportPPLQueryRequest.fromActionRequest(request); // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); + QueryContext.setProfile(transformedRequest.profile()); + ActionListener clearingListener = wrapWithProfilingClear(listener); if (transformedRequest.isExplainRequest()) { pplService.explain( - transformedRequest, createExplainResponseListener(transformedRequest, listener)); + transformedRequest, createExplainResponseListener(transformedRequest, clearingListener)); } else { pplService.execute( transformedRequest, - createListener(transformedRequest, listener), - createExplainResponseListener(transformedRequest, listener)); + createListener(transformedRequest, clearingListener), + createExplainResponseListener(transformedRequest, clearingListener)); } } @@ -202,4 +205,27 @@ private Format format(PPLQueryRequest pplRequest) { String.format(Locale.ROOT, "response in %s format is not supported.", format)); } } + + private ActionListener wrapWithProfilingClear( + ActionListener delegate) { + return new ActionListener<>() { + @Override + public void onResponse(TransportPPLQueryResponse transportPPLQueryResponse) { + try { + delegate.onResponse(transportPPLQueryResponse); + } finally { + QueryProfiling.clear(); + } + } + + @Override + public void onFailure(Exception e) { + try { + delegate.onFailure(e); + } finally { + QueryProfiling.clear(); + } + } + }; + } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java index 8cdf27ef3c8..65663bbec00 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java @@ -45,6 +45,11 @@ public class TransportPPLQueryRequest extends ActionRequest { @Accessors(fluent = true) private JsonResponseFormatter.Style style = JsonResponseFormatter.Style.COMPACT; + @Setter + @Getter + @Accessors(fluent = true) + private boolean profile = false; + /** Constructor of TransportPPLQueryRequest from PPLQueryRequest. */ public TransportPPLQueryRequest(PPLQueryRequest pplQueryRequest) { pplQuery = pplQueryRequest.getRequest(); @@ -53,6 +58,7 @@ public TransportPPLQueryRequest(PPLQueryRequest pplQueryRequest) { format = pplQueryRequest.getFormat(); sanitize = pplQueryRequest.sanitize(); style = pplQueryRequest.style(); + profile = pplQueryRequest.profile(); } /** Constructor of TransportPPLQueryRequest from StreamInput. */ @@ -65,6 +71,7 @@ public TransportPPLQueryRequest(StreamInput in) throws IOException { path = in.readOptionalString(); sanitize = in.readBoolean(); style = in.readEnum(JsonResponseFormatter.Style.class); + profile = in.readBoolean(); } /** Re-create the object from the actionRequest. */ @@ -95,6 +102,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(path); out.writeBoolean(sanitize); out.writeEnum(style); + out.writeBoolean(profile); } public String getRequest() { @@ -128,7 +136,8 @@ public ActionRequestValidationException validate() { /** Convert to PPLQueryRequest. */ public PPLQueryRequest toPPLQueryRequest() { - PPLQueryRequest pplQueryRequest = new PPLQueryRequest(pplQuery, jsonContent, path, format); + PPLQueryRequest pplQueryRequest = + new PPLQueryRequest(pplQuery, jsonContent, path, format, profile); pplQueryRequest.sanitize(sanitize); pplQueryRequest.style(style); return pplQueryRequest; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index ca351fcc0a7..321e1d410c4 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -35,16 +35,27 @@ public class PPLQueryRequest { @Accessors(fluent = true) private JsonResponseFormatter.Style style = JsonResponseFormatter.Style.COMPACT; + @Setter + @Getter + @Accessors(fluent = true) + private boolean profile = false; + public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path) { this(pplQuery, jsonContent, path, ""); } - /** Constructor of PPLQueryRequest. */ public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path, String format) { + this(pplQuery, jsonContent, path, format, false); + } + + /** Constructor of PPLQueryRequest. */ + public PPLQueryRequest( + String pplQuery, JSONObject jsonContent, String path, String format, boolean profile) { this.pplQuery = pplQuery; this.jsonContent = jsonContent; this.path = Optional.ofNullable(path).orElse(DEFAULT_PPL_PATH); this.format = format; + this.profile = profile; } public String getRequest() { diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java index c00174dc9fa..ff59ce4cddc 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/SimpleJsonResponseFormatter.java @@ -10,6 +10,10 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Singular; +import org.opensearch.sql.monitor.profile.MetricName; +import org.opensearch.sql.monitor.profile.ProfileMetric; +import org.opensearch.sql.monitor.profile.QueryProfile; +import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.protocol.response.QueryResult; /** @@ -40,6 +44,9 @@ public SimpleJsonResponseFormatter(Style style) { @Override public Object buildJsonObject(QueryResult response) { + ProfileMetric formatMetric = QueryProfiling.current().getOrCreateMetric(MetricName.FORMAT); + long formatTime = System.nanoTime(); + JsonResponse.JsonResponseBuilder json = JsonResponse.builder(); json.total(response.size()).size(response.size()); @@ -47,6 +54,9 @@ public Object buildJsonObject(QueryResult response) { response.columnNameTypes().forEach((name, type) -> json.column(new Column(name, type))); json.datarows(fetchDataRows(response)); + formatMetric.set(System.nanoTime() - formatTime); + + json.profile(QueryProfiling.current().finish()); return json.build(); } @@ -63,6 +73,8 @@ private Object[][] fetchDataRows(QueryResult response) { @Builder @Getter public static class JsonResponse { + private final QueryProfile profile; + @Singular("column") private final List schema;