diff --git a/api/README.md b/api/README.md index 10460c39e7..91651aa315 100644 --- a/api/README.md +++ b/api/README.md @@ -14,8 +14,10 @@ This module provides components organized into two main areas aligned with the [ ### Unified Execution Runtime - **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects for separation of compilation and execution. +- **`UnifiedFunction`**: Engine-agnostic function interface that enables functions to be evaluated across different execution engines without engine-specific code duplication. +- **`UnifiedFunctionRepository`**: Repository for discovering and loading functions as `UnifiedFunction` instances, providing a bridge between function definitions and external execution engines. -Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation. +Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines. ### Experimental API Design @@ -86,6 +88,58 @@ try (PreparedStatement statement = compiler.compile(plan)) { } ``` +### UnifiedFunction and UnifiedFunctionRepository + +The Unified Function API provides an engine-agnostic abstraction for functions, enabling them to be evaluated across different execution engines (Spark, Flink, Calcite, etc.) without engine-specific code duplication. + +#### Type System + +Types are represented as SQL type name strings for engine-agnostic serialization: + +- **Primitive types**: `"VARCHAR"`, `"INTEGER"`, `"BIGINT"`, `"DOUBLE"`, `"BOOLEAN"`, `"DATE"`, `"TIMESTAMP"` +- **Array types**: `"ARRAY"` (e.g., `"ARRAY"`) +- **Struct types**: `"STRUCT"` (e.g., `"STRUCT"`) + +#### Loading Functions + +Use `UnifiedFunctionRepository` to discover and load unified functions: + +```java +// Create repository with context +UnifiedFunctionRepository repository = new UnifiedFunctionRepository(context); + +// Load all available functions +List allFunctions = repository.loadFunctions(); +for (UnifiedFunctionDescriptor descriptor : allFunctions) { + String name = descriptor.getFunctionName(); + UnifiedFunctionBuilder builder = descriptor.getBuilder(); + // Use builder to create function instances +} + +// Load a specific function by name +UnifiedFunctionDescriptor upperDescriptor = repository.loadFunction("UPPER").orElseThrow(); +``` + +#### Creating and Using Functions + +Functions are created using builders with specific input types: + +```java +// Get function descriptor +UnifiedFunctionDescriptor descriptor = repository.loadFunction("UPPER").orElseThrow(); + +// Build function with specific input types +UnifiedFunction upperFunc = descriptor.getBuilder().build(List.of("VARCHAR")); + +// Get function metadata +String name = upperFunc.getFunctionName(); // "UPPER" +List inputTypes = upperFunc.getInputTypes(); // ["VARCHAR"] +String returnType = upperFunc.getReturnType(); // "VARCHAR" + +// Evaluate function +Object result = upperFunc.eval(List.of("hello")); // "HELLO" +``` + ### Complete Workflow Examples Combining all components for a complete PPL query workflow: diff --git a/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunction.java b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunction.java new file mode 100644 index 0000000000..49febdb7dc --- /dev/null +++ b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunction.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.function; + +import java.io.Serializable; +import java.util.List; + +/** + * A unified function abstraction that provides an engine-agnostic way to represent and evaluate + * functions, enabling functions to be implemented once and used across multiple execution engines + * without engine-specific code duplication. + * + *

Note: types are represented as engine-agnostic SQL type name strings (e.g., {@code "VARCHAR"}, + * {@code "INTEGER"}, {@code "ARRAY"}, {@code "STRUCT<...>"}) to avoid introducing a dedicated + * {@code UnifiedType} abstraction until it’s needed. + * + * @see java.io.Serializable + */ +public interface UnifiedFunction extends Serializable { + + /** + * Returns the name of the function. + * + * @return the function name + */ + String getFunctionName(); + + /** + * Returns the unified type names expected for the input arguments. + * + * @return list of unified type names for input arguments + */ + List getInputTypes(); + + /** + * Returns the unified type name of the function result. + * + * @return unified type name of the function result + */ + String getReturnType(); + + /** + * Evaluates the function with the provided input values. + * + * @param inputs argument values evaluated by the caller + * @return the evaluated result, may be null depending on the function implementation + */ + Object eval(List inputs); +} diff --git a/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapter.java b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapter.java new file mode 100644 index 0000000000..cab0d4cbef --- /dev/null +++ b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapter.java @@ -0,0 +1,118 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.function; + +import java.util.List; +import java.util.Map; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.apache.calcite.DataContext; +import org.apache.calcite.DataContexts; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexExecutable; +import org.apache.calcite.rex.RexExecutorImpl; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.opensearch.sql.expression.function.PPLFuncImpTable; + +/** Adapter that implements {@link UnifiedFunction} using Calcite's {@link RexExecutorImpl}. */ +@ToString +@EqualsAndHashCode(exclude = "compiledCode") +@RequiredArgsConstructor +public class UnifiedFunctionCalciteAdapter implements UnifiedFunction { + + private static final long serialVersionUID = 1L; + + /** + * Key used by RexExecutorImpl's InputGetter to retrieve input values from DataContext. This is a + * Calcite internal convention. + */ + private static final String INPUT_RECORD_KEY = "inputRecord"; + + /** Unified function name. */ + @Getter private final String functionName; + + /** Unified type name of the return value. */ + @Getter private final String returnType; + + /** Unified type names of the input arguments. */ + @Getter private final List inputTypes; + + /** + * Compiled Java source for evaluating the function. + * + *

The generated code reads inputs from the {@code "inputRecord"} entry in {@link DataContext}. + * Arguments are mapped to field variables named {@code "_0"}, {@code "_1"}, etc. + * + *

{@code
+   * // For UPPER(input) function:
+   * Object[] inputRecord = (Object[]) dataContext.get("inputRecord");
+   * String _0 = (String) inputRecord[0];
+   * return _0 == null ? null : _0.toUpperCase();
+   * }
+ */ + private final String compiledCode; + + @Override + public Object eval(List inputs) { + RexExecutable rexExecutor = new RexExecutable(compiledCode, functionName); + DataContext dataContext = DataContexts.of(Map.of(INPUT_RECORD_KEY, inputs.toArray())); + rexExecutor.setDataContext(dataContext); + + Object[] results = rexExecutor.execute(); + return (results == null || results.length == 0) ? null : results[0]; + } + + /** + * Creates Calcite RexNode adapter for a unified function. + * + *

Note: this method pre-compiles the resolved function expression and stores the generated + * source code as a string. This avoids serializing {@link RexNode} instances and simplifies + * distribution across execution engines. If performance or security concerns arise, we can change + * this internal implementation. + * + * @param rexBuilder RexBuilder for creating expressions + * @param functionName function name + * @param inputTypes function argument types + * @return configured adapter instance + */ + public static UnifiedFunctionCalciteAdapter create( + RexBuilder rexBuilder, String functionName, List inputTypes) { + RexNode[] inputRefs = makeInputRefs(rexBuilder, inputTypes); + RexNode resolved = PPLFuncImpTable.INSTANCE.resolve(rexBuilder, functionName, inputRefs); + RelDataType inputRowType = buildInputRowType(rexBuilder, inputTypes); + RexExecutable executable = + RexExecutorImpl.getExecutable(rexBuilder, List.of(resolved), inputRowType); + String returnType = resolved.getType().getSqlTypeName().getName(); + + return new UnifiedFunctionCalciteAdapter( + functionName, returnType, List.copyOf(inputTypes), executable.getSource()); + } + + private static RelDataType buildInputRowType(RexBuilder rexBuilder, List inputTypes) { + RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory(); + RelDataTypeFactory.Builder builder = typeFactory.builder(); + for (int i = 0; i < inputTypes.size(); i++) { + RelDataType relType = typeFactory.createSqlType(SqlTypeName.valueOf(inputTypes.get(i))); + builder.add("_" + i, relType); + } + return builder.build(); + } + + private static RexNode[] makeInputRefs(RexBuilder rexBuilder, List inputTypes) { + RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory(); + RexNode[] inputRefs = new RexNode[inputTypes.size()]; + for (int i = 0; i < inputTypes.size(); i++) { + RelDataType relType = typeFactory.createSqlType(SqlTypeName.valueOf(inputTypes.get(i))); + inputRefs[i] = rexBuilder.makeInputRef(relType, i); + } + return inputRefs; + } +} diff --git a/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionRepository.java b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionRepository.java new file mode 100644 index 0000000000..0d10f53c91 --- /dev/null +++ b/api/src/main/java/org/opensearch/sql/api/function/UnifiedFunctionRepository.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.function; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.expression.function.PPLBuiltinOperators; + +/** Repository for discovering and loading PPL functions as {@link UnifiedFunction} instances. */ +@RequiredArgsConstructor +public class UnifiedFunctionRepository { + + /** Unified query context containing CalcitePlanContext for creating Rex expressions. */ + private final UnifiedQueryContext context; + + /** + * Loads all PPL functions from {@link PPLBuiltinOperators} as descriptors. + * + * @return list of function descriptors + */ + public List loadFunctions() { + RexBuilder rexBuilder = context.getPlanContext().rexBuilder; + return PPLBuiltinOperators.instance().getOperatorList().stream() + .filter(SqlUserDefinedFunction.class::isInstance) + .map( + operator -> { + String functionName = operator.getName(); + UnifiedFunctionBuilder builder = + inputTypes -> + UnifiedFunctionCalciteAdapter.create(rexBuilder, functionName, inputTypes); + return new UnifiedFunctionDescriptor(functionName, builder); + }) + .collect(Collectors.toList()); + } + + /** + * Loads a specific PPL function by name. + * + * @param functionName the name of the function to load (case-insensitive) + * @return optional function descriptor, empty if not found + */ + public Optional loadFunction(String functionName) { + return loadFunctions().stream() + .filter(desc -> desc.getFunctionName().equalsIgnoreCase(functionName)) + .findFirst(); + } + + /** Function descriptor with name and builder for creating {@link UnifiedFunction} instances. */ + @Value + public static class UnifiedFunctionDescriptor { + /** The name of the function in upper case. */ + String functionName; + + /** Builder for creating {@link UnifiedFunction} instances with specific input types. */ + UnifiedFunctionBuilder builder; + } + + /** Builder for creating {@link UnifiedFunction} instances with specific input types. */ + @FunctionalInterface + public interface UnifiedFunctionBuilder { + + /** + * Builds a {@link UnifiedFunction} instance for the specified input types. + * + * @param inputTypes Unified type names for function arguments + * @return a UnifiedFunction instance configured for the specified input types + */ + UnifiedFunction build(List inputTypes); + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapterTest.java b/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapterTest.java new file mode 100644 index 0000000000..02fd7faf47 --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionCalciteAdapterTest.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; +import org.apache.calcite.rex.RexBuilder; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.api.UnifiedQueryTestBase; + +public class UnifiedFunctionCalciteAdapterTest extends UnifiedQueryTestBase { + + private RexBuilder rexBuilder; + + @Before + @Override + public void setUp() { + super.setUp(); + rexBuilder = context.getPlanContext().rexBuilder; + } + + @Test + public void testCreateFunction() { + UnifiedFunction upperFunc = + UnifiedFunctionCalciteAdapter.create(rexBuilder, "UPPER", List.of("VARCHAR")); + + assertNotNull(upperFunc); + assertEquals("UPPER", upperFunc.getFunctionName()); + assertEquals(List.of("VARCHAR"), upperFunc.getInputTypes()); + assertEquals("VARCHAR", upperFunc.getReturnType()); + } + + @Test(expected = IllegalArgumentException.class) + public void testCreateWithInvalidFunctionName() { + UnifiedFunctionCalciteAdapter.create(rexBuilder, "INVALID_FUNCTION", List.of("VARCHAR")); + } + + @Test + public void testEvaluateFunction() { + UnifiedFunction upperFunc = + UnifiedFunctionCalciteAdapter.create(rexBuilder, "UPPER", List.of("VARCHAR")); + + Object result = upperFunc.eval(List.of("hello")); + assertEquals("HELLO", result); + } + + @Test + public void testSerializeAndDeserialize() throws Exception { + UnifiedFunctionCalciteAdapter originalFunc = + UnifiedFunctionCalciteAdapter.create(rexBuilder, "UPPER", List.of("VARCHAR")); + + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(originalFunc); + } + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + UnifiedFunctionCalciteAdapter deserializedFunc; + try (ObjectInputStream ois = new ObjectInputStream(bais)) { + deserializedFunc = (UnifiedFunctionCalciteAdapter) ois.readObject(); + } + + // Verify metadata is preserved + assertNotNull(deserializedFunc); + assertEquals(originalFunc.getFunctionName(), deserializedFunc.getFunctionName()); + assertEquals(originalFunc.getInputTypes(), deserializedFunc.getInputTypes()); + assertEquals(originalFunc.getReturnType(), deserializedFunc.getReturnType()); + + // Verify functionality is preserved after deserialization + Object result = deserializedFunc.eval(List.of("hello")); + assertEquals("HELLO", result); + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionRepositoryTest.java b/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionRepositoryTest.java new file mode 100644 index 0000000000..5ffc077f1e --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/function/UnifiedFunctionRepositoryTest.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api.function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.api.UnifiedQueryTestBase; +import org.opensearch.sql.api.function.UnifiedFunctionRepository.UnifiedFunctionDescriptor; + +public class UnifiedFunctionRepositoryTest extends UnifiedQueryTestBase { + + private UnifiedFunctionRepository repository; + + @Before + @Override + public void setUp() { + super.setUp(); + repository = new UnifiedFunctionRepository(context); + } + + @Test + public void testLoadAllFunctions() { + List functions = repository.loadFunctions(); + + assertTrue("Should load at least one function", functions.size() >= 1); + for (UnifiedFunctionDescriptor descriptor : functions) { + assertNotNull("Function name should not be null", descriptor.getFunctionName()); + assertNotNull("Builder should not be null", descriptor.getBuilder()); + } + } + + @Test + public void testLoadSpecificFunction() { + UnifiedFunctionDescriptor jsonFunc = repository.loadFunction("json").orElseThrow(); + + assertEquals("JSON", jsonFunc.getFunctionName()); + assertNotNull("Builder should be present", jsonFunc.getBuilder()); + } + + @Test + public void testLoadSpecificFunctionCaseInsensitive() { + UnifiedFunctionDescriptor upperCase = repository.loadFunction("JSON").orElseThrow(); + UnifiedFunctionDescriptor lowerCase = repository.loadFunction("json").orElseThrow(); + UnifiedFunctionDescriptor mixedCase = repository.loadFunction("Json").orElseThrow(); + + assertEquals("JSON", upperCase.getFunctionName()); + assertEquals("JSON", lowerCase.getFunctionName()); + assertEquals("JSON", mixedCase.getFunctionName()); + } + + @Test + public void testLoadNonExistentFunctionReturnsEmpty() { + assertTrue( + "Non-existent function should return empty Optional", + repository.loadFunction("NON_EXISTENT_FUNCTION").isEmpty()); + } + + @Test + public void testFunctionBuilderCreatesValidFunction() { + UnifiedFunctionDescriptor descriptor = + repository.loadFunctions().stream() + .filter(d -> d.getFunctionName().equalsIgnoreCase("json")) + .findFirst() + .orElseThrow(); + UnifiedFunction jsonFunc = descriptor.getBuilder().build(List.of("VARCHAR")); + + assertNotNull("Function should be created", jsonFunc); + assertTrue("Function name should be JSON", jsonFunc.getFunctionName().equalsIgnoreCase("JSON")); + } +}