diff --git a/api/README.md b/api/README.md index 486a2a9f58a..10460c39e78 100644 --- a/api/README.md +++ b/api/README.md @@ -4,12 +4,18 @@ This module provides a high-level integration layer for the Calcite-based query ## Overview -This module provides two primary components: +This module provides components organized into two main areas aligned with the [Unified Query API architecture](https://github.com/opensearch-project/sql/issues/4782): + +### Unified Language Specification - **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation. - **`UnifiedQueryTranspiler`**: Converts Calcite logical plans (`RelNode`) into SQL strings for various target databases using different SQL dialects. -Together, these components enable a complete workflow: parse PPL queries into logical plans, then transpile those plans into target database SQL. +### Unified Execution Runtime + +- **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects for separation of compilation and execution. + +Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation. ### Experimental API Design @@ -59,40 +65,63 @@ UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder() String sql = transpiler.toSql(plan); ``` -### Complete Workflow Example +Supported SQL dialects include: +- `SparkSqlDialect.DEFAULT` - Apache Spark SQL +- `PostgresqlSqlDialect.DEFAULT` - PostgreSQL +- `MysqlSqlDialect.DEFAULT` - MySQL +- And other Calcite-supported dialects + +### UnifiedQueryCompiler -Combining all components to transpile PPL queries into target database SQL: +Use `UnifiedQueryCompiler` to compile Calcite logical plans into executable JDBC statements. This separates compilation from execution and returns standard JDBC types. ```java -// Step 1: Create reusable context (shared across components) -UnifiedQueryContext context = UnifiedQueryContext.builder() - .language(QueryType.PPL) - .catalog("catalog", schema) - .defaultNamespace("catalog") - .build(); +UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context); -// Step 2: Create planner with context -UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); +try (PreparedStatement statement = compiler.compile(plan)) { + ResultSet rs = statement.executeQuery(); + while (rs.next()) { + // Standard JDBC ResultSet access + } +} +``` -// Step 3: Plan PPL query into logical plan -RelNode plan = planner.plan("source = employees | where age > 30"); +### Complete Workflow Examples -// Step 4: Create transpiler with target dialect -UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder() - .dialect(SparkSqlDialect.DEFAULT) - .build(); +Combining all components for a complete PPL query workflow: -// Step 5: Transpile to target SQL -String sparkSql = transpiler.toSql(plan); -// Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30 +```java +// Step 1: Create reusable context (shared across all components) +try (UnifiedQueryContext context = UnifiedQueryContext.builder() + .language(QueryType.PPL) + .catalog("catalog", schema) + .defaultNamespace("catalog") + .build()) { + + // Step 2: Create planner with context + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + + // Step 3: Plan PPL query into logical plan + RelNode plan = planner.plan("source = employees | where age > 30"); + + // Option A: Transpile to target SQL + UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder() + .dialect(SparkSqlDialect.DEFAULT) + .build(); + String sparkSql = transpiler.toSql(plan); + // Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30 + + // Option B: Compile and execute directly + UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context); + try (PreparedStatement statement = compiler.compile(plan)) { + ResultSet rs = statement.executeQuery(); + while (rs.next()) { + // Process results with standard JDBC + } + } +} ``` -Supported SQL dialects include: -- `SparkSqlDialect.DEFAULT` - Apache Spark SQL -- `PostgresqlSqlDialect.DEFAULT` - PostgreSQL -- `MysqlSqlDialect.DEFAULT` - MySQL -- And other Calcite-supported dialects - ## Development & Testing A set of unit tests is provided to validate planner behavior. diff --git a/api/build.gradle b/api/build.gradle index bed69fc7964..958a25cccc2 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -5,6 +5,7 @@ plugins { id 'java-library' + id 'java-test-fixtures' id "io.freefair.lombok" id 'jacoco' id 'com.diffplug.spotless' @@ -13,10 +14,14 @@ plugins { dependencies { api project(':ppl') + testImplementation testFixtures(project(':api')) testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}" testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}" testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.41.0' + + testFixturesApi group: 'junit', name: 'junit', version: '4.13.2' + testFixturesApi group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrest_version}" } spotless { diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java index 029eb2218ae..3e0a1f972bd 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java @@ -34,7 +34,7 @@ * enabling consistent behavior across all unified query operations. */ @Value -public class UnifiedQueryContext { +public class UnifiedQueryContext implements AutoCloseable { /** CalcitePlanContext containing Calcite framework configuration and query type. */ CalcitePlanContext planContext; @@ -42,6 +42,18 @@ public class UnifiedQueryContext { /** Settings containing execution limits and feature flags used by parsers and planners. */ Settings settings; + /** + * Closes the underlying resource managed by this context. + * + * @throws Exception if an error occurs while closing the connection + */ + @Override + public void close() throws Exception { + if (planContext != null && planContext.connection != null) { + planContext.connection.close(); + } + } + /** Creates a new builder for UnifiedQueryContext. */ public static Builder builder() { return new Builder(); diff --git a/api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java b/api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java new file mode 100644 index 00000000000..20cf04e3f5c --- /dev/null +++ b/api/src/main/java/org/opensearch/sql/api/compiler/UnifiedQueryCompiler.java @@ -0,0 +1,71 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.compiler; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import lombok.NonNull; +import org.apache.calcite.interpreter.Bindables; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelHomogeneousShuttle; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.tools.RelRunner; +import org.opensearch.sql.api.UnifiedQueryContext; + +/** + * {@code UnifiedQueryCompiler} compiles Calcite logical plans ({@link RelNode}) into executable + * JDBC statements, separating query compilation from execution. + */ +public class UnifiedQueryCompiler { + + /** Unified query context containing CalcitePlanContext with all configuration. */ + private final UnifiedQueryContext context; + + /** + * Constructs a UnifiedQueryCompiler with a unified query context. + * + * @param context the unified query context containing CalcitePlanContext + */ + public UnifiedQueryCompiler(UnifiedQueryContext context) { + this.context = context; + } + + /** + * Compiles a Calcite logical plan into an executable {@link PreparedStatement}. Similar to {@code + * CalciteToolsHelper.OpenSearchRelRunners.run()} but does not close the connection, leaving + * resource management to {@link UnifiedQueryContext}. + * + * @param plan the logical plan to compile (must not be null) + * @return a compiled PreparedStatement ready for execution + * @throws IllegalStateException if compilation fails + */ + public PreparedStatement compile(@NonNull RelNode plan) { + try { + // Apply shuttle to convert LogicalTableScan to BindableTableScan + final RelHomogeneousShuttle shuttle = + new RelHomogeneousShuttle() { + @Override + public RelNode visit(TableScan scan) { + final RelOptTable table = scan.getTable(); + if (scan instanceof LogicalTableScan + && Bindables.BindableTableScan.canHandle(table)) { + return Bindables.BindableTableScan.create(scan.getCluster(), table); + } + return super.visit(scan); + } + }; + RelNode transformedPlan = plan.accept(shuttle); + + Connection connection = context.getPlanContext().connection; + final RelRunner runner = connection.unwrap(RelRunner.class); + return runner.prepareStatement(transformedPlan); + } catch (Exception e) { + throw new IllegalStateException("Failed to compile logical plan", e); + } + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java index 3be36ee435e..a3ad73f700a 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java @@ -6,7 +6,9 @@ package org.opensearch.sql.api; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.opensearch.sql.common.setting.Settings.Key.*; import org.junit.Test; @@ -79,4 +81,19 @@ public void testInvalidDefaultNamespace() { .defaultNamespace("nonexistent") .build(); } + + @Test + public void testContextClose() throws Exception { + // Create a separate context for this test to avoid affecting other tests + UnifiedQueryContext testContext = + UnifiedQueryContext.builder() + .language(QueryType.PPL) + .catalog("opensearch", testSchema) + .defaultNamespace("opensearch") + .build(); + + assertFalse(testContext.getPlanContext().connection.isClosed()); + testContext.close(); + assertTrue(testContext.getPlanContext().connection.isClosed()); + } } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryTestBase.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryTestBase.java index 6064e3d768f..000b145695a 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryTestBase.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryTestBase.java @@ -5,20 +5,37 @@ package org.opensearch.sql.api; +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; + import java.util.List; import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.After; import org.junit.Before; import org.opensearch.sql.executor.QueryType; /** Base class for unified query tests providing common test schema and utilities. */ public abstract class UnifiedQueryTestBase { + /** Default catalog name */ + protected static final String DEFAULT_CATALOG = "catalog"; + /** Test schema containing sample tables for testing */ protected AbstractSchema testSchema; @@ -41,23 +58,74 @@ protected Map getTableMap() { context = UnifiedQueryContext.builder() .language(QueryType.PPL) - .catalog("catalog", testSchema) + .catalog(DEFAULT_CATALOG, testSchema) .build(); planner = new UnifiedQueryPlanner(context); } + @After + public void tearDown() throws Exception { + if (context != null) { + context.close(); + } + } + + /** Creates employees table with sample data for testing */ protected Table createEmployeesTable() { - return new AbstractTable() { - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory.createStructType( - List.of( - typeFactory.createSqlType(SqlTypeName.INTEGER), - typeFactory.createSqlType(SqlTypeName.VARCHAR), - typeFactory.createSqlType(SqlTypeName.INTEGER), - typeFactory.createSqlType(SqlTypeName.VARCHAR)), - List.of("id", "name", "age", "department")); - } - }; + return SimpleTable.builder() + .col("id", INTEGER) + .col("name", VARCHAR) + .col("age", INTEGER) + .col("department", VARCHAR) + .row(new Object[] {1, "Alice", 25, "Engineering"}) + .row(new Object[] {2, "Bob", 35, "Sales"}) + .row(new Object[] {3, "Charlie", 45, "Engineering"}) + .row(new Object[] {4, "Diana", 28, "Marketing"}) + .build(); + } + + /** Reusable scannable table with builder pattern for easy table creation */ + @Builder + protected static class SimpleTable implements ScannableTable { + @Singular("col") + private final Map schema; + + @Singular private final List rows; + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + RelDataTypeFactory.Builder builder = typeFactory.builder(); + schema.forEach(builder::add); + return builder.build(); + } + + @Override + public Enumerable scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + SqlNode parent, + org.apache.calcite.config.CalciteConnectionConfig config) { + return false; + } } } diff --git a/api/src/test/java/org/opensearch/sql/api/compiler/UnifiedQueryCompilerTest.java b/api/src/test/java/org/opensearch/sql/api/compiler/UnifiedQueryCompilerTest.java new file mode 100644 index 00000000000..f344969cdc3 --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/compiler/UnifiedQueryCompilerTest.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.compiler; + +import static java.sql.Types.BIGINT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.VARCHAR; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttle; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.opensearch.sql.api.ResultSetAssertion; +import org.opensearch.sql.api.UnifiedQueryTestBase; + +public class UnifiedQueryCompilerTest extends UnifiedQueryTestBase implements ResultSetAssertion { + + private UnifiedQueryCompiler compiler; + + @Before + public void setUp() { + super.setUp(); + compiler = new UnifiedQueryCompiler(context); + } + + @Test + public void testSimpleQuery() throws Exception { + RelNode plan = planner.plan("source = catalog.employees | where age > 30"); + try (PreparedStatement statement = compiler.compile(plan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema( + col("id", INTEGER), + col("name", VARCHAR), + col("age", INTEGER), + col("department", VARCHAR)) + .expectData(row(2, "Bob", 35, "Sales"), row(3, "Charlie", 45, "Engineering")); + } + } + + @Test + public void testComplexQuery() throws Exception { + RelNode plan = planner.plan("source = catalog.employees | stats count() by department"); + try (PreparedStatement statement = compiler.compile(plan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("count()", BIGINT), col("department", VARCHAR)) + .expectData(row(2L, "Engineering"), row(1L, "Sales"), row(1L, "Marketing")); + } + } + + @Test(expected = IllegalStateException.class) + public void testCompileFailure() { + RelNode mockPlan = Mockito.mock(RelNode.class); + Mockito.when(mockPlan.accept(Mockito.any(RelShuttle.class))) + .thenThrow(new RuntimeException("Intentional compilation failure")); + + compiler.compile(mockPlan); + } +} diff --git a/api/src/testFixtures/java/org/opensearch/sql/api/ResultSetAssertion.java b/api/src/testFixtures/java/org/opensearch/sql/api/ResultSetAssertion.java new file mode 100644 index 00000000000..fbbe72ab1fe --- /dev/null +++ b/api/src/testFixtures/java/org/opensearch/sql/api/ResultSetAssertion.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Mixin interface providing fluent assertion API for JDBC ResultSet verification using Hamcrest + * matchers. Tests can implement this interface to gain access to verification methods for schema + * and data. + */ +public interface ResultSetAssertion { + + /** Creates ResultSetVerifier from a JDBC ResultSet */ + default ResultSetVerifier verify(ResultSet resultSet) { + return new ResultSetVerifier(resultSet); + } + + /** Creates column matcher for schema verification with type */ + default Matcher col(String name, int sqlType) { + return new TypeSafeMatcher<>() { + @Override + protected boolean matchesSafely(ColumnInfo column) { + return name.equals(column.name) && sqlType == column.sqlType; + } + + @Override + public void describeTo(Description description) { + description.appendText("column with name: " + name + " and type: " + sqlType); + } + }; + } + + /** Creates row matcher for data verification */ + default Matcher row(Object... expectedValues) { + return new TypeSafeMatcher<>() { + @Override + protected boolean matchesSafely(Object[] actualValues) { + return Arrays.equals(expectedValues, actualValues); + } + + @Override + public void describeTo(Description description) { + description.appendText("row with values: " + Arrays.toString(expectedValues)); + } + }; + } + + /** Column information holder */ + @Value + class ColumnInfo { + String name; + int sqlType; + } + + /** Fluent assertion helper for JDBC ResultSet */ + @RequiredArgsConstructor + class ResultSetVerifier { + final ResultSet resultSet; + + @SafeVarargs + public final ResultSetVerifier expectSchema(Matcher... matchers) + throws SQLException { + ResultSetMetaData metaData = resultSet.getMetaData(); + List actualColumns = new ArrayList<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + actualColumns.add(new ColumnInfo(metaData.getColumnName(i), metaData.getColumnType(i))); + } + + assertThat("Schema mismatch", actualColumns, contains(matchers)); + return this; + } + + @SafeVarargs + public final ResultSetVerifier expectData(Matcher... matchers) throws SQLException { + List rows = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + Object[] rowValues = new Object[columnCount]; + for (int i = 0; i < columnCount; i++) { + rowValues[i] = resultSet.getObject(i + 1); + } + rows.add(rowValues); + } + + assertThat("Row data mismatch", rows, containsInAnyOrder(matchers)); + return this; + } + } +} diff --git a/integ-test/build.gradle b/integ-test/build.gradle index d161c6de171..f188a2e773a 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -194,6 +194,10 @@ dependencies { implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" testImplementation project(':opensearch-sql-plugin') testImplementation project(':legacy') + testImplementation project(':api') + testImplementation(testFixtures(project(':api'))) { + exclude group: 'org.hamcrest', module: 'hamcrest-core' + } testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3') testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.3') testRuntimeOnly('org.junit.platform:junit-platform-launcher:1.9.3') diff --git a/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java new file mode 100644 index 00000000000..7f7d31790f1 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static java.sql.Types.BIGINT; +import static java.sql.Types.VARCHAR; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.junit.After; +import org.junit.Test; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.api.compiler.UnifiedQueryCompiler; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.client.OpenSearchRestClient; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.ppl.PPLIntegTestCase; +import org.opensearch.sql.util.InternalRestHighLevelClient; + +/** + * Integration test demonstrating the integration and usage of the Unified Query API with OpenSearch + * as a data source. + */ +public class UnifiedQueryOpenSearchIT extends PPLIntegTestCase implements ResultSetAssertion { + + private UnifiedQueryContext context; + private UnifiedQueryPlanner planner; + private UnifiedQueryCompiler compiler; + + @Override + public void init() throws Exception { + super.init(); + loadIndex(Index.ACCOUNT); + + String catalogName = "opensearch"; + context = + UnifiedQueryContext.builder() + .language(QueryType.PPL) + .catalog(catalogName, createOpenSearchSchema()) + .defaultNamespace(catalogName) + .setting("plugins.query.size_limit", 200) + .setting("plugins.query.buckets", 1000) + .setting("search.max_buckets", 65535) + .setting("plugins.sql.cursor.keep_alive", TimeValue.timeValueMinutes(1)) + .setting("plugins.query.field_type_tolerance", true) + .setting("plugins.calcite.enabled", true) + .setting("plugins.calcite.pushdown.enabled", true) + .setting("plugins.calcite.pushdown.rowcount.estimation.factor", 0.9) + .build(); + planner = new UnifiedQueryPlanner(context); + compiler = new UnifiedQueryCompiler(context); + } + + @After + public void cleanUp() throws Exception { + if (context != null) { + context.close(); + } + } + + @Test + public void testSimplePPLQueryExecution() throws Exception { + String pplQuery = + String.format( + "source = opensearch.%s | fields firstname, age | where age > 30 | head 3", + TEST_INDEX_ACCOUNT); + + RelNode logicalPlan = planner.plan(pplQuery); + try (PreparedStatement statement = compiler.compile(logicalPlan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("firstname", VARCHAR), col("age", BIGINT)) + .expectData(row("Amber", 32L), row("Hattie", 36L), row("Dale", 33L)); + } + } + + @Test + public void testMultiplePPLQueryExecutionWithSameContext() throws Exception { + String[] queries = { + "source = opensearch.%s | fields firstname, age | where age > 30 | head 2", + "source = opensearch.%s | fields lastname, age | where age < 30 | head 3", + "source = opensearch.%s | fields state, age | where age > 25 | head 5" + }; + + for (String query : queries) { + RelNode plan = planner.plan(String.format(query, TEST_INDEX_ACCOUNT)); + + try (PreparedStatement stmt = compiler.compile(plan)) { + ResultSet rs = stmt.executeQuery(); + assertNotNull(rs); + assertTrue("Expected at least one row for query: " + query, rs.next()); + } + } + } + + /** + * Creates a dynamic schema that creates OpenSearchIndex on-demand for any table name. This allows + * querying any index without pre-registering it. + */ + private AbstractSchema createOpenSearchSchema() { + return new AbstractSchema() { + private final OpenSearchClient osClient = + new OpenSearchRestClient(new InternalRestHighLevelClient(client())); + + @Override + protected Map getTableMap() { + return new HashMap<>() { + @Override + public Table get(Object key) { + if (!super.containsKey(key)) { + String indexName = (String) key; + super.put(indexName, new OpenSearchIndex(osClient, context.getSettings(), indexName)); + } + return super.get(key); + } + }; + } + }; + } +}