Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: convert sql expression into proto extended expressions #191

Merged
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
333989f
feat: convert sql expression into proto extended expressions
davisusanibar Oct 24, 2023
f4b6581
fix: implement nameToNodeMap and nameToTypeMap dyamically instead of …
davisusanibar Oct 26, 2023
a79f57d
fix: cover support also for project extended expression
davisusanibar Oct 26, 2023
a37be92
fix: cover support also for project extended expression
davisusanibar Oct 27, 2023
9f6aaf3
fix: create schema dynamically
davisusanibar Nov 15, 2023
52b41e3
fix: set function reference and extensions dinamically
davisusanibar Nov 16, 2023
74d13d3
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Nov 16, 2023
3d80d1f
fix: clean code
davisusanibar Nov 16, 2023
ae84176
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Nov 16, 2023
5954a62
fix: clean code
davisusanibar Nov 16, 2023
fc33a32
fix: rename variables to clean code
davisusanibar Nov 17, 2023
217f2a0
fix: from/to pojo/protobuf
davisusanibar Nov 23, 2023
75e4f48
feat: enable support from/to pojo/protobuf for extended expressions
davisusanibar Nov 24, 2023
1d23187
Merge branch 'main' into feature/from_to_protobuf_pojo
davisusanibar Nov 24, 2023
5adc79f
fix: consume core module for proto/pojo conversions
davisusanibar Nov 24, 2023
940f703
fix: clean code redundant method
davisusanibar Nov 25, 2023
e281f2f
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Nov 25, 2023
f817eb0
fix: apply suggestions from code review
davisusanibar Nov 29, 2023
b1c96bd
fix: code review core module
davisusanibar Nov 29, 2023
3d9b927
fix: code review core module testing side
davisusanibar Nov 29, 2023
e790492
feat: support aggregation function in extended expression from/to poj…
davisusanibar Dec 6, 2023
ef7c076
fix: merge from/to proto/pojo
davisusanibar Dec 6, 2023
d1b4efb
fix: merge from/to proto/pojo
davisusanibar Dec 6, 2023
c26fecd
fix: merge from/to proto/pojo + solve comments on the PR
davisusanibar Dec 6, 2023
bdde874
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Dec 6, 2023
0fa69c8
fix: code review suggestion
davisusanibar Dec 6, 2023
92d2cc5
refactor: bind instanceof checked variables
vbarua Dec 7, 2023
379b83f
fix: adding Aggregate.Measure POJO instead of Proto
davisusanibar Dec 8, 2023
e415785
fix: simplify extended expression immutable class
davisusanibar Dec 8, 2023
c27dd37
fix: clean code
davisusanibar Dec 8, 2023
a5d8126
fix: support any kind of expression type on extended expression conve…
davisusanibar Dec 9, 2023
50602f2
fix: error scalar function test case
davisusanibar Dec 9, 2023
f57322a
Merge branch 'feature/from_to_protobuf_pojo' into feature/sql_to_exte…
davisusanibar Dec 12, 2023
1c8b8b5
fix: support any kind of expression type on extended expression conve…
davisusanibar Dec 12, 2023
c71bff1
fix: consolidate PR and resolve conflicting files
davisusanibar Dec 14, 2023
183dcb6
fix: addressing PR comments
davisusanibar Dec 19, 2023
3835a4f
docs: update SqlExpressionToSubstrait#convert docs
vbarua Dec 20, 2023
6327a17
fix: commit suggestion code
davisusanibar Jan 10, 2024
8658558
fix: addressing PR comments
davisusanibar Jan 11, 2024
ecf3133
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Jan 11, 2024
3ff586a
fix: delete integration with arrow project
davisusanibar Jan 11, 2024
6151bca
fix: apply suggestions from code review
davisusanibar Jan 12, 2024
96a2f25
fix: addressing PR comments
davisusanibar Jan 15, 2024
579d2af
Merge branch 'main' into feature/sql_to_extended_expression
davisusanibar Jan 17, 2024
3b110f3
refactor: remove unused nation.parquet data
vbarua Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ trim_trailing_whitespace = true
[*.{yaml,yml}]
indent_size = 2

[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat}]
[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat,**/*.parquet}]
charset = unset
end_of_line = unset
insert_final_newline = unset
Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
repos:
- repo: https://github.com/adrienverge/yamllint.git
rev: v1.26.0
vbarua marked this conversation as resolved.
Show resolved Hide resolved
rev: v1.33.0
hooks:
- id: yamllint
args: [-c=.yamllint.yaml]
- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
rev: v8.0.0
rev: v9.9.0
hooks:
- id: commitlint
stages: [commit-msg]
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package io.substrait.isthmus;

import com.github.bsideup.jabel.Desugar;
import io.substrait.extendedexpression.ExtendedExpressionProtoConverter;
import io.substrait.extendedexpression.ImmutableExpressionReference;
import io.substrait.extendedexpression.ImmutableExtendedExpression;
import io.substrait.extension.SimpleExtension;
import io.substrait.isthmus.expression.RexExpressionConverter;
import io.substrait.isthmus.expression.ScalarFunctionConverter;
import io.substrait.proto.ExtendedExpression;
import io.substrait.type.NamedStruct;
import io.substrait.type.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.sql2rel.StandardConvertletTable;

public class SqlExpressionToSubstrait extends SqlConverterBase {

protected final RexExpressionConverter rexConverter;

public SqlExpressionToSubstrait() {
this(FEATURES_DEFAULT, EXTENSION_COLLECTION);
}

public SqlExpressionToSubstrait(
FeatureBoard features, SimpleExtension.ExtensionCollection extensions) {
super(features);
ScalarFunctionConverter scalarFunctionConverter =
new ScalarFunctionConverter(extensions.scalarFunctions(), factory);
this.rexConverter = new RexExpressionConverter(scalarFunctionConverter);
}

@Desugar
private record Result(
SqlValidator validator,
CalciteCatalogReader catalogReader,
Map<String, RelDataType> nameToTypeMap,
Map<String, RexNode> nameToNodeMap) {}

/**
* Converts the given SQL expression string to an {@link io.substrait.proto.ExtendedExpression }
*
* @param sqlExpression a SQL expression
* @param createStatements table creation statements defining fields referenced by the expression
* @return a {@link io.substrait.proto.ExtendedExpression }
* @throws SqlParseException
*/
public ExtendedExpression convert(String sqlExpression, List<String> createStatements)
throws SqlParseException {
var result = registerCreateTablesForExtendedExpression(createStatements);
return executeInnerSQLExpression(
sqlExpression,
result.validator(),
result.catalogReader(),
result.nameToTypeMap(),
result.nameToNodeMap());
}

private ExtendedExpression executeInnerSQLExpression(
String sqlExpression,
SqlValidator validator,
CalciteCatalogReader catalogReader,
Map<String, RelDataType> nameToTypeMap,
Map<String, RexNode> nameToNodeMap)
throws SqlParseException {
RexNode rexNode =
sqlToRexNode(sqlExpression, validator, catalogReader, nameToTypeMap, nameToNodeMap);
NamedStruct namedStruct = toNamedStruct(nameToTypeMap);

ImmutableExpressionReference expressionReference =
ImmutableExpressionReference.builder()
.expression(rexNode.accept(this.rexConverter))
.addOutputNames("new-column")
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
.build();

List<io.substrait.extendedexpression.ExtendedExpression.ExpressionReference>
expressionReferences = new ArrayList<>();
expressionReferences.add(expressionReference);

ImmutableExtendedExpression.Builder extendedExpression =
ImmutableExtendedExpression.builder()
.referredExpressions(expressionReferences)
.baseSchema(namedStruct);

return new ExtendedExpressionProtoConverter().toProto(extendedExpression.build());
}

private RexNode sqlToRexNode(
String sql,
SqlValidator validator,
CalciteCatalogReader catalogReader,
Map<String, RelDataType> nameToTypeMap,
Map<String, RexNode> nameToNodeMap)
throws SqlParseException {
SqlParser parser = SqlParser.create(sql, parserConfig);
SqlNode sqlNode = parser.parseExpression();
vbarua marked this conversation as resolved.
Show resolved Hide resolved
SqlNode validSqlNode = validator.validateParameterizedExpression(sqlNode, nameToTypeMap);
SqlToRelConverter converter =
new SqlToRelConverter(
null,
validator,
catalogReader,
relOptCluster,
StandardConvertletTable.INSTANCE,
converterConfig);
return converter.convertExpression(validSqlNode, nameToNodeMap);
}

private Result registerCreateTablesForExtendedExpression(List<String> tables)
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
throws SqlParseException {
Map<String, RelDataType> nameToTypeMap = new LinkedHashMap<>();
Map<String, RexNode> nameToNodeMap = new HashMap<>();
vibhatha marked this conversation as resolved.
Show resolved Hide resolved
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false);
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(rootSchema, List.of(), factory, config);
SqlValidator validator = Validator.create(factory, catalogReader, SqlValidator.Config.DEFAULT);
if (tables != null) {
for (String tableDef : tables) {
List<DefinedTable> tList = parseCreateTable(factory, validator, tableDef);
for (DefinedTable t : tList) {
rootSchema.add(t.getName(), t);
for (RelDataTypeField field : t.getRowType(factory).getFieldList()) {
nameToTypeMap.merge( // to validate the sql expression tree
field.getName(),
field.getType(),
(v1, v2) -> {
throw new IllegalArgumentException(
"There is no support for duplicate column names: " + field.getName());
});
nameToNodeMap.merge( // to convert sql expression into RexNode
field.getName(),
new RexInputRef(field.getIndex(), field.getType()),
(v1, v2) -> {
throw new IllegalArgumentException(
"There is no support for duplicate column names: " + field.getName());
});
}
}
}
} else {
throw new IllegalArgumentException(
"Information regarding the data and types must be passed.");
}
return new Result(validator, catalogReader, nameToTypeMap, nameToNodeMap);
}

private NamedStruct toNamedStruct(Map<String, RelDataType> nameToTypeMap) {
var names = new ArrayList<String>();
var types = new ArrayList<Type>();
for (Map.Entry<String, RelDataType> entry : nameToTypeMap.entrySet()) {
String k = entry.getKey();
RelDataType v = entry.getValue();
names.add(k);
types.add(TypeConverter.DEFAULT.toSubstrait(v));
}
return NamedStruct.of(names, Type.Struct.builder().fields(types).nullable(false).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.substrait.isthmus;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import io.substrait.extendedexpression.ExtendedExpressionProtoConverter;
import io.substrait.extendedexpression.ProtoExtendedExpressionConverter;
import io.substrait.proto.ExtendedExpression;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.jupiter.api.Assertions;

public class ExtendedExpressionTestBase {
public static String asString(String resource) throws IOException {
return Resources.toString(Resources.getResource(resource), Charsets.UTF_8);
}

public static List<String> tpchSchemaCreateStatements(String schemaToLoad) throws IOException {
String[] values = asString(schemaToLoad).split(";");
return Arrays.stream(values)
.filter(t -> !t.trim().isBlank())
.collect(java.util.stream.Collectors.toList());
}

public static List<String> tpchSchemaCreateStatements() throws IOException {
return tpchSchemaCreateStatements("tpch/schema.sql");
}

protected ExtendedExpression assertProtoExtendedExpressionRoundtrip(String query)
throws IOException, SqlParseException {
return assertProtoExtendedExpressionRoundtrip(query, new SqlExpressionToSubstrait());
}

protected ExtendedExpression assertProtoExtendedExpressionRoundtrip(
String query, String schemaToLoad) throws IOException, SqlParseException {
return assertProtoExtendedExpressionRoundtrip(
query, new SqlExpressionToSubstrait(), schemaToLoad);
}

protected ExtendedExpression assertProtoExtendedExpressionRoundtrip(
String query, SqlExpressionToSubstrait s) throws IOException, SqlParseException {
return assertProtoExtendedExpressionRoundtrip(query, s, tpchSchemaCreateStatements());
}

protected ExtendedExpression assertProtoExtendedExpressionRoundtrip(
String query, SqlExpressionToSubstrait s, String schemaToLoad)
throws IOException, SqlParseException {
return assertProtoExtendedExpressionRoundtrip(
query, s, tpchSchemaCreateStatements(schemaToLoad));
}

protected ExtendedExpression assertProtoExtendedExpressionRoundtrip(
String query, SqlExpressionToSubstrait s, List<String> creates)
throws SqlParseException, IOException {
// proto initial extended expression
ExtendedExpression extendedExpressionProtoInitial = s.convert(query, creates);

// pojo final extended expression
io.substrait.extendedexpression.ExtendedExpression extendedExpressionPojoFinal =
new ProtoExtendedExpressionConverter().from(extendedExpressionProtoInitial);

// proto final extended expression
ExtendedExpression extendedExpressionProtoFinal =
new ExtendedExpressionProtoConverter().toProto(extendedExpressionPojoFinal);

// round-trip to validate extended expression proto initial equals to final
Assertions.assertEquals(extendedExpressionProtoFinal, extendedExpressionProtoInitial);

return extendedExpressionProtoInitial;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.substrait.isthmus;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.stream.Stream;
import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class SimpleExtendedExpressionsTest extends ExtendedExpressionTestBase {

private static Stream<Arguments> expressionTypeProvider() {
return Stream.of(
Arguments.of("2"), // I32LiteralExpression
Arguments.of("L_ORDERKEY"), // FieldReferenceExpression
Arguments.of("L_ORDERKEY > 10"), // ScalarFunctionExpressionFilter
Arguments.of("L_ORDERKEY + 10"), // ScalarFunctionExpressionProjection
Arguments.of("L_ORDERKEY IN (10, 20)"), // ScalarFunctionExpressionIn
Arguments.of("L_ORDERKEY is not null"), // ScalarFunctionExpressionIsNotNull
Arguments.of("L_ORDERKEY is null") // ScalarFunctionExpressionIsNull
);
}

@ParameterizedTest
@MethodSource("expressionTypeProvider")
public void testExtendedExpressionsRoundTrip(String sqlExpression)
throws SqlParseException, IOException {
assertProtoExtendedExpressionRoundtrip(sqlExpression);
}

@ParameterizedTest
@MethodSource("expressionTypeProvider")
public void testExtendedExpressionsRoundTripDuplicateColumnIdentifier(String sqlExpression) {
IllegalArgumentException illegalArgumentException =
assertThrows(
IllegalArgumentException.class,
() -> assertProtoExtendedExpressionRoundtrip(sqlExpression, "tpch/schema_error.sql"));
assertTrue(
illegalArgumentException
.getMessage()
.startsWith("There is no support for duplicate column names"));
}
}
Binary file not shown.
36 changes: 36 additions & 0 deletions isthmus/src/test/resources/tpch/schema_error.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE TABLE LINEITEM (
L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY BIGINT NOT NULL,
L_SUPPKEY BIGINT NOT NULL,
L_LINENUMBER INTEGER,
L_QUANTITY DECIMAL,
L_EXTENDEDPRICE DECIMAL,
L_DISCOUNT DECIMAL,
L_TAX DECIMAL,
L_RETURNFLAG CHAR(1),
L_LINESTATUS CHAR(1),
L_SHIPDATE DATE,
L_COMMITDATE DATE,
L_RECEIPTDATE DATE,
L_SHIPINSTRUCT CHAR(25),
L_SHIPMODE CHAR(10),
L_COMMENT VARCHAR(44)
);
CREATE TABLE LINEITEM_DUPLICATED (
L_ORDERKEY BIGINT NOT NULL,
L_PARTKEY BIGINT NOT NULL,
L_SUPPKEY BIGINT NOT NULL,
L_LINENUMBER INTEGER,
L_QUANTITY DECIMAL,
L_EXTENDEDPRICE DECIMAL,
L_DISCOUNT DECIMAL,
L_TAX DECIMAL,
L_RETURNFLAG CHAR(1),
L_LINESTATUS CHAR(1),
L_SHIPDATE DATE,
L_COMMITDATE DATE,
L_RECEIPTDATE DATE,
L_SHIPINSTRUCT CHAR(25),
L_SHIPMODE CHAR(10),
L_COMMENT VARCHAR(44)
);