Skip to content

feat(isthmus): update to calcite 1.39 #380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ com.github.vlsi.vlsi-release-plugins.version=1.74

# library version
antlr.version=4.13.1
calcite.version=1.38.0
calcite.version=1.39.0
guava.version=32.1.3-jre
immutables.version=2.10.1
jackson.version=2.16.1
Expand Down
150 changes: 150 additions & 0 deletions isthmus/src/main/java/io/substrait/isthmus/SubstraitCalciteSchema.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.substrait.isthmus;

import io.substrait.relation.NamedScan;
import io.substrait.relation.Rel;
import io.substrait.relation.RelCopyOnWriteVisitor;
import io.substrait.type.NamedStruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.lookup.Lookup;

/**
* A subclass of the Calcite Schema for creation from a Substrait relation
*
* <p>Implementation note:
*
* <p>The external Isthmus API can take a function that will return the table schema when needed,
* rather than it being available up front.
*
* <p>This was implemented by a special subclass of the Calcite simple schema. Since this was
* changed in Calcite 1.39.0; it failed to work; the protected methods it extended from changed.
*
* <p>The new feature of the Calcite schema is the 'lazy' or delayed lookup of tables. This feature
* has not be exploited here
*/
public class SubstraitCalciteSchema extends AbstractSchema {

private Map<String, Table> tables;

protected SubstraitCalciteSchema(Map<String, Table> tables) {
this.tables = tables;
}

@Override
protected Map<String, Table> getTableMap() {
return tables;
}

@Override
public Lookup<Table> tables() {
return super.tables();
}

/**
* Turn this into a root Calciteschema Choice of settings is based on current isthmus behaviour
*/
public CalciteSchema getRootSchema() {
return CalciteSchema.createRootSchema(false, false, "", this);
}

public static Builder builder() {
return new Builder();
}

/**
* Builder class to assist with creating the CalciteSchema
*
* <p>Can be created from a Rel or a Lookup function
*/
public static class Builder {

private Rel rel;
private RelDataTypeFactory typeFactory;
private TypeConverter typeConverter;

public Builder withTypeFactory(RelDataTypeFactory typeFactory) {
this.typeFactory = typeFactory;
return this;
}

public Builder withTypeConverter(TypeConverter typeConverter) {
this.typeConverter = typeConverter;
return this;
}

public Builder withSubstraitRel(Rel rel) {
this.rel = rel;
return this;
}

public SubstraitCalciteSchema build() {
if (typeConverter == null) {
throw new IllegalArgumentException("'TypeConverter' must be specified");
}

if (typeFactory == null) {
throw new IllegalArgumentException("'TypeFactory' must be specified");
}

if (rel == null) {
throw new IllegalArgumentException("'rel' must be specified");
}

// If there are any named structs within the relation, gather these and convert
// them to a map of tables
// index by name; note that the name of the table is 'un-namespaced' here.
// This was the existing logic so it has not been altered.
Map<List<String>, NamedStruct> tableMap = NamedStructGatherer.gatherTables(rel);

Map<String, Table> tables =
tableMap.entrySet().stream()
.map(
entry -> {
var id = entry.getKey();
var name = id.get(id.size() - 1);
var table = entry.getValue();
var value =
new SqlConverterBase.DefinedTable(
name,
typeFactory,
typeConverter.toCalcite(typeFactory, table.struct(), table.names()));
return Map.entry(name, value);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return new SubstraitCalciteSchema(tables);
}
}

private static final class NamedStructGatherer extends RelCopyOnWriteVisitor<RuntimeException> {
Map<List<String>, NamedStruct> tableMap;

private NamedStructGatherer() {
super();
this.tableMap = new HashMap<>();
}

public static Map<List<String>, NamedStruct> gatherTables(Rel rel) {
var visitor = new NamedStructGatherer();
rel.accept(visitor);
return visitor.tableMap;
}

@Override
public Optional<Rel> visit(NamedScan namedScan) {
Optional<Rel> result = super.visit(namedScan);

List<String> tableName = namedScan.getNames();
tableMap.put(tableName, namedScan.getInitialSchema());

return result;
}
}
}
53 changes: 6 additions & 47 deletions isthmus/src/main/java/io/substrait/isthmus/SubstraitToCalcite.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,15 @@

import io.substrait.extension.SimpleExtension;
import io.substrait.plan.Plan;
import io.substrait.relation.NamedScan;
import io.substrait.relation.Rel;
import io.substrait.relation.RelCopyOnWriteVisitor;
import io.substrait.type.NamedStruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.LookupCalciteSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
Expand Down Expand Up @@ -57,19 +48,12 @@ public SubstraitToCalcite(
* <p>Override this method to customize schema extraction.
*/
protected CalciteSchema toSchema(Rel rel) {
Map<List<String>, NamedStruct> tableMap = NamedStructGatherer.gatherTables(rel);
Function<List<String>, Table> lookup =
id -> {
NamedStruct table = tableMap.get(id);
if (table == null) {
return null;
}
return new SqlConverterBase.DefinedTable(
id.get(id.size() - 1),
typeFactory,
typeConverter.toCalcite(typeFactory, table.struct(), table.names()));
};
return LookupCalciteSchema.createRootSchema(lookup);
return SubstraitCalciteSchema.builder()
.withSubstraitRel(rel)
.withTypeFactory(typeFactory)
.withTypeConverter(typeConverter)
.build()
.getRootSchema();
}

/**
Expand Down Expand Up @@ -179,29 +163,4 @@ private Pair<Integer, RelDataType> renameFields(
return Pair.of(currentIndex, type);
}
}

private static class NamedStructGatherer extends RelCopyOnWriteVisitor<RuntimeException> {
Map<List<String>, NamedStruct> tableMap;

private NamedStructGatherer() {
super();
this.tableMap = new HashMap<>();
}

public static Map<List<String>, NamedStruct> gatherTables(Rel rel) {
var visitor = new NamedStructGatherer();
rel.accept(visitor);
return visitor.tableMap;
}

@Override
public Optional<Rel> visit(NamedScan namedScan) {
Optional<Rel> result = super.visit(namedScan);

List<String> tableName = namedScan.getNames();
tableMap.put(tableName, namedScan.getInitialSchema());

return result;
}
}
}

This file was deleted.

Loading