Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ This module provides components organized into two main areas aligned with the [
### Unified Execution Runtime

- **`UnifiedQueryCompiler`**: Compiles Calcite logical plans (`RelNode`) into executable JDBC `PreparedStatement` objects for separation of compilation and execution.
- **`UnifiedFunction`**: Engine-agnostic function interface that enables functions to be evaluated across different execution engines without engine-specific code duplication.
- **`UnifiedFunctionRepository`**: Repository for discovering and loading functions as `UnifiedFunction` instances, providing a bridge between function definitions and external execution engines.

Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, or compile and execute queries directly for testing and conformance validation.
Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines.

### Experimental API Design

Expand Down Expand Up @@ -86,6 +88,58 @@ try (PreparedStatement statement = compiler.compile(plan)) {
}
```

### UnifiedFunction and UnifiedFunctionRepository

The Unified Function API provides an engine-agnostic abstraction for functions, enabling them to be evaluated across different execution engines (Spark, Flink, Calcite, etc.) without engine-specific code duplication.

#### Type System

Types are represented as SQL type name strings for engine-agnostic serialization:

- **Primitive types**: `"VARCHAR"`, `"INTEGER"`, `"BIGINT"`, `"DOUBLE"`, `"BOOLEAN"`, `"DATE"`, `"TIMESTAMP"`
- **Array types**: `"ARRAY<ELEMENT_TYPE>"` (e.g., `"ARRAY<INTEGER>"`)
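- **Struct types**: `"STRUCT<field1:TYPE1, field2:TYPE2>"` (e.g., `"STRUCT<name:VARCHAR, age:INTEGER>"`)

Note: the Calcite-based adapter currently resolves each input type name with `SqlTypeName.valueOf`, so input type names passed to a function builder must exactly match a Calcite `SqlTypeName` constant (for example `"VARCHAR"`); parameterized forms such as `"ARRAY<INTEGER>"` are rejected as input types.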

#### Loading Functions

Use `UnifiedFunctionRepository` to discover and load unified functions:

```java
// Create repository with context
UnifiedFunctionRepository repository = new UnifiedFunctionRepository(context);

// Load all available functions
List<UnifiedFunctionDescriptor> allFunctions = repository.loadFunctions();
for (UnifiedFunctionDescriptor descriptor : allFunctions) {
String name = descriptor.getFunctionName();
UnifiedFunctionBuilder builder = descriptor.getBuilder();
// Use builder to create function instances
}

// Load a specific function by name
UnifiedFunctionDescriptor upperDescriptor = repository.loadFunction("UPPER").orElseThrow();
```

#### Creating and Using Functions

Functions are created using builders with specific input types:

```java
// Get function descriptor
UnifiedFunctionDescriptor descriptor = repository.loadFunction("UPPER").orElseThrow();

// Build function with specific input types
UnifiedFunction upperFunc = descriptor.getBuilder().build(List.of("VARCHAR"));

// Get function metadata
String name = upperFunc.getFunctionName(); // "UPPER"
List<String> inputTypes = upperFunc.getInputTypes(); // ["VARCHAR"]
String returnType = upperFunc.getReturnType(); // "VARCHAR"

// Evaluate function
Object result = upperFunc.eval(List.of("hello")); // "HELLO"
```

### Complete Workflow Examples

Combining all components for a complete PPL query workflow:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api.function;

import java.io.Serializable;
import java.util.List;

/**
 * Engine-agnostic contract for a single function: it exposes the function's name, the type names
 * of its arguments and result, and a way to evaluate it on concrete argument values. The intent is
 * that a function is implemented once and then reused by different execution engines.
 *
 * <p>Types are carried as plain SQL type name strings (for example {@code "VARCHAR"}, {@code
 * "INTEGER"}, {@code "ARRAY<T>"}, {@code "STRUCT<...>"}) instead of a dedicated {@code UnifiedType}
 * abstraction, which is deliberately deferred until it is needed.
 *
 * <p>The interface extends {@link Serializable}, so every implementation is serializable.
 *
 * @see java.io.Serializable
 */
public interface UnifiedFunction extends Serializable {

  /**
   * Gets the function's name.
   *
   * @return name of this function
   */
  String getFunctionName();

  /**
   * Gets the unified type names of the arguments this function expects.
   *
   * @return unified type names of the input arguments, as a list
   */
  List<String> getInputTypes();

  /**
   * Gets the unified type name of the value this function produces.
   *
   * @return unified type name of the result
   */
  String getReturnType();

  /**
   * Applies the function to already-evaluated argument values.
   *
   * @param inputs argument values, evaluated by the caller before this call
   * @return the result of the evaluation; whether it can be {@code null} depends on the
   *     implementation
   */
  Object eval(List<Object> inputs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api.function;

import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.calcite.DataContext;
import org.apache.calcite.DataContexts;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexExecutable;
import org.apache.calcite.rex.RexExecutorImpl;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;

/**
 * Adapter that implements {@link UnifiedFunction} using Calcite's {@link RexExecutorImpl}.
 *
 * <p>An instance stores the function name, the unified type names of its arguments and result, and
 * the Java source that Calcite generated for the resolved function expression. Equality and hash
 * code ignore the generated source (see the {@code exclude} on {@link EqualsAndHashCode}).
 */
@ToString
@EqualsAndHashCode(exclude = "compiledCode")
@RequiredArgsConstructor
public class UnifiedFunctionCalciteAdapter implements UnifiedFunction {

  private static final long serialVersionUID = 1L;

  /**
   * Key used by RexExecutorImpl's InputGetter to retrieve input values from DataContext. This is a
   * Calcite internal convention.
   */
  private static final String INPUT_RECORD_KEY = "inputRecord";

  // NOTE: the declaration order of the final fields below defines the parameter order of the
  // Lombok-generated constructor used by create(); do not reorder them.

  /** Unified function name. */
  @Getter private final String functionName;

  /** Unified type name of the return value. */
  @Getter private final String returnType;

  /** Unified type names of the input arguments. */
  @Getter private final List<String> inputTypes;

  /**
   * Compiled Java source for evaluating the function.
   *
   * <p>The generated code reads inputs from the {@code "inputRecord"} entry in {@link DataContext}.
   * Arguments are mapped to field variables named {@code "_0"}, {@code "_1"}, etc.
   *
   * <pre>{@code
   * // For UPPER(input) function:
   * Object[] inputRecord = (Object[]) dataContext.get("inputRecord");
   * String _0 = (String) inputRecord[0];
   * return _0 == null ? null : _0.toUpperCase();
   * }</pre>
   */
  private final String compiledCode;

  /**
   * Evaluates the function on the given argument values.
   *
   * <p>A new {@link RexExecutable} is created from the stored source on every call, and the inputs
   * are handed to it as an {@code Object[]} under the {@code "inputRecord"} key of a {@link
   * DataContext}.
   *
   * @param inputs argument values, in positional order
   * @return the first element of the execution result, or {@code null} when the executable yields
   *     no result array or an empty one
   */
  @Override
  public Object eval(List<Object> inputs) {
    RexExecutable rexExecutor = new RexExecutable(compiledCode, functionName);
    DataContext dataContext = DataContexts.of(Map.of(INPUT_RECORD_KEY, inputs.toArray()));
    rexExecutor.setDataContext(dataContext);

    Object[] results = rexExecutor.execute();
    return (results == null || results.length == 0) ? null : results[0];
  }

  /**
   * Creates Calcite RexNode adapter for a unified function.
   *
   * <p>Note: this method pre-compiles the resolved function expression and stores the generated
   * source code as a string. This avoids serializing {@link RexNode} instances and simplifies
   * distribution across execution engines. If performance or security concerns arise, we can change
   * this internal implementation.
   *
   * <p>Each input type name is resolved with {@link SqlTypeName#valueOf(String)}, so it must match
   * a {@link SqlTypeName} constant exactly; parameterized names such as {@code "ARRAY<INTEGER>"}
   * are rejected.
   *
   * @param rexBuilder RexBuilder for creating expressions
   * @param functionName function name
   * @param inputTypes function argument types
   * @return configured adapter instance
   * @throws IllegalArgumentException if an input type name does not match a {@link SqlTypeName}
   *     constant
   */
  public static UnifiedFunctionCalciteAdapter create(
      RexBuilder rexBuilder, String functionName, List<String> inputTypes) {
    // Resolve every type name exactly once; both the input refs and the row type reuse the result.
    RelDataType[] argTypes = toRelTypes(rexBuilder.getTypeFactory(), inputTypes);

    RexNode[] inputRefs = makeInputRefs(rexBuilder, argTypes);
    RexNode resolved = PPLFuncImpTable.INSTANCE.resolve(rexBuilder, functionName, inputRefs);
    RelDataType inputRowType = buildInputRowType(rexBuilder.getTypeFactory(), argTypes);
    RexExecutable executable =
        RexExecutorImpl.getExecutable(rexBuilder, List.of(resolved), inputRowType);
    String returnType = resolved.getType().getSqlTypeName().getName();

    return new UnifiedFunctionCalciteAdapter(
        functionName, returnType, List.copyOf(inputTypes), executable.getSource());
  }

  /** Converts the unified type names to Calcite types, preserving positional order. */
  private static RelDataType[] toRelTypes(
      RelDataTypeFactory typeFactory, List<String> inputTypes) {
    RelDataType[] relTypes = new RelDataType[inputTypes.size()];
    for (int i = 0; i < relTypes.length; i++) {
      relTypes[i] = toRelType(typeFactory, inputTypes.get(i));
    }
    return relTypes;
  }

  /** Converts one unified type name to a Calcite type, failing with a descriptive message. */
  private static RelDataType toRelType(RelDataTypeFactory typeFactory, String typeName) {
    final SqlTypeName sqlTypeName;
    try {
      sqlTypeName = SqlTypeName.valueOf(typeName);
    } catch (IllegalArgumentException e) {
      // Same exception type as the bare enum lookup, but the message names the offending input
      // and the original exception is kept as the cause.
      throw new IllegalArgumentException(
          "Unsupported input type name '"
              + typeName
              + "': expected the exact name of a Calcite SqlTypeName constant (e.g. VARCHAR)",
          e);
    }
    return typeFactory.createSqlType(sqlTypeName);
  }

  /** Builds the input row type whose fields are named {@code "_0"}, {@code "_1"}, etc. */
  private static RelDataType buildInputRowType(
      RelDataTypeFactory typeFactory, RelDataType[] argTypes) {
    RelDataTypeFactory.Builder builder = typeFactory.builder();
    for (int i = 0; i < argTypes.length; i++) {
      builder.add("_" + i, argTypes[i]);
    }
    return builder.build();
  }

  /** Creates one positional input reference per argument type. */
  private static RexNode[] makeInputRefs(RexBuilder rexBuilder, RelDataType[] argTypes) {
    RexNode[] inputRefs = new RexNode[argTypes.length];
    for (int i = 0; i < argTypes.length; i++) {
      inputRefs[i] = rexBuilder.makeInputRef(argTypes[i], i);
    }
    return inputRefs;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.api.function;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.opensearch.sql.api.UnifiedQueryContext;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;

/**
 * Repository that exposes PPL functions as {@link UnifiedFunction} factories.
 *
 * <p>Functions are looked up in {@link PPLBuiltinOperators}; each one is surfaced as a {@link
 * UnifiedFunctionDescriptor} whose builder produces a {@link UnifiedFunctionCalciteAdapter}.
 */
@RequiredArgsConstructor
public class UnifiedFunctionRepository {

  /** Query context; its plan context supplies the RexBuilder used to create functions. */
  private final UnifiedQueryContext context;

  /**
   * Lists a descriptor for every operator in {@link PPLBuiltinOperators} that is a {@link
   * SqlUserDefinedFunction}.
   *
   * @return descriptors of the available functions
   */
  public List<UnifiedFunctionDescriptor> loadFunctions() {
    final RexBuilder rexBuilder = context.getPlanContext().rexBuilder;
    return PPLBuiltinOperators.instance().getOperatorList().stream()
        .filter(op -> op instanceof SqlUserDefinedFunction)
        .map(op -> describe(rexBuilder, op.getName()))
        .collect(Collectors.toList());
  }

  /**
   * Finds the descriptor of a single function.
   *
   * @param functionName name to look for; compared ignoring case
   * @return the first descriptor whose name matches, or an empty optional when none does
   */
  public Optional<UnifiedFunctionDescriptor> loadFunction(String functionName) {
    for (UnifiedFunctionDescriptor candidate : loadFunctions()) {
      if (candidate.getFunctionName().equalsIgnoreCase(functionName)) {
        return Optional.of(candidate);
      }
    }
    return Optional.empty();
  }

  /** Pairs a function name with a builder that defers adapter creation until types are known. */
  private static UnifiedFunctionDescriptor describe(RexBuilder rexBuilder, String functionName) {
    return new UnifiedFunctionDescriptor(
        functionName,
        inputTypes -> UnifiedFunctionCalciteAdapter.create(rexBuilder, functionName, inputTypes));
  }

  /** Immutable pairing of a function name and the builder that instantiates the function. */
  @Value
  public static class UnifiedFunctionDescriptor {
    /** Function name, as reported by the underlying operator. */
    String functionName;

    /** Factory that creates {@link UnifiedFunction} instances for given input types. */
    UnifiedFunctionBuilder builder;
  }

  /** Factory for {@link UnifiedFunction} instances, parameterized by the argument types. */
  @FunctionalInterface
  public interface UnifiedFunctionBuilder {

    /**
     * Creates a {@link UnifiedFunction} for the given argument types.
     *
     * @param inputTypes unified type names of the function arguments
     * @return a UnifiedFunction configured for those argument types
     */
    UnifiedFunction build(List<String> inputTypes);
  }
}
Loading
Loading