diff --git a/.editorconfig b/.editorconfig
index cc987b518..ce1567087 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -10,7 +10,7 @@ trim_trailing_whitespace = true
[*.{yaml,yml}]
indent_size = 2
-[{**/*.sql,**/OuterReferenceResolver.md,gradlew.bat}]
+[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat}]
charset = unset
end_of_line = unset
insert_final_newline = unset
diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml
index 2feebebea..b362a4d43 100644
--- a/.github/workflows/pr.yml
+++ b/.github/workflows/pr.yml
@@ -86,6 +86,26 @@ jobs:
uses: gradle/actions/setup-gradle@v3
- name: Build with Gradle
run: gradle build --rerun-tasks
+ examples:
+ name: Build Examples
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ submodules: recursive
+ - name: Set up JDK 17
+ uses: actions/setup-java@v4
+ with:
+ java-version: '17'
+ distribution: 'temurin'
+ - uses: extractions/setup-just@v2
+ - name: substrait-spark
+ shell: bash
+ run: |
+ pwd
+ ls -lart
+ just -f ./examples/substrait-spark buildapp
+
isthmus-native-image-mac-linux:
name: Build Isthmus Native Image
needs: java
diff --git a/.gitignore b/.gitignore
index d7d9428ea..c84c103c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,4 @@ gen
out/**
*.iws
.vscode
+.pmdCache
diff --git a/examples/substrait-spark/.gitignore b/examples/substrait-spark/.gitignore
new file mode 100644
index 000000000..8965a89f4
--- /dev/null
+++ b/examples/substrait-spark/.gitignore
@@ -0,0 +1,2 @@
+_apps
+_data
diff --git a/examples/substrait-spark/README.md b/examples/substrait-spark/README.md
new file mode 100644
index 000000000..97a53b707
--- /dev/null
+++ b/examples/substrait-spark/README.md
@@ -0,0 +1,587 @@
+# Introduction to the Substrait-Spark library
+
+The Substrait-Spark library was recently added to the [substrait-java](https://github.com/substrait-io/substrait-java) project; this library allows Substrait plans to be converted to and from Spark query plans.
+
+
+## How does this work in practice?
+
+Once Spark SQL or Spark DataFrame API queries have been created, Spark's optimized query plan can be used to generate Substrait plans, and Substrait plans can be executed on a Spark cluster. Below is a description of how to use this library; two sample datasets are included for demonstration.
+
+The most commonly used logical relations are supported, including those generated from all the TPC-H queries, but there are currently some gaps in support that prevent all the TPC-DS queries from being translatable.
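+
+In outline, each direction is a single conversion call. The sketch below is condensed from the example classes described later in this README and assumes an existing `SparkSession` (`spark`) and a `Dataset<Row>` query (`query`):
+
+```java
+// Spark -> Substrait: convert the optimized logical plan
+ToSubstraitRel toSubstrait = new ToSubstraitRel();
+io.substrait.plan.Plan substraitPlan = toSubstrait.convert(query.queryExecution().optimizedPlan());
+
+// Substrait -> Spark: convert back to a Spark logical plan and execute it
+LogicalPlan sparkPlan = new ToLogicalPlan(spark).convert(substraitPlan);
+Dataset.ofRows(spark, sparkPlan).show();
+```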
+
+
+## Running the examples
+
+There are 3 example classes:
+
+- [SparkDataset](./app/src/main/java/io/substrait/examples/SparkDataset.java) that creates a plan starting with the Spark Dataset API
+- [SparkSQL](./app/src/main/java/io/substrait/examples/SparkSQL.java) that creates a plan starting with the Spark SQL API
+- [SparkConsumeSubstrait](./app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java) that loads a Substrait plan and executes it
+
+
+
+### Requirements
+
+To run these you will need:
+
+- Java 17 or greater
+- Docker to start a test Spark Cluster
+ - you could use your own cluster, but would need to adjust file locations defined in [SparkHelper](./app/src/main/java/io/substrait/examples/SparkHelper.java)
+- [just task runner](https://github.com/casey/just#installation) optional, but very helpful to run the bash commands
+- [Two datafiles](./app/src/main/resources/) are provided (CSV format)
+
+To build using the `substrait-spark` library yourself, the artifacts are available from the [mvn repository](https://mvnrepository.com/artifact/io.substrait/spark)
+
+Using maven:
+```xml
+<!-- https://mvnrepository.com/artifact/io.substrait/spark -->
+<dependency>
+    <groupId>io.substrait</groupId>
+    <artifactId>spark</artifactId>
+    <version>0.36.0</version>
+</dependency>
+```
+
+Using Gradle (groovy)
+```groovy
+// https://mvnrepository.com/artifact/io.substrait/spark
+implementation 'io.substrait:spark:0.36.0'
+```
+
+### Setup configuration
+
+Firstly the application needs to be built; this is a simple Java application. As well as issuing the `gradle` build command, the build steps create two directories, `_apps` and `_data`. The JAR file will be copied to the `_apps` directory and the datafiles to `_data`. Note that the permissions on the `_data` directory are set to group write - this allows the Spark process in the Docker container to write the output plan.
+
+To run using `just`
+```
+just buildapp
+
+# or
+
+./gradlew build
+mkdir -p ./_data && chmod g+w ./_data
+mkdir -p ./_apps
+
+cp ./app/build/libs/app.jar ./_apps
+cp ./app/src/main/resources/*.csv ./_data
+
+```
+
+- In the `_data` directory there are now two csv files [tests_subset_2023.csv](./app/src/main/resources/tests_subset_2023.csv) and [vehicles_subset_2023.csv](./app/src/main/resources/vehicles_subset_2023.csv)
+
+
+Second, you can start the basic Spark cluster - this uses `docker compose`. It is best to start this in a separate window
+
+```
+just spark
+```
+
+- In [SparkHelper](./app/src/main/java/io/substrait/examples/SparkHelper.java) there are constants defined to match these locations
+
+```java
+  public static final String VEHICLES_CSV = "vehicles_subset_2023.csv";
+  public static final String TESTS_CSV = "tests_subset_2023.csv";
+  public static final String ROOT_DIR = "/opt/spark-data";
+```
+
+- To run the application `exec` into the SparkMaster node, and issue `spark-submit`
+
+```
+docker exec -it substrait-spark-spark-1 bash
+/opt/spark/bin/spark-submit --master spark://substrait-spark-spark-1:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar
+```
+
+The `justfile` has three targets to make it easy to run the examples
+
+- `just dataset` runs the Dataset API and produces `spark_dataset_substrait.plan`
+- `just sql` runs the SQL API and produces `spark_sql_substrait.plan`
+- `just consume ` runs the specified plan (from the `_data` directory)
+
+
+
+
+## Creating a Substrait Plan
+
+[SparkSQL](./app/src/main/java/io/substrait/examples/SparkSQL.java) shows a simple use of SQL to join the two tables; after the two CSV files are read, the SQL query is defined and then run on Spark.
+
+### Loading data
+
+Firstly the filenames are constructed and the CSV files read; temporary views are then created so that the SQL query can refer to these tables.
+
+```java
+ String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
+ String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();
+
+ spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile)
+ .createOrReplaceTempView(VEHICLE_TABLE);
+ spark.read().option("delimiter", ",").option("header", "true").csv(testsFile)
+ .createOrReplaceTempView(TESTS_TABLE);
+```
+
+### Creating the SQL query
+
+The example SQL query below counts the vehicles that have passed the vehicle safety test, grouped by colour.
+
+```java
+ String sqlQuery = """
+ SELECT vehicles.colour, count(*) as colourcount
+ FROM vehicles
+ INNER JOIN tests ON vehicles.vehicle_id=tests.vehicle_id
+ WHERE tests.test_result = 'P'
+ GROUP BY vehicles.colour
+ ORDER BY count(*)
+ """;
+ var result = spark.sql(sqlQuery);
+ result.show();
+```
+
+If we were to just run this as-is, the output table would be as below.
+```
++------+-----------+
+|colour|colourcount|
++------+-----------+
+| GREEN| 1|
+|BRONZE| 1|
+| RED| 2|
+| BLACK| 2|
+| GREY| 2|
+| BLUE| 2|
+|SILVER| 3|
+| WHITE| 5|
++------+-----------+
+```
+
+### Logical and Optimized Query Plans
+
+The next step is to look at the logical and optimised query plans that Spark has constructed.
+
+```java
+ LogicalPlan logical = result.logicalPlan();
+ System.out.println(logical);
+
+ LogicalPlan optimised = result.queryExecution().optimizedPlan();
+ System.out.println(optimised);
+
+```
+
+The logical plan will be:
+
+```
+Sort [colourcount#30L ASC NULLS FIRST], true
++- Aggregate [colour#3], [colour#3, count(1) AS colourcount#30L]
+ +- Filter (test_result#19 = P)
+ +- Join Inner, (vehicle_id#0L = vehicle_id#15L)
+ :- SubqueryAlias vehicles
+ : +- View (`vehicles`, [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6])
+ : +- Relation [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6] csv
+ +- SubqueryAlias tests
+ +- View (`tests`, [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21])
+ +- Relation [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21] csv
+```
+
+Similarly, the optimized plan can be obtained; here the `SubqueryAlias` and `View` have been converted into `Project` and `Filter` relations
+
+```
+Sort [colourcount#30L ASC NULLS FIRST], true
++- Aggregate [colour#3], [colour#3, count(1) AS colourcount#30L]
+ +- Project [colour#3]
+ +- Join Inner, (vehicle_id#0L = vehicle_id#15L)
+ :- Project [vehicle_id#0L, colour#3]
+ : +- Filter isnotnull(vehicle_id#0L)
+ : +- Relation [vehicle_id#0L,make#1,model#2,colour#3,fuel_type#4,cylinder_capacity#5L,first_use_date#6] csv
+ +- Project [vehicle_id#15L]
+ +- Filter ((isnotnull(test_result#19) AND (test_result#19 = P)) AND isnotnull(vehicle_id#15L))
+ +- Relation [test_id#14L,vehicle_id#15L,test_date#16,test_class#17,test_type#18,test_result#19,test_mileage#20L,postcode_area#21] csv
+```
+
+### Dataset API
+
+Alternatively, the Dataset API can be used to create the plans; the code for this is in [`SparkDataset`](./app/src/main/java/io/substrait/examples/SparkDataset.java). The overall flow of the code is very similar.
+
+Rather than creating temporary views, the references to the datasets are kept in `dsVehicles` and `dsTests`:
+```java
+ dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
+ dsVehicles.show();
+
+ dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile);
+ dsTests.show();
+```
+
+The query can then be constructed based on these two datasets:
+
+```java
+ Dataset joinedDs = dsVehicles.join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id")))
+ .filter(dsTests.col("test_result").equalTo("P"))
+ .groupBy(dsVehicles.col("colour"))
+ .count();
+
+ joinedDs = joinedDs.orderBy(joinedDs.col("count"));
+ joinedDs.show();
+```
+
+Using the same APIs, Spark's optimized plan is available. If you compare this to the plan above, you will see that it is structurally identical.
+
+```
+Sort [count#189L ASC NULLS FIRST], true
++- Aggregate [colour#20], [colour#20, count(1) AS count#189L]
+ +- Project [colour#20]
+ +- Join Inner, (vehicle_id#17 = vehicle_id#86)
+ :- Project [vehicle_id#17, colour#20]
+ : +- Filter isnotnull(vehicle_id#17)
+ : +- Relation [vehicle_id#17,make#18,model#19,colour#20,fuel_type#21,cylinder_capacity#22,first_use_date#23] csv
+ +- Project [vehicle_id#86]
+ +- Filter ((isnotnull(test_result#90) AND (test_result#90 = P)) AND isnotnull(vehicle_id#86))
+ +- Relation [test_id#85,vehicle_id#86,test_date#87,test_class#88,test_type#89,test_result#90,test_mileage#91,postcode_area#92] csv
+```
+
+### Substrait Creation
+
+This optimized plan is the best starting point for producing a Substrait plan; there's a `createSubstrait(...)` function that does the work and writes a binary protobuf file (`spark_sql_substrait.plan` for the SQL example, `spark_dataset_substrait.plan` for the Dataset example).
+
+```
+ LogicalPlan optimised = result.queryExecution().optimizedPlan();
+ System.out.println(optimised);
+
+ createSubstrait(optimised);
+```
+
+Let's look at the APIs in the `createSubstrait(...)` method to see how it's using the `Substrait-Spark` Library.
+
+```java
+ ToSubstraitRel toSubstrait = new ToSubstraitRel();
+ io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);
+```
+
+`ToSubstraitRel` is the main class; its `convert` method takes the Spark plan (the optimized plan is best) and produces the Substrait plan. The most common relations are currently supported, and the optimized plan is more likely to use these.
+
+The `io.substrait.plan.Plan` object is a high-level Substrait POJO representing a plan. This could be used directly, but more likely it will be persisted; protobuf is the canonical serialization form, and it's easy to convert the plan and store it in a file:
+
+```java
+ PlanProtoConverter planToProto = new PlanProtoConverter();
+ byte[] buffer = planToProto.toProto(plan).toByteArray();
+ try {
+ Files.write(Paths.get(ROOT_DIR, "spark_sql_substrait.plan"),buffer);
+ } catch (IOException e){
+ e.printStackTrace();
+ }
+```
+
+For the Dataset approach, `spark_dataset_substrait.plan` is created, and for the SQL approach `spark_sql_substrait.plan`. These intermediate representations of the query can be saved, transferred and reloaded into a data engine.
+
+We can also review the Substrait plan's structure; the canonical format is binary protobuf, but it's possible to produce a textual version, an example of which is below. The Substrait plans from the Dataset and SQL APIs generate the same output.
+
+```
+
+Root :: ImmutableSort [colour, count]
+
++- Sort:: FieldRef#/I64/StructField{offset=1} ASC_NULLS_FIRST
+ +- Project:: [Str, I64, Str, I64]
+ +- Aggregate:: FieldRef#/Str/StructField{offset=0}
+ +- Project:: [Str, Str, Str, Str]
+ +- Join:: INNER equal:any_any
+ : arg0 = FieldRef#/Str/StructField{offset=0}
+ : arg1 = FieldRef#/Str/StructField{offset=2}
+ +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str]
+ +- Filter:: is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=0}
+ +- LocalFiles::
+ : file:///opt/spark-data/vehicles_subset_2023.csv len=1547 partition=0 start=0
+ +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str]
+ +- Filter:: and:bool
+ : arg0 = and:bool
+ : arg0 = is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=5}
+ : arg1 = equal:any_any
+ : arg0 = FieldRef#/Str/StructField{offset=5}
+ : arg1 =
+ : arg1 = is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=1}
+ +- LocalFiles::
+ : file:///opt/spark-data/tests_subset_2023.csv len=1491 partition=0 start=0
+```
+
+There is more detail in this version than in the Spark versions; for example, details of the functions called are included. However, the structure of the overall plan is identical, with one exception: there is an additional `Project` relation between the `Sort` and the `Aggregate`, which is necessary to get the correct types of the output data.
+
+We can also see that, because the plan came directly from Spark, it includes the locations of the datafiles. When we reload this into Spark below, the file locations do not need to be supplied explicitly.
+
+
+The `Substrait-Spark` library also allows plans to be loaded and executed, so the next step is to consume these Substrait plans.
+
+## Consuming a Substrait Plan
+
+The [`SparkConsumeSubstrait`](./app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java) code shows how to load this file and, most importantly, how to convert it to a Spark engine plan for execution.
+
+Loading the binary protobuf file is the reverse of the writing process (in the code the file name comes from a command-line argument; here we're showing a hardcoded file name).
+
+```java
+ byte[] buffer = Files.readAllBytes(Paths.get("spark_sql_substrait.plan"));
+ io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
+
+ ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
+ Plan plan = protoToPlan.from(proto);
+
+```
+The loaded byte array is first converted into the protobuf Plan, and then into the Substrait Plan POJO. Note it can be useful to name the variables carefully, and/or use the fully qualified class names, to keep track of whether something is the protobuf Plan or the high-level POJO Plan.
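+
+For example, spelling out the fully qualified names makes the two stages explicit; this is a small sketch based on the loading code above, with `buffer` holding the bytes read from the plan file:
+
+```java
+// io.substrait.proto.Plan : the serialized protobuf (wire) form
+// io.substrait.plan.Plan  : the high-level Substrait POJO
+io.substrait.proto.Plan protoPlan = io.substrait.proto.Plan.parseFrom(buffer);
+io.substrait.plan.Plan pojoPlan = new ProtoPlanConverter().from(protoPlan);
+```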
+
+Finally this can be converted to a Spark Plan:
+
+```java
+ ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
+ LogicalPlan sparkPlan = substraitConverter.convert(plan);
+```
+
+If you were to print out this plan, it would have a structure identical to the plan seen earlier.
+
+```
++- Sort [count(1)#18L ASC NULLS FIRST], true
+ +- Aggregate [colour#5], [colour#5, count(1) AS count(1)#18L]
+ +- Project [colour#5]
+ +- Join Inner, (vehicle_id#2 = vehicle_id#10)
+ :- Project [vehicle_id#2, colour#5]
+ : +- Filter isnotnull(vehicle_id#2)
+ : +- Relation [vehicle_id#2,make#3,model#4,colour#5,fuel_type#6,cylinder_capacity#7,first_use_date#8] csv
+ +- Project [vehicle_id#10]
+ +- Filter ((isnotnull(test_result#14) AND (test_result#14 = P)) AND isnotnull(vehicle_id#10))
+ +- Relation [test_id#9,vehicle_id#10,test_date#11,test_class#12,test_type#13,test_result#14,test_mileage#15,postcode_area#16] csv
+
+```
+
+Executing this plan is then as simple as `Dataset.ofRows(spark, sparkPlan).show();`, giving the output of
+
+```
++------+-----+
+|colour|count|
++------+-----+
+| GREEN| 1|
+|BRONZE| 1|
+| RED| 2|
+| BLACK| 2|
+| GREY| 2|
+| BLUE| 2|
+|SILVER| 3|
+| WHITE| 5|
++------+-----+
+```
+
+### Observations
+
+To recap on the steps above:
+
+- Two CSV files have been loaded into Spark
+- Using either the Spark SQL or the Spark Dataset API we can produce a query across those two datasets
+- Both queries result in Spark creating a logical and optimized query plan
+  - And both are structurally identical
+- Using the Substrait-Java library, we can convert the optimized plan into the Substrait format.
+- This Substrait intermediate representation of the query can be serialized via the protobuf format
+  - Here it is stored as a flat file containing the bytes of that protobuf
+- *Separately* this file can be loaded and the Substrait Plan converted to a Spark Plan
+- This can be run in an application on Spark, getting the same results (see the sketch below)
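+
+Condensed into code, the round trip looks roughly like the sketch below; it is assembled from the example classes above (not a separate runnable program) and reuses the same `spark` session, `joinedDs` dataset and `ROOT_DIR` constant:
+
+```java
+// Produce: Spark optimized plan -> Substrait POJO -> protobuf bytes -> file
+io.substrait.plan.Plan pojo = new ToSubstraitRel().convert(joinedDs.queryExecution().optimizedPlan());
+byte[] bytes = new PlanProtoConverter().toProto(pojo).toByteArray();
+Files.write(Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"), bytes);
+
+// Consume (possibly in another process): file -> protobuf -> Substrait POJO -> Spark plan -> execute
+byte[] loaded = Files.readAllBytes(Paths.get(ROOT_DIR, "spark_dataset_substrait.plan"));
+io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(loaded);
+LogicalPlan sparkPlan = new ToLogicalPlan(spark).convert(new ProtoPlanConverter().from(proto));
+Dataset.ofRows(spark, sparkPlan).show();
+```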
+
+---
+## Plan Comparison
+
+The query plans for Spark and Substrait are structurally very similar.
+
+### Aggregate and Sort
+
+Spark's plan has a Project that filters down to the colour, followed by the Aggregation and Sort.
+```
++- Sort [count(1)#18L ASC NULLS FIRST], true
+ +- Aggregate [colour#5], [colour#5, count(1) AS count(1)#18L]
+ +- Project [colour#5]
+```
+
+When converted to Substrait, the Sort and Aggregate are in the same order, but there are additional Projects; the number of fields is not reduced as early.
+
+```
++- Sort:: FieldRef#/I64/StructField{offset=1} ASC_NULLS_FIRST
+ +- Project:: [Str, I64, Str, I64]
+ +- Aggregate:: FieldRef#/Str/StructField{offset=0}
+```
+
+### Inner Join
+
+Spark's inner join takes the two filtered relations as inputs; the filters ensure the join key is not null and also apply the `test_result = 'P'` check.
+
+```
+ +- Join Inner, (vehicle_id#2 = vehicle_id#10)
+ :- Project [vehicle_id#2, colour#5]
+ : +- Filter isnotnull(vehicle_id#2)
+
+ +- Project [vehicle_id#10]
+ +- Filter ((isnotnull(test_result#14) AND (test_result#14 = P)) AND isnotnull(vehicle_id#10))
+
+```
+
+The Substrait representation looks longer, but is showing the same structure. (Note that this is a custom format implemented in [SubstraitStringify](...), as the standard text output can be hard to read.)
+
+```
+ +- Join:: INNER equal:any_any
+ : arg0 = FieldRef#/Str/StructField{offset=0}
+ : arg1 = FieldRef#/Str/StructField{offset=2}
+ +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str]
+ +- Filter:: is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=0}
+
+ +- Project:: [Str, Str, Str, Str, Str, Str, Str, Str, Str]
+ +- Filter:: and:bool
+ : arg0 = and:bool
+ : arg0 = is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=5}
+ : arg1 = equal:any_any
+ : arg0 = FieldRef#/Str/StructField{offset=5}
+ : arg1 =
+ : arg1 = is_not_null:any
+ : arg0 = FieldRef#/Str/StructField{offset=1}
+```
+
+### LocalFiles
+
+The source of the data was originally two CSV files; in the Spark plan this is referred to by the csv suffix: `Relation [...] csv`. This is represented in the Substrait plan as
+```
+ +- LocalFiles::
+ : file:///opt/spark-data/tests_subset_2023.csv len=1491 partition=0 start=0
+```
+
+There is a dedicated Substrait `ReadRel` relation for referencing files; it includes additional information about the file's type, size and format, and options for reading those specific formats. Parquet/Arrow/Orc/ProtoBuf/Dwrf currently all have specific option structures.
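+
+As a rough illustration, the generated protobuf classes can be used to inspect such a read relation once you have navigated to it within the plan; `describeRead` below is a hypothetical helper, and the accessors follow the Substrait `ReadRel` protobuf definitions:
+
+```java
+/** Illustrative sketch: print what a read relation points at - local files or a named table. */
+static void describeRead(io.substrait.proto.ReadRel read) {
+  if (read.hasLocalFiles()) {
+    for (io.substrait.proto.ReadRel.LocalFiles.FileOrFiles file : read.getLocalFiles().getItemsList()) {
+      System.out.println(file.getUriFile()
+          + " len=" + file.getLength()
+          + " partition=" + file.getPartitionIndex()
+          + " start=" + file.getStart());
+    }
+  } else if (read.hasNamedTable()) {
+    System.out.println("Table=" + read.getNamedTable().getNamesList());
+  }
+}
+```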
+
+## Data Locations
+
+The implication of a relation that includes a filename is seen when the plan is deserialized and executed; the binary Substrait plan needs to be read, converted into a Substrait Plan POJO and passed to the Substrait-Spark library to be converted. Once converted it can be directly executed.
+
+The plan itself contains all the information needed to be able to execute the query.
+
+A slight difference is observed when the Spark DataFrame is saved as a Hive table. Using `saveAsTable(...)` and `table(...)` the data can be persisted.
+
+```java
+ String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
+ Dataset dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
+ dsVehicles.write().saveAsTable("vehicles");
+
+ spark.read().table("vehicles").show();
+```
+
+When this table is read and used in queries, the Substrait read relation will be a `NamedScan` instead; this refers to a table
+`[spark_catalog, default, vehicles]`, where `default` is the name of the default Spark database.
+
+```
+ +- NamedScan:: Tables=[spark_catalog, default, vehicles] Fields=vehicle_id[Str],make[Str],model[Str],colour[Str],fuel_type[Str],cylinder_capacity[Str],first_use_date[Str]
+```
+
+This plan can be consumed in exactly the same manner as the other plans; the only difference being that _if the table is not already present_ it will fail to execute. The plan does not contain the source of the data, only a reference name and the expected fields. As long as the data is present in Spark, the query will execute without issue.
+
+## Observations on LocalFiles/NamedScan
+
+Including the information on the location of the data permits easy use of the plan. In the example here this worked well; however there could be difficulties depending on the recipient engine. Substrait as an intermediate form gives the ability to transfer the plans between engines; how different engines catalogue their data will be relevant.
+
+For example, the above plan can be handled with PyArrow or DuckDB (among a variety of other engines); the code for consuming the plans is straightforward.
+
+```python
+    with open(PLAN_FILE, "rb") as file:
+ planbytes = file.read()
+ reader = substrait.run_query(
+ base64.b64decode(planbytes),
+ table_provider=self.simple_provider,
+ )
+ result = reader.read_all()
+
+```
+
+When run with the plan, PyArrow immediately rejects it with
+
+```
+pyarrow.lib.ArrowNotImplementedError: non-default substrait::ReadRel::LocalFiles::FileOrFiles::length
+```
+
+DuckDB has a similar API, `connection.from_substrait(planbytes)`, and produces a different error
+
+```
+duckdb.duckdb.IOException: IO Error: No files found that match the pattern "file:///opt/spark-data/tests_subset_2023.csv"
+```
+
+This shows that different engines will potentially support different relations; PyArrow wants to delegate the loading of the data to the user, whereas DuckDB is happy to load files. DuckDB, though, can of course only proceed with the information that it has; the URI of the file here is coupled to the location of the data on the originating engine. Something like an S3 URI could potentially be more useful.
+
+Creating a plan from Spark where the data is saved as a table provides an alternative. Depending on the engine this can also need some careful handling. In the `NamedScan` above, the name was a list of three strings: `Tables=[spark_catalog, default, vehicles]`. Whilst DuckDB's implementation understands that these refer to a table, its own catalogue can't be indexed with these three values.
+
+```
+duckdb.duckdb.CatalogException: Catalog Error: Table with name spark_catalog does not exist!
+```
+
+PyArrow takes a different approach in locating the data. In the PyArrow code above there is a reference to a `table_provider`; the job of 'providing a table' is delegated back to the user.
+
+Firstly we need to load the datasets as PyArrow tables
+```python
+ test = pq.read_table(TESTS_PQ_FILE)
+ vehicles = pq.read_table(VEHICLES_PQ_FILE)
+```
+
+We can define a `table_provider` function; this logs which table is being requested, but also what the expected schema is.
+As `names` is an array, we can check the final part of the name and return the matching table.
+
+```python
+ def table_provider(self, names, schema):
+ print(f"== Requesting table {names} with schema: \n{schema}\n")
+
+ if not names:
+ raise Exception("No names provided")
+ else:
+ if names[-1] == "tests":
+ return self.test
+ elif names[-1] == "vehicles":
+ return self.vehicles
+
+ raise Exception(f"Unrecognized table name {names}")
+```
+
+
+When run, the output is along these lines (the query is slightly different here for simplicity); we can see the tables being requested and the expected schema. Nothing is done with the schema here, but it could be useful for ensuring that the expectations of the plan match the schema of the data held in the engine.
+
+```
+== Requesting table ['spark_catalog', 'default', 'vehicles'] with schema:
+vehicle_id: string
+make: string
+model: string
+colour: string
+fuel_type: string
+cylinder_capacity: string
+first_use_date: string
+
+== Requesting table ['spark_catalog', 'default', 'tests'] with schema:
+test_id: string
+vehicle_id: string
+test_date: string
+test_class: string
+test_type: string
+test_result: string
+test_mileage: string
+postcode_area: string
+
+ colour test_result
+0 WHITE P
+1 WHITE F
+2 BLACK P
+3 BLACK P
+4 RED P
+5 BLACK P
+6 BLUE P
+7 SILVER F
+8 SILVER F
+9 BLACK P
+```
+
+## Summary
+
+The Substrait intermediate representation of the query can be serialized via the protobuf format and transferred between engines of the same type or between different engines.
+
+In the case of Spark for example, identical plans can be created with the Spark SQL or the Spark Dataset API.
+*Separately* this file can be loaded and the Substrait plan converted to a Spark plan. Assuming that the consuming engine has the same understanding of the reference to LocalFiles, the plan can be read and executed.
+
+Logical references to a 'table' via a `NamedScan` give more flexibility, but the structure of the reference still needs to be properly understood and agreed upon.
+
+Once common understanding is agreed upon, transferring plans between engines brings great flexibility and potential.
+
+
+
+
+
+
diff --git a/examples/substrait-spark/app/.gitignore b/examples/substrait-spark/app/.gitignore
new file mode 100644
index 000000000..2ee4e319c
--- /dev/null
+++ b/examples/substrait-spark/app/.gitignore
@@ -0,0 +1,2 @@
+spark-warehouse
+derby.log
diff --git a/examples/substrait-spark/app/build.gradle b/examples/substrait-spark/app/build.gradle
new file mode 100644
index 000000000..cb2710b8b
--- /dev/null
+++ b/examples/substrait-spark/app/build.gradle
@@ -0,0 +1,62 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+plugins {
+ id 'buildlogic.java-application-conventions'
+}
+
+dependencies {
+ implementation 'org.apache.commons:commons-text'
+ // for running as a Spark application for real, this could be compile-only
+
+
+ implementation libs.substrait.core
+ implementation libs.substrait.spark
+ implementation libs.spark.sql
+
+ // For a real Spark application, these would not be required since they would be in the Spark server classpath
+ runtimeOnly libs.spark.core
+// https://mvnrepository.com/artifact/org.apache.spark/spark-hive
+ runtimeOnly libs.spark.hive
+
+
+
+}
+
+def jvmArguments = [
+ "--add-exports",
+ "java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/java.net=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "-Dspark.master=local"
+]
+
+application {
+ // Define the main class for the application.
+ mainClass = 'io.substrait.examples.App'
+ applicationDefaultJvmArgs = jvmArguments
+}
+
+jar {
+ zip64 = true
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+
+ manifest {
+ attributes 'Main-Class': 'io.substrait.examples.App'
+ }
+
+ from {
+ configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+
+ exclude 'META-INF/*.RSA'
+ exclude 'META-INF/*.SF'
+ exclude 'META-INF/*.DSA'
+}
+
+repositories {
+
+}
diff --git a/examples/substrait-spark/app/src/main/java/io/substrait/examples/App.java b/examples/substrait-spark/app/src/main/java/io/substrait/examples/App.java
new file mode 100644
index 000000000..fed789b3f
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/java/io/substrait/examples/App.java
@@ -0,0 +1,40 @@
+package io.substrait.examples;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import io.substrait.plan.Plan;
+import io.substrait.plan.ProtoPlanConverter;
+
+public class App {
+
+ public static interface Action {
+ public void run(String arg);
+ }
+
+ private App() {
+ }
+
+ public static void main(String args[]) {
+ try {
+
+ if (args.length == 0) {
+ args = new String[] { "SparkDataset" };
+ }
+ String exampleClass = args[0];
+
+ var clz = Class.forName(App.class.getPackageName() + "." + exampleClass);
+ var action = (Action) clz.getDeclaredConstructor().newInstance();
+
+ if (args.length == 2) {
+ action.run(args[1]);
+ } else {
+ action.run(null);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java
new file mode 100644
index 000000000..761209850
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkConsumeSubstrait.java
@@ -0,0 +1,49 @@
+package io.substrait.examples;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+
+import io.substrait.plan.Plan;
+import io.substrait.plan.ProtoPlanConverter;
+import io.substrait.spark.logical.ToLogicalPlan;
+
+import static io.substrait.examples.SparkHelper.ROOT_DIR;
+
+/** Minimal Spark application */
+public class SparkConsumeSubstrait implements App.Action {
+
+ public SparkConsumeSubstrait() {
+ }
+
+ @Override
+ public void run(String arg) {
+
+ // Connect to a local in-process Spark instance
+ try (SparkSession spark = SparkHelper.connectLocalSpark()) {
+
+ System.out.println("Reading from " + arg);
+ byte[] buffer = Files.readAllBytes(Paths.get(ROOT_DIR, arg));
+
+ io.substrait.proto.Plan proto = io.substrait.proto.Plan.parseFrom(buffer);
+ ProtoPlanConverter protoToPlan = new ProtoPlanConverter();
+ Plan plan = protoToPlan.from(proto);
+
+ ToLogicalPlan substraitConverter = new ToLogicalPlan(spark);
+ LogicalPlan sparkPlan = substraitConverter.convert(plan);
+
+ System.out.println(sparkPlan);
+
+ Dataset.ofRows(spark, sparkPlan).show();
+
+ spark.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkDataset.java b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkDataset.java
new file mode 100644
index 000000000..4f0e668c7
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkDataset.java
@@ -0,0 +1,80 @@
+package io.substrait.examples;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import java.io.IOException;
+import java.nio.file.*;
+import io.substrait.plan.PlanProtoConverter;
+import io.substrait.spark.logical.ToSubstraitRel;
+import static io.substrait.examples.SparkHelper.ROOT_DIR;
+import static io.substrait.examples.SparkHelper.TESTS_CSV;
+import static io.substrait.examples.SparkHelper.VEHICLES_CSV;
+
+/** Minimal Spark application */
+public class SparkDataset implements App.Action {
+
+ public SparkDataset() {
+
+ }
+
+ @Override
+ public void run(String arg) {
+
+ // Connect to a local in-process Spark instance
+ try (SparkSession spark = SparkHelper.connectLocalSpark()) {
+
+ Dataset dsVehicles;
+ Dataset dsTests;
+
+ // load from CSV files
+ String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
+ String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();
+
+ System.out.println("Reading "+vehiclesFile);
+ System.out.println("Reading "+testsFile);
+
+ dsVehicles = spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile);
+ dsVehicles.show();
+
+ dsTests = spark.read().option("delimiter", ",").option("header", "true").csv(testsFile);
+ dsTests.show();
+
+ // created the joined dataset
+ Dataset joinedDs = dsVehicles.join(dsTests, dsVehicles.col("vehicle_id").equalTo(dsTests.col("vehicle_id")))
+ .filter(dsTests.col("test_result").equalTo("P"))
+ .groupBy(dsVehicles.col("colour"))
+ .count();
+
+ joinedDs = joinedDs.orderBy(joinedDs.col("count"));
+ joinedDs.show();
+
+ LogicalPlan plan = joinedDs.queryExecution().optimizedPlan();
+
+ System.out.println(plan);
+ createSubstrait(plan);
+
+ spark.stop();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public void createSubstrait(LogicalPlan enginePlan) {
+ ToSubstraitRel toSubstrait = new ToSubstraitRel();
+ io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);
+
+ System.out.println(plan);
+
+ PlanProtoConverter planToProto = new PlanProtoConverter();
+ byte[] buffer = planToProto.toProto(plan).toByteArray();
+ try {
+ Files.write(Paths.get(ROOT_DIR,"spark_dataset_substrait.plan"), buffer);
+ System.out.println("File written to "+Paths.get(ROOT_DIR,"spark_sql_substrait.plan"));
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+}
diff --git a/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkHelper.java b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkHelper.java
new file mode 100644
index 000000000..7bed7fae4
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkHelper.java
@@ -0,0 +1,44 @@
+package io.substrait.examples;
+
+import org.apache.spark.sql.SparkSession;
+
+public class SparkHelper {
+ public static final String NAMESPACE = "demo_db";
+ public static final String VEHICLE_TABLE = "vehicles";
+ public static final String TESTS_TABLE = "tests";
+
+ public static final String VEHICLES_PQ = "vehicles_subset_2023.parquet";
+ public static final String TESTS_PQ = "tests_subset_2023.parquet";
+
+ public static final String VEHICLES_CSV = "vehicles_subset_2023.csv";
+ public static final String TESTS_CSV = "tests_subset_2023.csv";
+
+ public static final String ROOT_DIR = "/opt/spark-data";
+
+ // Connect to local spark for demo purposes
+ public static SparkSession connectSpark(String spark_master) {
+
+ SparkSession spark = SparkSession.builder()
+ // .config("spark.sql.warehouse.dir", "spark-warehouse")
+ .config("spark.master", spark_master)
+ .enableHiveSupport()
+ .getOrCreate();
+
+ spark.sparkContext().setLogLevel("ERROR");
+
+ return spark;
+ }
+
+ public static SparkSession connectLocalSpark() {
+
+ SparkSession spark = SparkSession.builder()
+ .enableHiveSupport()
+ .getOrCreate();
+
+ spark.sparkContext().setLogLevel("ERROR");
+
+ return spark;
+ }
+
+
+}
diff --git a/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkSQL.java b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkSQL.java
new file mode 100644
index 000000000..3bdd26e96
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/java/io/substrait/examples/SparkSQL.java
@@ -0,0 +1,86 @@
+package io.substrait.examples;
+
+import static io.substrait.examples.SparkHelper.ROOT_DIR;
+import static io.substrait.examples.SparkHelper.TESTS_CSV;
+import static io.substrait.examples.SparkHelper.TESTS_TABLE;
+import static io.substrait.examples.SparkHelper.VEHICLES_CSV;
+import static io.substrait.examples.SparkHelper.VEHICLE_TABLE;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+
+import io.substrait.plan.PlanProtoConverter;
+import io.substrait.spark.logical.ToSubstraitRel;
+
+/** Minimal Spark application */
+public class SparkSQL implements App.Action {
+
+ public SparkSQL() {
+
+ }
+
+ @Override
+ public void run(String arg) {
+
+ // Connect to a local in-process Spark instance
+ try (SparkSession spark = SparkHelper.connectLocalSpark()) {
+ spark.catalog().listDatabases().show();
+
+ // load from CSV files
+ String vehiclesFile = Paths.get(ROOT_DIR, VEHICLES_CSV).toString();
+ String testsFile = Paths.get(ROOT_DIR, TESTS_CSV).toString();
+
+ System.out.println("Reading " + vehiclesFile);
+ System.out.println("Reading " + testsFile);
+
+ spark.read().option("delimiter", ",").option("header", "true").csv(vehiclesFile)
+ .createOrReplaceTempView(VEHICLE_TABLE);
+ spark.read().option("delimiter", ",").option("header", "true").csv(testsFile)
+ .createOrReplaceTempView(TESTS_TABLE);
+
+ String sqlQuery = """
+ SELECT vehicles.colour, count(*) as colourcount
+ FROM vehicles
+ INNER JOIN tests ON vehicles.vehicle_id=tests.vehicle_id
+ WHERE tests.test_result = 'P'
+ GROUP BY vehicles.colour
+ ORDER BY count(*)
+ """;
+
+ var result = spark.sql(sqlQuery);
+ result.show();
+
+ LogicalPlan logical = result.logicalPlan();
+ System.out.println(logical);
+
+ LogicalPlan optimised = result.queryExecution().optimizedPlan();
+ System.out.println(optimised);
+
+ createSubstrait(optimised);
+ spark.stop();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ public void createSubstrait(LogicalPlan enginePlan) {
+ ToSubstraitRel toSubstrait = new ToSubstraitRel();
+ io.substrait.plan.Plan plan = toSubstrait.convert(enginePlan);
+ System.out.println(plan);
+
+ PlanProtoConverter planToProto = new PlanProtoConverter();
+ byte[] buffer = planToProto.toProto(plan).toByteArray();
+ try {
+ Files.write(Paths.get(ROOT_DIR,"spark_sql_substrait.plan"), buffer);
+ System.out.println("File written to "+Paths.get(ROOT_DIR,"spark_sql_substrait.plan"));
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/examples/substrait-spark/app/src/main/resources/tests_subset_2023.csv b/examples/substrait-spark/app/src/main/resources/tests_subset_2023.csv
new file mode 100644
index 000000000..762d53491
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/resources/tests_subset_2023.csv
@@ -0,0 +1,30 @@
+test_id,vehicle_id,test_date,test_class,test_type,test_result,test_mileage,postcode_area
+539514409,17113014,2023-01-09,4,NT,F,69934,PA
+1122718877,986649781,2023-01-16,4,NT,F,57376,SG
+1104881351,424684356,2023-03-06,4,NT,F,81853,SG
+1487493049,1307056703,2023-03-07,4,NT,P,20763,SA
+1107861883,130747047,2023-03-27,4,RT,P,125910,SA
+472789285,777757523,2023-03-29,4,NT,P,68399,CO
+1105082521,840180863,2023-04-15,4,NT,P,54240,NN
+1172953135,917255260,2023-04-27,4,NT,P,60918,SM
+127807783,888103385,2023-05-08,4,NT,P,112090,EH
+1645970709,816803134,2023-06-03,4,NT,P,134858,RG
+1355347761,919820431,2023-06-21,4,NT,P,37336,ST
+1750209849,544950855,2023-06-23,4,NT,F,120034,NR
+1376930435,439876988,2023-07-19,4,NT,P,109927,PO
+582729949,1075446447,2023-07-19,4,NT,P,72986,SA
+127953451,105663799,2023-07-31,4,NT,F,35824,ME
+759291679,931759350,2023-08-07,4,NT,P,65353,DY
+1629819891,335780567,2023-08-08,4,NT,PRS,103365,CF
+1120026477,1153361746,2023-08-11,4,NT,P,286881,RM
+1331300969,644861283,2023-08-15,4,NT,P,52173,LE
+990694587,449899992,2023-08-16,4,NT,F,124891,SA
+193460599,759696266,2023-08-29,4,NT,P,83554,LU
+1337337679,1110416764,2023-10-09,4,NT,PRS,71093,SS
+1885237527,137785384,2023-11-04,4,NT,P,88730,BH
+1082642803,1291985882,2023-11-15,4,NT,PRS,160717,BA
+896066743,615735063,2023-11-15,4,RT,P,107710,NR
+1022666841,474362449,2023-11-20,4,NT,P,56296,HP
+1010400923,1203222226,2023-12-04,4,NT,F,89255,TW
+866705687,605696575,2023-12-06,4,NT,P,14674,YO
+621751843,72093448,2023-12-14,4,NT,F,230280,TR
diff --git a/examples/substrait-spark/app/src/main/resources/vehicles_subset_2023.csv b/examples/substrait-spark/app/src/main/resources/vehicles_subset_2023.csv
new file mode 100644
index 000000000..087b54c84
--- /dev/null
+++ b/examples/substrait-spark/app/src/main/resources/vehicles_subset_2023.csv
@@ -0,0 +1,31 @@
+vehicle_id,make,model,colour,fuel_type,cylinder_capacity,first_use_date
+17113014,VAUXHALL,VIVARO,BLACK,DI,1995,2011-09-29
+986649781,VAUXHALL,INSIGNIA,WHITE,DI,1956,2017-07-19
+424684356,RENAULT,GRAND SCENIC,GREY,PE,1997,2010-07-19
+1307056703,RENAULT,CLIO,BLACK,DI,1461,2014-05-30
+130747047,FORD,FOCUS,SILVER,DI,1560,2013-07-10
+777757523,HYUNDAI,I10,WHITE,PE,998,2016-05-21
+840180863,BMW,1 SERIES,WHITE,PE,2979,2016-03-11
+917255260,VAUXHALL,ASTRA,WHITE,PE,1364,2012-04-21
+888103385,FORD,GALAXY,SILVER,DI,1997,2014-09-12
+816803134,FORD,FIESTA,BLUE,PE,1299,2002-10-24
+697184031,BMW,X1,WHITE,DI,1995,2016-03-31
+919820431,TOYOTA,AURIS,BRONZE,PE,1329,2015-06-29
+544950855,VAUXHALL,ASTRA,RED,DI,1956,2012-09-17
+439876988,MINI,MINI,GREEN,PE,1598,2010-03-31
+1075446447,CITROEN,C4,RED,DI,1560,2015-10-05
+105663799,RENAULT,KADJAR,BLACK,PE,1332,2020-07-23
+931759350,FIAT,DUCATO,WHITE,DI,2199,2008-04-18
+335780567,HYUNDAI,I20,BLUE,PE,1396,2013-08-13
+1153361746,TOYOTA,PRIUS,SILVER,HY,1800,2010-06-23
+644861283,FORD,FIESTA,BLACK,PE,998,2015-09-03
+449899992,BMW,3 SERIES,GREEN,DI,2926,2006-09-30
+759696266,CITROEN,C4,BLUE,DI,1997,2011-12-19
+1110416764,CITROEN,XSARA,SILVER,DI,1997,1999-06-30
+137785384,MINI,MINI,GREY,DI,1598,2011-11-29
+1291985882,LAND ROVER,DEFENDER,BLUE,DI,2495,2002-06-12
+615735063,VOLKSWAGEN,CADDY,WHITE,DI,1598,2013-03-01
+474362449,VAUXHALL,GRANDLAND,GREY,PE,1199,2018-11-12
+1203222226,VAUXHALL,ASTRA,BLUE,PE,1598,2010-06-03
+605696575,SUZUKI,SWIFT SZ-T DUALJET MHEV CVT,RED,HY,1197,2020-12-18
+72093448,AUDI,A4,SILVER,DI,1896,2001-03-19
diff --git a/examples/substrait-spark/build-logic/build.gradle b/examples/substrait-spark/build-logic/build.gradle
new file mode 100644
index 000000000..d29beaf6e
--- /dev/null
+++ b/examples/substrait-spark/build-logic/build.gradle
@@ -0,0 +1,16 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+plugins {
+ // Support convention plugins written in Groovy. Convention plugins are build scripts in 'src/main' that automatically become available as plugins in the main build.
+ id 'groovy-gradle-plugin'
+}
+
+repositories {
+
+ // Use the plugin portal to apply community plugins in convention plugins.
+ gradlePluginPortal()
+}
diff --git a/examples/substrait-spark/build-logic/settings.gradle b/examples/substrait-spark/build-logic/settings.gradle
new file mode 100644
index 000000000..58fbfd5cb
--- /dev/null
+++ b/examples/substrait-spark/build-logic/settings.gradle
@@ -0,0 +1,15 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This settings file is used to specify which projects to include in your build-logic build.
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+dependencyResolutionManagement {
+ // Reuse version catalog from the main build.
+ versionCatalogs {
+ create('libs', { from(files("../gradle/libs.versions.toml")) })
+ }
+}
+
+rootProject.name = 'build-logic'
diff --git a/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-application-conventions.gradle b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-application-conventions.gradle
new file mode 100644
index 000000000..1006b9b31
--- /dev/null
+++ b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-application-conventions.gradle
@@ -0,0 +1,13 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+plugins {
+ // Apply the common convention plugin for shared build configuration between library and application projects.
+ id 'buildlogic.java-common-conventions'
+
+ // Apply the application plugin to add support for building a CLI application in Java.
+ id 'application'
+}
diff --git a/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-common-conventions.gradle b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-common-conventions.gradle
new file mode 100644
index 000000000..1f605ee5f
--- /dev/null
+++ b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-common-conventions.gradle
@@ -0,0 +1,39 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+plugins {
+ // Apply the java Plugin to add support for Java.
+ id 'java'
+}
+
+repositories {
+ // Use Maven Central for resolving dependencies.
+ mavenCentral()
+}
+
+dependencies {
+ constraints {
+ // Define dependency versions as constraints
+ implementation 'org.apache.commons:commons-text:1.11.0'
+ }
+}
+
+testing {
+ suites {
+ // Configure the built-in test suite
+ test {
+ // Use JUnit Jupiter test framework
+ useJUnitJupiter('5.10.1')
+ }
+ }
+}
+
+// Apply a specific Java toolchain to ease working on different environments.
+java {
+ toolchain {
+ languageVersion = JavaLanguageVersion.of(17)
+ }
+}
diff --git a/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-library-conventions.gradle b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-library-conventions.gradle
new file mode 100644
index 000000000..526803e32
--- /dev/null
+++ b/examples/substrait-spark/build-logic/src/main/groovy/buildlogic.java-library-conventions.gradle
@@ -0,0 +1,13 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+plugins {
+ // Apply the common convention plugin for shared build configuration between library and application projects.
+ id 'buildlogic.java-common-conventions'
+
+ // Apply the java-library plugin for API and implementation separation.
+ id 'java-library'
+}
diff --git a/examples/substrait-spark/docker-compose.yaml b/examples/substrait-spark/docker-compose.yaml
new file mode 100644
index 000000000..15252983e
--- /dev/null
+++ b/examples/substrait-spark/docker-compose.yaml
@@ -0,0 +1,32 @@
+services:
+ spark:
+ image: docker.io/bitnami/spark:3.5
+ user: ":${MY_GID}"
+ environment:
+ - SPARK_MODE=master
+ - SPARK_RPC_AUTHENTICATION_ENABLED=no
+ - SPARK_RPC_ENCRYPTION_ENABLED=no
+ - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
+ - SPARK_SSL_ENABLED=no
+ - SPARK_USER=spark
+ ports:
+ - '8080:8080'
+ volumes:
+ - ./_apps:/opt/spark-apps
+ - ./_data:/opt/spark-data
+ spark-worker:
+ image: docker.io/bitnami/spark:3.5
+ user: ":${MY_GID}"
+ environment:
+ - SPARK_MODE=worker
+ - SPARK_MASTER_URL=spark://spark:7077
+ - SPARK_WORKER_MEMORY=1G
+ - SPARK_WORKER_CORES=1
+ - SPARK_RPC_AUTHENTICATION_ENABLED=no
+ - SPARK_RPC_ENCRYPTION_ENABLED=no
+ - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
+ - SPARK_SSL_ENABLED=no
+ - SPARK_USER=spark
+ volumes:
+ - ./_apps:/opt/spark-apps
+ - ./_data:/opt/spark-data
diff --git a/examples/substrait-spark/gradle.properties b/examples/substrait-spark/gradle.properties
new file mode 100644
index 000000000..18f452c73
--- /dev/null
+++ b/examples/substrait-spark/gradle.properties
@@ -0,0 +1,6 @@
+# This file was generated by the Gradle 'init' task.
+# https://docs.gradle.org/current/userguide/build_environment.html#sec:gradle_configuration_properties
+
+org.gradle.parallel=true
+org.gradle.caching=true
+
diff --git a/examples/substrait-spark/gradle/libs.versions.toml b/examples/substrait-spark/gradle/libs.versions.toml
new file mode 100644
index 000000000..8a36ae4d9
--- /dev/null
+++ b/examples/substrait-spark/gradle/libs.versions.toml
@@ -0,0 +1,14 @@
+# This file was generated by the Gradle 'init' task.
+# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format
+[versions]
+spark = "3.5.1"
+spotless = "6.25.0"
+substrait = "0.36.0"
+substrait-spark = "0.36.0"
+
+[libraries]
+spark-core = { module = "org.apache.spark:spark-core_2.12", version.ref = "spark" }
+spark-sql = { module = "org.apache.spark:spark-sql_2.12", version.ref = "spark" }
+spark-hive = { module = "org.apache.spark:spark-hive_2.12", version.ref = "spark" }
+substrait-spark = { module = "io.substrait:spark", version.ref = "substrait-spark" }
+substrait-core = { module = "io.substrait:core", version.ref = "substrait" }
diff --git a/examples/substrait-spark/gradle/wrapper/gradle-wrapper.jar b/examples/substrait-spark/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 000000000..e6441136f
Binary files /dev/null and b/examples/substrait-spark/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/examples/substrait-spark/gradle/wrapper/gradle-wrapper.properties b/examples/substrait-spark/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 000000000..b82aa23a4
--- /dev/null
+++ b/examples/substrait-spark/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,7 @@
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
+networkTimeout=10000
+validateDistributionUrl=true
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/examples/substrait-spark/gradlew b/examples/substrait-spark/gradlew
new file mode 100755
index 000000000..1aa94a426
--- /dev/null
+++ b/examples/substrait-spark/gradlew
@@ -0,0 +1,249 @@
+#!/bin/sh
+
+#
+# Copyright © 2015-2021 the original authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##############################################################################
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
+##############################################################################
+
+# Attempt to set APP_HOME
+
+# Resolve links: $0 may be a link
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
+done
+
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD=maximum
+
+warn () {
+ echo "$*"
+} >&2
+
+die () {
+ echo
+ echo "$*"
+ echo
+ exit 1
+} >&2
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
+esac
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD=$JAVA_HOME/jre/sh/java
+ else
+ JAVACMD=$JAVA_HOME/bin/java
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD=java
+ if ! command -v java >/dev/null 2>&1
+ then
+ die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+fi
+
+# Increase the maximum file descriptors if we can.
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
+fi
+
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
+
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
+ fi
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
+ done
+fi
+
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+# and any embedded shellness will be escaped.
+# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+# treated as '${Hostname}' itself on the command line.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ org.gradle.wrapper.GradleWrapperMain \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
+
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
+
+exec "$JAVACMD" "$@"
diff --git a/examples/substrait-spark/gradlew.bat b/examples/substrait-spark/gradlew.bat
new file mode 100644
index 000000000..7101f8e46
--- /dev/null
+++ b/examples/substrait-spark/gradlew.bat
@@ -0,0 +1,92 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+
+@if "%DEBUG%"=="" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+set DIRNAME=%~dp0
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if %ERRORLEVEL% equ 0 goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
+
+:end
+@rem End local scope for the variables with windows NT shell
+if %ERRORLEVEL% equ 0 goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/examples/substrait-spark/justfile b/examples/substrait-spark/justfile
new file mode 100644
index 000000000..9a138d278
--- /dev/null
+++ b/examples/substrait-spark/justfile
@@ -0,0 +1,56 @@
+# Main justfile to run all the development scripts
+# To install 'just' see https://github.com/casey/just#installation
+
+# Ensure all properties are exported as shell env-vars
+set export
+set dotenv-load
+
+# Set the current directory and the location of the test data
+CWDIR := justfile_directory()
+
+SPARK_VERSION := "3.5.1"
+
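+# Name of the Spark master container started by docker compose (spark-submit is run inside it)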
+SPARK_MASTER_CONTAINER := "subtrait-spark-spark-1"
+
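+# List the available recipes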
+_default:
+ @just -f {{justfile()}} --list
+
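+# Build the example application jar and copy it, plus the CSV test data, into the _apps and _data mounts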
+buildapp:
+ #!/bin/bash
+ set -e -o pipefail
+
+ ${CWDIR}/gradlew build
+
+ # need to let the SPARK user be able to write to the _data mount
+ mkdir -p ${CWDIR}/_data && chmod g+w ${CWDIR}/_data
+ mkdir -p ${CWDIR}/_apps
+
+ cp ${CWDIR}/app/build/libs/app.jar ${CWDIR}/_apps
+ cp ${CWDIR}/app/src/main/resources/*.csv ${CWDIR}/_data
+
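+# Run the SparkDataset example (creates a Substrait plan starting from the Spark Dataset API)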
+dataset:
+ #!/bin/bash
+ set -e -o pipefail
+
+ docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkDataset"
+
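+# Run the SparkSQL example (creates a Substrait plan starting from the Spark SQL API)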
+sql:
+ #!/bin/bash
+ set -e -o pipefail
+
+ docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkSQL"
+
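+# Load and execute a previously saved Substrait plan; 'arg' is the plan file name passed to SparkConsumeSubstrait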
+consume arg:
+ #!/bin/bash
+ set -e -o pipefail
+
+ docker exec -it ${SPARK_MASTER_CONTAINER} bash -c "/opt/bitnami/spark/bin/spark-submit --master spark://${SPARK_MASTER_CONTAINER}:7077 --driver-memory 1G --executor-memory 1G /opt/spark-apps/app.jar SparkConsumeSubstrait {{arg}}"
+
+
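+# Start the test Spark cluster with docker compose, passing the current user and group ids via MY_UID and MY_GID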
+spark:
+ #!/bin/bash
+ set -e -o pipefail
+
+ export MY_UID=$(id -u)
+ export MY_GID=$(id -g)
+ docker compose up
diff --git a/examples/substrait-spark/settings.gradle b/examples/substrait-spark/settings.gradle
new file mode 100644
index 000000000..ed37a683e
--- /dev/null
+++ b/examples/substrait-spark/settings.gradle
@@ -0,0 +1,20 @@
+/*
+ * This file was generated by the Gradle 'init' task.
+ *
+ * The settings file is used to specify which projects to include in your build.
+ * For more detailed information on multi-project builds, please refer to https://docs.gradle.org/8.7/userguide/multi_project_builds.html in the Gradle documentation.
+ * This project uses @Incubating APIs which are subject to change.
+ */
+
+pluginManagement {
+ // Include 'plugins build' to define convention plugins.
+ includeBuild('build-logic')
+}
+
+plugins {
+ // Apply the foojay-resolver plugin to allow automatic download of JDKs
+ id 'org.gradle.toolchains.foojay-resolver-convention' version '0.8.0'
+}
+
+rootProject.name = 'substrait-spark'
+include('app')
diff --git a/readme.md b/readme.md
index 4ae53a2fc..d527584a9 100644
--- a/readme.md
+++ b/readme.md
@@ -33,5 +33,11 @@ SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
```
+## Examples
+
+The [examples](./examples) folder contains examples of using Substrait with Java; please check each example for its specific requirements and instructions on how to run it. The examples are intended to be exercised by the GitHub workflow; depending on the setup an example requires, it may only be possible to validate that it compiles.
+
+- [Substrait-Spark](./examples/substrait-spark/README.md): using Substrait to produce and consume plans within Apache Spark
+
## Getting Involved
To learn more, head over [Substrait](https://substrait.io/), our parent project and join our [community](https://substrait.io/community/)