diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java
index 6681d6d1d2c..04c44666896 100644
--- a/core/src/main/java/org/opensearch/sql/ast/statement/Query.java
+++ b/core/src/main/java/org/opensearch/sql/ast/statement/Query.java
@@ -7,7 +7,6 @@
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -19,13 +18,41 @@
 @Setter
 @ToString
 @EqualsAndHashCode(callSuper = false)
-@RequiredArgsConstructor
 public class Query extends Statement {
 
   protected final UnresolvedPlan plan;
   protected final int fetchSize;
   private final QueryType queryType;
 
+  /** Pagination offset (0-based). Only used when fetchSize > 0. */
+  protected final int paginationOffset;
+
+  /**
+   * Constructor for backward compatibility (no pagination offset).
+   *
+   * @param plan the unresolved plan
+   * @param fetchSize page size (0 = no pagination)
+   * @param queryType the query type
+   */
+  public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) {
+    this(plan, fetchSize, queryType, 0);
+  }
+
+  /**
+   * Constructor with pagination support.
+   *
+   * @param plan the unresolved plan
+   * @param fetchSize page size (0 = no pagination)
+   * @param queryType the query type
+   * @param paginationOffset offset for pagination (0-based)
+   */
+  public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType, int paginationOffset) {
+    this.plan = plan;
+    this.fetchSize = fetchSize;
+    this.queryType = queryType;
+    this.paginationOffset = paginationOffset;
+  }
+
   @Override
   public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
     return visitor.visitQuery(this, context);
diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
index 669d8452dc0..552c2f45153 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java
@@ -48,6 +48,21 @@ public class CalcitePlanContext {
   @Getter @Setter private boolean isResolvingSubquery = false;
   @Getter @Setter private boolean inCoalesceFunction = false;
 
+  /** Pagination offset (0-based). */
+  @Getter @Setter private int paginationOffset = 0;
+
+  /** Pagination page size. 0 means pagination is disabled. */
+  @Getter @Setter private int paginationSize = 0;
+
+  /**
+   * Check if pagination is enabled.
+   *
+   * @return true if paginationSize > 0
+   */
+  public boolean isPaginationEnabled() {
+    return paginationSize > 0;
+  }
+
   /**
    * The flag used to determine whether we do metadata field projection for user 1. If a project is
    * never visited, we will do metadata field projection for user 2. Else not because user may
Else not because user may diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java index 84c448c8387..5aae8074b4d 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java @@ -35,6 +35,13 @@ public enum SystemLimitType { JOIN_SUBSEARCH_MAXOUT, /** Max output to return from a subsearch. */ SUBSEARCH_MAXOUT, + /** + * Pagination limit type for API pagination support. + * + *
<p>
This type is used to apply LIMIT/OFFSET for paginating query results. Unlike + * QUERY_SIZE_LIMIT, pagination is applied as post-processing after the user's query executes. + */ + PAGINATION, } @Getter private final SystemLimitType type; diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java index c85849df725..8c3864bcedb 100644 --- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java +++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java @@ -31,6 +31,7 @@ import org.opensearch.sql.analysis.AnalysisContext; import org.opensearch.sql.analysis.Analyzer; import org.opensearch.sql.ast.statement.Explain; +import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; @@ -71,10 +72,33 @@ public void execute( UnresolvedPlan plan, QueryType queryType, ResponseListener listener) { + execute(plan, queryType, listener, 0, 0); + } + + /** + * Execute the {@link UnresolvedPlan} with pagination support. 
+ * + * @param plan the unresolved plan + * @param queryType the query type + * @param listener the response listener + * @param pageSize page size (0 = no pagination) + * @param offset pagination offset (0-based) + */ + public void execute( + UnresolvedPlan plan, + QueryType queryType, + ResponseListener listener, + int pageSize, + int offset) { if (shouldUseCalcite(queryType)) { - executeWithCalcite(plan, queryType, listener); + executeWithCalcite(plan, queryType, listener, pageSize, offset); } else { - executeWithLegacy(plan, queryType, listener, Optional.empty()); + // For legacy path (SQL), wrap with Paginate when pagination is enabled + if (pageSize > 0) { + executeWithLegacy(new Paginate(pageSize, plan), queryType, listener, Optional.empty()); + } else { + executeWithLegacy(plan, queryType, listener, Optional.empty()); + } } } @@ -91,16 +115,71 @@ public void explain( } } + /** + * Explain with pagination support. + * + * @param plan the unresolved plan + * @param queryType the query type + * @param listener the response listener + * @param format the explain format + * @param pageSize page size for pagination + * @param offset pagination offset (0-based) + */ + public void explain( + UnresolvedPlan plan, + QueryType queryType, + ResponseListener listener, + Explain.ExplainFormat format, + int pageSize, + int offset) { + if (shouldUseCalcite(queryType)) { + explainWithCalcite(plan, queryType, listener, format, pageSize, offset); + } else { + // For legacy path (SQL), wrap with Paginate when pagination is enabled + if (pageSize > 0) { + explainWithLegacy( + new Paginate(pageSize, plan), queryType, listener, format, Optional.empty()); + } else { + explainWithLegacy(plan, queryType, listener, format, Optional.empty()); + } + } + } + public void executeWithCalcite( UnresolvedPlan plan, QueryType queryType, ResponseListener listener) { + executeWithCalcite(plan, queryType, listener, 0, 0); + } + + /** + * Execute with Calcite engine and pagination support. 
+ * + * @param plan the unresolved plan + * @param queryType the query type + * @param listener the response listener + * @param pageSize page size (0 = no pagination) + * @param offset pagination offset (0-based) + */ + public void executeWithCalcite( + UnresolvedPlan plan, + QueryType queryType, + ResponseListener listener, + int pageSize, + int offset) { CalcitePlanContext.run( () -> { try { CalcitePlanContext context = CalcitePlanContext.create( buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); + + // Set pagination parameters if enabled + if (pageSize > 0) { + context.setPaginationSize(pageSize); + context.setPaginationOffset(offset); + } + RelNode relNode = analyze(plan, context); relNode = mergeAdjacentFilters(relNode); RelNode optimized = optimize(relNode, context); @@ -134,12 +213,39 @@ public void explainWithCalcite( QueryType queryType, ResponseListener listener, Explain.ExplainFormat format) { + explainWithCalcite(plan, queryType, listener, format, 0, 0); + } + + /** + * Explain with Calcite engine and pagination support. 
+ * + * @param plan the unresolved plan + * @param queryType the query type + * @param listener the response listener + * @param format the explain format + * @param pageSize page size (0 = no pagination) + * @param offset pagination offset (0-based) + */ + public void explainWithCalcite( + UnresolvedPlan plan, + QueryType queryType, + ResponseListener listener, + Explain.ExplainFormat format, + int pageSize, + int offset) { CalcitePlanContext.run( () -> { try { CalcitePlanContext context = CalcitePlanContext.create( buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); + + // Set pagination parameters if enabled + if (pageSize > 0) { + context.setPaginationSize(pageSize); + context.setPaginationOffset(offset); + } + context.run( () -> { RelNode relNode = analyze(plan, context); @@ -277,14 +383,28 @@ public PhysicalPlan plan(LogicalPlan plan) { } /** - * Try to optimize the plan by appending a limit operator for QUERY_SIZE_LIMIT Don't add for - * `EXPLAIN` to avoid changing its output plan. + * Try to optimize the plan by appending a limit operator for pagination or QUERY_SIZE_LIMIT. When + * pagination is enabled, use pagination LIMIT/OFFSET instead of system limit. + * + *
<p>
Pagination is applied AFTER the user's query executes completely. This ensures that + * user-specified commands like 'head X from Y' are respected first, and pagination only slices + * the final result. */ public RelNode optimize(RelNode plan, CalcitePlanContext context) { - return LogicalSystemLimit.create( - SystemLimitType.QUERY_SIZE_LIMIT, - plan, - context.relBuilder.literal(context.sysLimit.querySizeLimit())); + if (context.isPaginationEnabled()) { + // Apply pagination LIMIT/OFFSET on top of user's complete query result + return LogicalSystemLimit.create( + SystemLimitType.PAGINATION, + plan, + context.relBuilder.literal(context.getPaginationOffset()), + context.relBuilder.literal(context.getPaginationSize())); + } else { + // Apply system query size limit + return LogicalSystemLimit.create( + SystemLimitType.QUERY_SIZE_LIMIT, + plan, + context.relBuilder.literal(context.sysLimit.querySizeLimit())); + } } private boolean isCalciteFallbackAllowed(@Nullable Throwable t) { diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index c37327f3df4..b4d4ff4c72e 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -6,9 +6,7 @@ package org.opensearch.sql.executor.execution; import java.util.Optional; -import org.apache.commons.lang3.NotImplementedException; import org.opensearch.sql.ast.statement.Explain; -import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.ExecutionEngine; @@ -29,6 +27,9 @@ public class QueryPlan extends AbstractPlan { protected final Optional pageSize; + /** Pagination offset (0-based). Only used when pageSize is present. */ + protected final int paginationOffset; + /** Constructor. 
*/ public QueryPlan( QueryId queryId, @@ -41,14 +42,27 @@ public QueryPlan( this.queryService = queryService; this.listener = listener; this.pageSize = Optional.empty(); + this.paginationOffset = 0; + } + + /** Constructor with page size (backward compatible, offset defaults to 0). */ + public QueryPlan( + QueryId queryId, + QueryType queryType, + UnresolvedPlan plan, + int pageSize, + QueryService queryService, + ResponseListener listener) { + this(queryId, queryType, plan, pageSize, 0, queryService, listener); } - /** Constructor with page size. */ + /** Constructor with page size and offset for pagination. */ public QueryPlan( QueryId queryId, QueryType queryType, UnresolvedPlan plan, int pageSize, + int paginationOffset, QueryService queryService, ResponseListener listener) { super(queryId, queryType); @@ -56,12 +70,14 @@ public QueryPlan( this.queryService = queryService; this.listener = listener; this.pageSize = Optional.of(pageSize); + this.paginationOffset = paginationOffset; } @Override public void execute() { if (pageSize.isPresent()) { - queryService.execute(new Paginate(pageSize.get(), plan), getQueryType(), listener); + // Use new pagination with offset for Calcite path + queryService.execute(plan, getQueryType(), listener, pageSize.get(), paginationOffset); } else { queryService.execute(plan, getQueryType(), listener); } @@ -71,9 +87,8 @@ public void execute() { public void explain( ResponseListener listener, Explain.ExplainFormat format) { if (pageSize.isPresent()) { - listener.onFailure( - new NotImplementedException( - "`explain` feature for paginated requests is not implemented yet.")); + queryService.explain( + plan, getQueryType(), listener, format, pageSize.get(), paginationOffset); } else { queryService.explain(plan, getQueryType(), listener, format); } diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java index 
5f10df92894..c11817769d8 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java @@ -107,17 +107,30 @@ public AbstractPlan visitQuery( context) { requireNonNull(context.getLeft(), "[BUG] query listener must be not null"); if (node.getFetchSize() > 0) { - if (canConvertToCursor(node.getPlan())) { + if (node.getQueryType() == QueryType.PPL) { + // PPL pagination - use pageSize and offset (Calcite path) return new QueryPlan( QueryId.queryId(), node.getQueryType(), node.getPlan(), node.getFetchSize(), + node.getPaginationOffset(), queryService, context.getLeft()); } else { - // This should be picked up by the legacy engine. - throw new UnsupportedCursorRequestException(); + // SQL pagination - use legacy cursor-based pagination + if (canConvertToCursor(node.getPlan())) { + return new QueryPlan( + QueryId.queryId(), + node.getQueryType(), + node.getPlan(), + node.getFetchSize(), + queryService, + context.getLeft()); + } else { + // This should be picked up by the legacy engine. 
+ throw new UnsupportedCursorRequestException(); + } } } else { return new QueryPlan( diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java index dd73b26a8c3..728ea33ac1c 100644 --- a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java @@ -11,7 +11,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.opensearch.sql.executor.execution.QueryPlanFactory.NO_CONSUMER_RESPONSE_LISTENER; import org.junit.jupiter.api.BeforeEach; @@ -28,11 +27,9 @@ import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; -import org.opensearch.sql.exception.UnsupportedCursorRequestException; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryService; import org.opensearch.sql.executor.QueryType; -import org.opensearch.sql.executor.pagination.CanPaginateVisitor; @ExtendWith(MockitoExtension.class) @DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) @@ -100,22 +97,21 @@ public void no_consumer_response_channel() { } @Test - public void create_query_with_fetch_size_which_can_be_paged() { - when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.TRUE); + public void create_query_with_fetch_size_should_create_query_plan() { factory = new QueryPlanFactory(queryService); - Statement query = new Query(plan, 10, queryType); + // Use PPL query type for new pagination feature + Statement query = new Query(plan, 10, QueryType.PPL); AbstractPlan queryExecution = factory.create(query, queryListener, explainListener); 
assertTrue(queryExecution instanceof QueryPlan); } @Test - public void create_query_with_fetch_size_which_cannot_be_paged() { - when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.FALSE); + public void create_query_with_fetch_size_and_offset_should_create_query_plan() { factory = new QueryPlanFactory(queryService); - Statement query = new Query(plan, 10, queryType); - assertThrows( - UnsupportedCursorRequestException.class, - () -> factory.create(query, queryListener, explainListener)); + // Use PPL query type for new pagination with offset feature + Statement query = new Query(plan, 10, QueryType.PPL, 50); + AbstractPlan queryExecution = factory.create(query, queryListener, explainListener); + assertTrue(queryExecution instanceof QueryPlan); } @Test diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java index 809cbee9483..0f981d636fc 100644 --- a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java @@ -6,14 +6,12 @@ package org.opensearch.sql.executor.execution; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import org.apache.commons.lang3.NotImplementedException; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; @@ -110,20 +108,14 @@ public void onFailure(Exception e) { } @Test - public void explain_is_not_supported_for_pagination() { - new QueryPlan(null, null, null, 0, null, null) - .explain( - new ResponseListener<>() { - @Override - public 
void onResponse(ExecutionEngine.ExplainResponse response) { - fail(); - } - - @Override - public void onFailure(Exception e) { - assertTrue(e instanceof NotImplementedException); - } - }, - format); + public void explain_with_pagination() { + int pageSize = 10; + int offset = 5; + QueryPlan query = + new QueryPlan(queryId, queryType, plan, pageSize, offset, queryService, queryListener); + query.explain(explainListener, format); + + verify(queryService, times(1)) + .explain(plan, queryType, explainListener, format, pageSize, offset); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index d5e60d491bd..aac537eebfc 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -2154,4 +2154,73 @@ public void testRexStandardizationForScript() throws IOException { TEST_INDEX_BANK), true)); } + + // Pagination explain tests + + /** + * Test explain for head X from Y syntax. This verifies the explain plan shows the correct + * LIMIT/OFFSET structure. + */ + @Test + public void testExplainHeadFromOffset() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String query = + String.format( + "source=%s | fields firstname, age | sort age | head 3 from 2", TEST_INDEX_BANK); + var result = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_head_from_offset.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + /** + * Test explain for head X from Y with filter. This verifies the explain plan shows the correct + * LIMIT/OFFSET structure with filter applied first. 
+ */ + @Test + public void testExplainHeadFromOffsetWithFilter() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String query = + String.format( + "source=%s | where age > 30 | fields firstname, age | sort age | head 2 from 3", + TEST_INDEX_BANK); + var result = explainQueryYaml(query); + String expected = loadExpectedPlan("explain_head_from_offset_with_filter.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + /** + * Test explain for pagination API. This verifies that using pageSize and offset parameters + * produces a plan with LogicalSystemLimit(type=PAGINATION) applied on top. + */ + @Test + public void testExplainPaginationApi() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String query = String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK); + var result = explainQueryYamlWithPagination(query, 3, 2); + String expected = loadExpectedPlan("explain_pagination_api.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } + + /** + * Test using both pagination API and head X from Y together. The query uses head to limit results + * first, then pagination API applies additional LIMIT/OFFSET on top of that result. + * + *
<p>
Example: Query "head 5 from 1" returns rows 2-6 (5 rows starting from offset 1). Then + * pagination with pageSize=2, offset=1 returns rows 2-3 of that result (i.e., original rows 3-4). + */ + @Test + public void testExplainPaginationApiWithHeadFrom() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + // Query with head 5 from 1, then apply pagination pageSize=2, offset=1 + String query = + String.format( + "source=%s | fields firstname, age | sort age | head 5 from 1", TEST_INDEX_BANK); + var result = explainQueryYamlWithPagination(query, 2, 1); + + // The plan should show both: + // 1. LogicalSystemLimit with type=[PAGINATION] (from pagination API) + // 2. LogicalSort with offset and fetch (from head 5 from 1) + String expected = loadExpectedPlan("explain_pagination_api_with_head_from.yaml"); + assertYamlEqualsIgnoreId(expected, result); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CalcitePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/CalcitePaginationIT.java new file mode 100644 index 00000000000..5f33cecc2e6 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/CalcitePaginationIT.java @@ -0,0 +1,477 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONArray; +import org.json.JSONObject; +import 
org.junit.Assert; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; + +/** Integration tests for PPL pagination with Calcite execution engine. */ +public class CalcitePaginationIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + enableCalcite(); + } + + @Test + public void testBasicPaginationWithAccount() throws IOException { + // Page 1: offset=0, pageSize=3, sorted by age + JSONObject page1 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, lastname, age | sort age | head 1000", + TEST_INDEX_ACCOUNT), + 3, + 0); + + verifySchema( + page1, + schema("firstname", "string"), + schema("lastname", "string"), + schema("age", "bigint")); + JSONArray datarows1 = page1.getJSONArray("datarows"); + assertEquals("Page 1 should have 3 rows", 3, datarows1.length()); + // All ages on page 1 should be 20 (youngest) + for (int i = 0; i < datarows1.length(); i++) { + int age = datarows1.getJSONArray(i).getInt(2); + assertEquals("First page should have age 20", 20, age); + } + + // Page 2: offset=3, pageSize=3 + JSONObject page2 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, lastname, age | sort age | head 1000", + TEST_INDEX_ACCOUNT), + 3, + 3); + + verifySchema( + page2, + schema("firstname", "string"), + schema("lastname", "string"), + schema("age", "bigint")); + // Verify we get different rows on page 2 (all should have age >= 20) + JSONArray datarows2 = page2.getJSONArray("datarows"); + assertEquals("Page 2 should have 3 rows", 3, datarows2.length()); + for (int i = 0; i < datarows2.length(); i++) { + int age = datarows2.getJSONArray(i).getInt(2); + assertTrue("Age should be >= 20", age >= 20); + } + } + + @Test + public void testPaginationWithFilter() throws IOException { + // Filter should be applied before pagination - 
get accounts with age > 35 + JSONObject page1 = + executePaginatedQuery( + String.format( + "source=%s | where age > 35 | fields firstname, age | sort age", + TEST_INDEX_ACCOUNT), + 5, + 0); + + verifySchema(page1, schema("firstname", "string"), schema("age", "bigint")); + + JSONArray datarows = page1.getJSONArray("datarows"); + assertTrue("Should return data", datarows.length() > 0); + assertTrue("Should return at most 5 rows", datarows.length() <= 5); + + // Verify all ages are > 35 + for (int i = 0; i < datarows.length(); i++) { + int age = datarows.getJSONArray(i).getInt(1); + assertTrue("Age should be > 35", age > 35); + } + } + + @Test + public void testPaginationWithOrderBy() throws IOException { + // Page 1 with order by age ascending + JSONObject page1 = + executePaginatedQuery( + String.format("source=%s | fields age | sort age | head 1000", TEST_INDEX_ACCOUNT), + 3, + 0); + + verifySchema(page1, schema("age", "bigint")); + JSONArray datarows1 = page1.getJSONArray("datarows"); + assertEquals(3, datarows1.length()); + + // Page 2 + JSONObject page2 = + executePaginatedQuery( + String.format("source=%s | fields age | sort age | head 1000", TEST_INDEX_ACCOUNT), + 3, + 3); + + verifySchema(page2, schema("age", "bigint")); + JSONArray datarows2 = page2.getJSONArray("datarows"); + assertEquals(3, datarows2.length()); + + // Last item from page 1 should be <= first item from page 2 + int lastAgePage1 = datarows1.getJSONArray(2).getInt(0); + int firstAgePage2 = datarows2.getJSONArray(0).getInt(0); + assertTrue("Results should maintain sort order across pages", lastAgePage1 <= firstAgePage2); + } + + @Test + public void testPaginationWithBankDataset() throws IOException { + // Test with BANK dataset - page 1: offset=0, pageSize=3 + JSONObject page1 = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK), 3, 0); + + verifySchema(page1, schema("firstname", "string"), schema("age", "int")); + // BANK dataset has 7 
records sorted by age: 28, 32, 33, 34, 36, 36, 39 + verifyDataRows(page1, rows("Nanette", 28), rows("Amber JOHnny", 32), rows("Dale", 33)); + + // Page 2: offset=3, pageSize=3 + JSONObject page2 = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK), 3, 3); + + verifySchema(page2, schema("firstname", "string"), schema("age", "int")); + verifyDataRows(page2, rows("Dillard", 34), rows("Hattie", 36), rows("Elinor", 36)); + + // Page 3: offset=6, pageSize=3 - should only get 1 remaining record + JSONObject page3 = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK), 3, 6); + + verifySchema(page3, schema("firstname", "string"), schema("age", "int")); + verifyDataRows(page3, rows("Virginia", 39)); + } + + @Test + public void testSinglePageResult() throws IOException { + // Request more data than available (using filter to limit results) + JSONObject result = + executePaginatedQuery( + String.format( + "source=%s | where firstname='Amber' | fields firstname", TEST_INDEX_ACCOUNT), + 100, + 0); + + verifySchema(result, schema("firstname", "string")); + // Amber Duke exists in the account dataset + verifyDataRows(result, rows("Amber")); + } + + @Test + public void testOffsetBeyondData() throws IOException { + // Offset beyond available data in BANK dataset (7 records) + JSONObject result = + executePaginatedQuery( + String.format("source=%s | fields firstname", TEST_INDEX_BANK), 10, 100); + + verifySchema(result, schema("firstname", "string")); + JSONArray datarows = result.getJSONArray("datarows"); + assertEquals("Should return empty when offset is beyond data", 0, datarows.length()); + } + + @Test + public void testZeroPageSize() throws IOException { + // When pageSize is 0, it should fall back to system limit behavior + JSONObject result = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | head 5", TEST_INDEX_ACCOUNT), 0, 0); + + 
verifySchema(result, schema("firstname", "string"), schema("age", "bigint")); + JSONArray datarows = result.getJSONArray("datarows"); + // Should return data with system default limit (head 5 limits to 5) + assertEquals("Should return 5 rows with head limit", 5, datarows.length()); + } + + @Test + public void testSchemaConsistencyAcrossPages() throws IOException { + // Schema should be consistent across all pages + JSONObject page1 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, age, gender | head 1000", TEST_INDEX_ACCOUNT), + 5, + 0); + + JSONObject page2 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, age, gender | head 1000", TEST_INDEX_ACCOUNT), + 5, + 5); + + // Both pages should have consistent schema + verifySchema( + page1, schema("firstname", "string"), schema("age", "bigint"), schema("gender", "string")); + verifySchema( + page2, schema("firstname", "string"), schema("age", "bigint"), schema("gender", "string")); + + JSONArray schema1 = page1.getJSONArray("schema"); + JSONArray schema2 = page2.getJSONArray("schema"); + assertEquals( + "Schema should be consistent across pages", schema1.toString(), schema2.toString()); + } + + @Test + public void testPaginationWithAggregation() throws IOException { + // Aggregation results should also support pagination + JSONObject result = + executePaginatedQuery( + String.format("source=%s | stats count() by gender", TEST_INDEX_ACCOUNT), 10, 0); + + verifySchema(result, schema("count()", "bigint"), schema("gender", "string")); + JSONArray datarows = result.getJSONArray("datarows"); + assertTrue("Aggregation should return results", datarows.length() > 0); + // Should have M and F groups + assertTrue("Should have at most 2 gender groups", datarows.length() <= 2); + } + + @Test + public void testPaginationIterateThroughAllPages() throws IOException { + // Iterate through all pages to verify complete data coverage + int pageSize = 3; + int totalRecords = 0; + int 
pageCount = 0; + int maxPages = 10; // Safety limit + + while (pageCount < maxPages) { + JSONObject page = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK), + pageSize, + pageCount * pageSize); + + JSONArray datarows = page.getJSONArray("datarows"); + if (datarows.length() == 0) { + break; + } + + totalRecords += datarows.length(); + pageCount++; + } + + // BANK dataset has 7 records + assertEquals("Should retrieve all 7 records across pages", 7, totalRecords); + assertEquals("Should take 3 pages to get all records", 3, pageCount); + } + + @Test + public void testPaginationWithHeadCommand() throws IOException { + // User query limits to 6 rows with head, pagination takes first 3 + // Expected: 3 rows (pagination applied after head) + JSONObject result = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age | head 6", TEST_INDEX_BANK), + 3, // pageSize + 0); // offset + + verifySchema(result, schema("firstname", "string"), schema("age", "int")); + JSONArray datarows = result.getJSONArray("datarows"); + assertEquals("Should return 3 rows (pageSize)", 3, datarows.length()); + // First 3 of head 6 result + verifyDataRows(result, rows("Nanette", 28), rows("Amber JOHnny", 32), rows("Dale", 33)); + } + + @Test + public void testPaginationWithHeadFrom() throws IOException { + // User query: head 4 from 2 (skip 2, take 4) + // BANK data sorted by age: Nanette(28), Amber(32), Dale(33), Dillard(34), Hattie(36), + // Elinor(36), Virginia(39) + // After head 4 from 2: Dale(33), Dillard(34), Hattie(36), Elinor(36) + // Pagination page 1 (pageSize=2, offset=0): Dale(33), Dillard(34) + JSONObject page1 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, age | sort age | head 4 from 2", TEST_INDEX_BANK), + 2, // pageSize + 0); // offset + + verifySchema(page1, schema("firstname", "string"), schema("age", "int")); + verifyDataRows(page1, rows("Dale", 33), 
rows("Dillard", 34)); + + // Pagination page 2 (pageSize=2, offset=2): Hattie(36), Elinor(36) + JSONObject page2 = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, age | sort age | head 4 from 2", TEST_INDEX_BANK), + 2, // pageSize + 2); // offset + + verifySchema(page2, schema("firstname", "string"), schema("age", "int")); + verifyDataRows(page2, rows("Hattie", 36), rows("Elinor", 36)); + } + + @Test + public void testPaginationWithHeadSecondPage() throws IOException { + // User query: head 6 (returns 6 rows from BANK dataset) + // Pagination page 2: pageSize=3, offset=3 + // Expected: rows 4-6 of the head result + JSONObject result = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age | head 6", TEST_INDEX_BANK), + 3, // pageSize + 3); // offset for second page + + verifySchema(result, schema("firstname", "string"), schema("age", "int")); + // Second 3 rows of head 6: Dillard(34), Hattie(36), Elinor(36) + verifyDataRows(result, rows("Dillard", 34), rows("Hattie", 36), rows("Elinor", 36)); + } + + @Test + public void testPaginationPageSizeLargerThanHead() throws IOException { + // User query: head 3 (returns 3 rows) + // Pagination pageSize=10 is larger than head result + // Expected: only 3 rows (limited by head) + JSONObject result = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age | head 3", TEST_INDEX_BANK), + 10, // pageSize larger than head + 0); + + verifySchema(result, schema("firstname", "string"), schema("age", "int")); + verifyDataRows(result, rows("Nanette", 28), rows("Amber JOHnny", 32), rows("Dale", 33)); + } + + /** + * Test that pagination API with pageSize and offset produces equivalent results to using 'head X + * from Y' in the query. + * + *

<p>Example: The query with pagination: + * + * <pre>
+   * POST /_plugins/_ppl
+   * {
+   *   "query": "source=bank | fields firstname, age | sort age",
+   *   "pageSize": 3,
+   *   "offset": 2
+   * }
+   * </pre>
+ * + * Should produce equivalent results to: + * + * <pre>
+   * POST /_plugins/_ppl
+   * {
+   *   "query": "source=bank | fields firstname, age | sort age | head 3 from 2"
+   * }
+   * </pre>
+ */ + @Test + public void testPaginationEquivalentToHeadFromOffset() throws IOException { + // Execute with pagination API: pageSize=3, offset=2 + JSONObject paginationResult = + executePaginatedQuery( + String.format("source=%s | fields firstname, age | sort age", TEST_INDEX_BANK), 3, 2); + + // Execute with head X from Y syntax + JSONObject headFromResult = + executePaginatedQuery( + String.format( + "source=%s | fields firstname, age | sort age | head 3 from 2", TEST_INDEX_BANK), + 0, // no pagination + 0); + + // Both should return the same data: rows 3-5 (0-indexed: 2,3,4) + // BANK dataset sorted by age: Nanette(28), Amber(32), Dale(33), Dillard(34), Hattie(36), + // Elinor(36), Virginia(39) + // Skip 2 (Nanette, Amber), return 3 (Dale, Dillard, Hattie) + verifySchema(paginationResult, schema("firstname", "string"), schema("age", "int")); + verifySchema(headFromResult, schema("firstname", "string"), schema("age", "int")); + + verifyDataRows(paginationResult, rows("Dale", 33), rows("Dillard", 34), rows("Hattie", 36)); + verifyDataRows(headFromResult, rows("Dale", 33), rows("Dillard", 34), rows("Hattie", 36)); + } + + /** + * Test that pagination with filter and sort produces equivalent results to head X from Y with + * same filter and sort. 
+ */ + @Test + public void testPaginationWithFilterEquivalentToHeadFromOffset() throws IOException { + // Filter: age > 30, Sort: age ascending + // BANK dataset filtered (age>30): Amber(32), Dale(33), Dillard(34), Hattie(36), Elinor(36), + // Virginia(39) + + // Execute with pagination API: pageSize=2, offset=3 + JSONObject paginationResult = + executePaginatedQuery( + String.format( + "source=%s | where age > 30 | fields firstname, age | sort age", TEST_INDEX_BANK), + 2, + 3); + + // Execute with head X from Y syntax + JSONObject headFromResult = + executePaginatedQuery( + String.format( + "source=%s | where age > 30 | fields firstname, age | sort age | head 2 from 3", + TEST_INDEX_BANK), + 0, + 0); + + // Both should return rows 4-5 (0-indexed: 3,4) of filtered result + // Skip 3 (Amber, Dale, Dillard), return 2 (Hattie, Elinor) + verifySchema(paginationResult, schema("firstname", "string"), schema("age", "int")); + verifySchema(headFromResult, schema("firstname", "string"), schema("age", "int")); + + verifyDataRows(paginationResult, rows("Hattie", 36), rows("Elinor", 36)); + verifyDataRows(headFromResult, rows("Hattie", 36), rows("Elinor", 36)); + } + + /** + * Execute a PPL query with pagination parameters. 
+ * + * @param query the PPL query + * @param pageSize the page size (0 = no pagination) + * @param offset the offset (0-based) + * @return the JSON response + */ + protected JSONObject executePaginatedQuery(String query, int pageSize, int offset) + throws IOException { + Request request = new Request("POST", QUERY_API_ENDPOINT); + + String jsonEntity; + if (pageSize > 0) { + jsonEntity = + String.format( + Locale.ROOT, + "{\n \"query\": \"%s\",\n \"pageSize\": %d,\n \"offset\": %d\n}", + query, + pageSize, + offset); + } else { + jsonEntity = String.format(Locale.ROOT, "{\n \"query\": \"%s\"\n}", query); + } + + request.setJsonEntity(jsonEntity); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return new JSONObject(getResponseBody(response, true)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java index 68a9c2afa69..ba7912b88ce 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java @@ -72,6 +72,24 @@ protected String explainQueryYaml(String query) throws IOException { return responseBody; } + /** + * Explain a PPL query with pagination parameters. 
+ * + * @param query the PPL query + * @param pageSize the page size (0 = no pagination) + * @param offset the offset (0-based) + * @return the YAML explain output + */ + protected String explainQueryYamlWithPagination(String query, int pageSize, int offset) + throws IOException { + Response response = + client() + .performRequest( + buildRequestWithPagination(query, YAML_EXPLAIN_API_ENDPOINT, pageSize, offset)); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return getResponseBody(response, true); + } + protected String explainQueryToString(String query, boolean extended) throws IOException { Response response = client() @@ -140,6 +158,29 @@ protected Request buildRequest(String query, String endpoint) { return request; } + protected Request buildRequestWithPagination( + String query, String endpoint, int pageSize, int offset) { + Request request = new Request("POST", endpoint); + String jsonEntity; + if (pageSize > 0) { + jsonEntity = + String.format( + Locale.ROOT, + "{\n \"query\": \"%s\",\n \"pageSize\": %d,\n \"offset\": %d\n}", + query, + pageSize, + offset); + } else { + jsonEntity = String.format(Locale.ROOT, "{\n \"query\": \"%s\"\n}", query); + } + request.setJsonEntity(jsonEntity); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return request; + } + protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { Request request = new Request("PUT", "/_cluster/settings"); String persistentSetting = diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java index 01daded897d..820ac885911 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java +++ 
b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java @@ -140,22 +140,13 @@ public void onFailure(Exception e) { @Test @SneakyThrows - public void test_explain_not_supported() { + public void test_explain_cursor_not_supported() { var request = new Request("POST", "_plugins/_sql/_explain"); - // Request should be rejected before index names are resolved - request.setJsonEntity("{ \"query\": \"select * from something\", \"fetch_size\": 10 }"); + // Explain with cursor continuation should be rejected + request.setJsonEntity("{ \"cursor\" : \"n:0000\" }"); var exception = assertThrows(ResponseException.class, () -> client().performRequest(request)); var response = new JSONObject(new String(exception.getResponse().getEntity().getContent().readAllBytes())); - assertEquals( - "`explain` feature for paginated requests is not implemented yet.", - response.getJSONObject("error").getString("details")); - - // Request should be rejected before cursor parsed - request.setJsonEntity("{ \"cursor\" : \"n:0000\" }"); - exception = assertThrows(ResponseException.class, () -> client().performRequest(request)); - response = - new JSONObject(new String(exception.getResponse().getEntity().getContent().readAllBytes())); assertEquals( "Explain of a paged query continuation is not supported. 
Use `explain` for the initial" + " query request.", diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset.yaml new file mode 100644 index 00000000000..77cadee3542 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], offset=[2], fetch=[3]) + LogicalProject(firstname=[$1], age=[$10]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[firstname, age], SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->[3 from 2], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":2,"size":3,"timeout":"1m","_source":{"includes":["firstname","age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=3, pageSize=null, startFrom=2)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset_with_filter.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset_with_filter.yaml new file mode 100644 index 00000000000..57d87a2be5a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_head_from_offset_with_filter.yaml @@ -0,0 +1,14 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], offset=[3], fetch=[2]) + LogicalProject(firstname=[$1], age=[$10]) + LogicalFilter(condition=[>($10, 30)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + 
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[firstname, age], FILTER->>($1, 30), SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->[2 from 3], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":3,"size":2,"timeout":"1m","query":{"range":{"age":{"from":30,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},"_source":{"includes":["firstname","age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=2, pageSize=null, startFrom=3)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api.yaml new file mode 100644 index 00000000000..a1bfbf27790 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], offset=[2], fetch=[3], type=[PAGINATION]) + LogicalSort(sort0=[$1], dir0=[ASC-nulls-first]) + LogicalProject(firstname=[$1], age=[$10]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[firstname, age], SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->[3 from 2]], OpenSearchRequestBuilder(sourceBuilder={"from":2,"size":3,"timeout":"1m","_source":{"includes":["firstname","age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=3, pageSize=null, startFrom=2)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api_with_head_from.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api_with_head_from.yaml new file mode 100644 index 00000000000..6f0a5bcecba --- /dev/null +++ 
b/integ-test/src/test/resources/expectedOutput/calcite/explain_pagination_api_with_head_from.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], offset=[1], fetch=[2], type=[PAGINATION]) + LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], offset=[1], fetch=[5]) + LogicalProject(firstname=[$1], age=[$10]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[firstname, age], SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->[5 from 1], LIMIT->[2 from 1]], OpenSearchRequestBuilder(sourceBuilder={"from":2,"size":2,"timeout":"1m","_source":{"includes":["firstname","age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=2, pageSize=null, startFrom=2)]) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java index ec09d8b5bde..0565a185513 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java @@ -23,6 +23,8 @@ public class PPLQueryRequestFactory { private static final String DEFAULT_RESPONSE_FORMAT = "jdbc"; private static final String DEFAULT_EXPLAIN_FORMAT = "standard"; private static final String QUERY_PARAMS_PRETTY = "pretty"; + private static final String PAGINATION_OFFSET = "offset"; + private static final String PAGINATION_PAGE_SIZE = "pageSize"; /** * Build {@link PPLQueryRequest} from {@link RestRequest}. 
@@ -59,12 +61,27 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques boolean pretty = getPrettyOption(restRequest.params()); try { jsonContent = new JSONObject(content); + + // Parse pagination parameters + int offset = jsonContent.optInt(PAGINATION_OFFSET, 0); + int pageSize = jsonContent.optInt(PAGINATION_PAGE_SIZE, 0); + + // Validate pagination parameters + if (offset < 0) { + throw new IllegalArgumentException("offset must be non-negative"); + } + if (pageSize < 0) { + throw new IllegalArgumentException("pageSize must be non-negative"); + } + PPLQueryRequest pplRequest = new PPLQueryRequest( jsonContent.getString(PPL_FIELD_NAME), jsonContent, restRequest.path(), - format.getFormatName()); + format.getFormatName(), + offset, + pageSize); // set sanitize option if csv format if (format.equals(Format.CSV)) { pplRequest.sanitize(getSanitizeOption(restRequest.params())); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java index 8cdf27ef3c8..de0cfc685da 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java @@ -35,6 +35,12 @@ public class TransportPPLQueryRequest extends ActionRequest { @Getter private String format = ""; + /** Current offset for pagination (0-based). */ + @Getter private int offset = 0; + + /** Page size for pagination. 0 means pagination is disabled. 
*/ + @Getter private int pageSize = 0; + @Setter @Getter @Accessors(fluent = true) @@ -51,6 +57,8 @@ public TransportPPLQueryRequest(PPLQueryRequest pplQueryRequest) { jsonContent = pplQueryRequest.getJsonContent(); path = pplQueryRequest.getPath(); format = pplQueryRequest.getFormat(); + offset = pplQueryRequest.getOffset(); + pageSize = pplQueryRequest.getPageSize(); sanitize = pplQueryRequest.sanitize(); style = pplQueryRequest.style(); } @@ -63,6 +71,8 @@ public TransportPPLQueryRequest(StreamInput in) throws IOException { String jsonContentString = in.readOptionalString(); jsonContent = jsonContentString != null ? new JSONObject(jsonContentString) : null; path = in.readOptionalString(); + offset = in.readInt(); + pageSize = in.readInt(); sanitize = in.readBoolean(); style = in.readEnum(JsonResponseFormatter.Style.class); } @@ -93,6 +103,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(format); out.writeOptionalString(jsonContent != null ? jsonContent.toString() : null); out.writeOptionalString(path); + out.writeInt(offset); + out.writeInt(pageSize); out.writeBoolean(sanitize); out.writeEnum(style); } @@ -128,7 +140,8 @@ public ActionRequestValidationException validate() { /** Convert to PPLQueryRequest. 
*/ public PPLQueryRequest toPPLQueryRequest() { - PPLQueryRequest pplQueryRequest = new PPLQueryRequest(pplQuery, jsonContent, path, format); + PPLQueryRequest pplQueryRequest = + new PPLQueryRequest(pplQuery, jsonContent, path, format, offset, pageSize); pplQueryRequest.sanitize(sanitize); pplQueryRequest.style(style); return pplQueryRequest; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index ccd4d49dd4a..e92dc078542 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -98,12 +98,16 @@ private AbstractPlan plan( AstStatementBuilder.StatementBuilderContext.builder() .isExplain(request.isExplainRequest()) .format(request.getFormat()) + .fetchSize(request.getPageSize()) + .paginationOffset(request.getOffset()) .build())); log.info( - "[{}] Incoming request {}", + "[{}] Incoming request {} (pageSize={}, offset={})", QueryContext.getRequestId(), - anonymizer.anonymizeStatement(statement)); + anonymizer.anonymizeStatement(statement), + request.getPageSize(), + request.getOffset()); return queryExecutionFactory.create(statement, queryListener, explainListener); } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index ca351fcc0a7..09928c12556 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -25,6 +25,12 @@ public class PPLQueryRequest { @Getter private final String path; @Getter private String format = ""; + /** Current offset for pagination (0-based). */ + @Getter private final int offset; + + /** Page size for pagination. 0 means pagination is disabled. 
*/ + @Getter private final int pageSize; + @Setter @Getter @Accessors(fluent = true) @@ -36,15 +42,37 @@ public class PPLQueryRequest { private JsonResponseFormatter.Style style = JsonResponseFormatter.Style.COMPACT; public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path) { - this(pplQuery, jsonContent, path, ""); + this(pplQuery, jsonContent, path, "", 0, 0); } /** Constructor of PPLQueryRequest. */ public PPLQueryRequest(String pplQuery, JSONObject jsonContent, String path, String format) { + this(pplQuery, jsonContent, path, format, 0, 0); + } + + /** Constructor of PPLQueryRequest with pagination parameters. */ + public PPLQueryRequest( + String pplQuery, + JSONObject jsonContent, + String path, + String format, + int offset, + int pageSize) { this.pplQuery = pplQuery; this.jsonContent = jsonContent; this.path = Optional.ofNullable(path).orElse(DEFAULT_PPL_PATH); this.format = format; + this.offset = offset; + this.pageSize = pageSize; + } + + /** + * Check if pagination is enabled for this request. + * + * @return true if pageSize > 0 + */ + public boolean isPaginationEnabled() { + return pageSize > 0; } public String getRequest() { diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java index a18da36c487..eecfa9a3a0a 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java @@ -30,7 +30,12 @@ public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor