-
Notifications
You must be signed in to change notification settings - Fork 315
build: Enable Spark SQL tests for Spark 4.1.1 #4093
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
0118dda
7a0dd7e
203b88a
b17726e
626c966
8521e41
7205d4d
330e400
50008a6
5daf943
e93e67d
58bd76a
05cd6c4
5a60be2
9a154b7
84379ec
dce8dfa
cf81dea
1190b5a
fc8e8e3
98a178c
ebbc249
f5edafa
2c79a49
59494d1
f50b4a7
fff9158
cb639a3
e91c669
c5a0b81
9f4ec9c
ef1c480
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.shims | ||
|
|
||
| import org.apache.spark.sql.execution.datasources.VariantMetadata | ||
| import org.apache.spark.sql.types.{DataType, StringType, StructType} | ||
|
|
||
/**
 * Type-related shims for this Spark version.
 *
 * Isolates Comet from Spark API differences around string collations and the
 * variant shredding rewrite performed by `PushVariantIntoScan`.
 */
trait CometTypeShim {

  /**
   * Returns true when `dt` is a `StringType` carrying a non-default collation.
   *
   * A `StringType` carries collation metadata in Spark 4.x. Only non-default
   * (non-UTF8_BINARY) collations have semantics that Comet's byte-level
   * hashing/sorting/equality cannot honor. The default `StringType` object is
   * `StringType(UTF8_BINARY_COLLATION_ID)`, so comparing `collationId` against
   * that instance's id picks out non-default collations without needing
   * `private[sql]` helpers on `StringType`.
   */
  def isStringCollationType(dt: DataType): Boolean = dt match {
    case st: StringType if st.collationId != StringType.collationId => true
    case _ => false
  }

  /**
   * Returns true when `s` is the struct produced by Spark's variant shredding.
   *
   * Spark's `PushVariantIntoScan` rewrites `VariantType` columns into a
   * `StructType` whose fields each carry `__VARIANT_METADATA_KEY` metadata,
   * then pushes `variant_get` paths down as ordinary struct field accesses.
   * Comet's native scans don't understand the on-disk Parquet variant
   * shredding layout, so reading such a struct natively returns nulls. Detect
   * the marker here so callers can force scan fallback.
   */
  def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.shims | ||
|
|
||
| import org.apache.spark.paths.SparkPath | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.execution.datasources.PartitionedFile | ||
|
|
||
/**
 * Shim for constructing a [[PartitionedFile]] with this Spark version's
 * constructor signature.
 */
object ShimBatchReader {

  // Sentinel passed as both start and length: tells the reader to scan the
  // whole file rather than a byte range.
  private val ReadEntireFile: Long = -1

  /**
   * Builds a [[PartitionedFile]] covering the entire file at `file`.
   *
   * @param partitionValues partition-column values associated with the file
   * @param file            file location as a URL-encoded string
   */
  def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile = {
    val path = SparkPath.fromUrlString(file)
    PartitionedFile(
      partitionValues,
      path,
      ReadEntireFile,
      ReadEntireFile,
      Array.empty[String], // no preferred host locations
      0, // modification time unknown
      0) // file size unknown
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.shims | ||
|
|
||
/**
 * Version-specific configuration defaults for Comet.
 *
 * On this Spark version, schema evolution support is enabled by default.
 */
trait ShimCometConf {
  // Default for Comet's schema-evolution flag on this Spark version.
  protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT: Boolean = true
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.comet.shims | ||
|
|
||
| import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat | ||
| import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil | ||
| import org.apache.spark.sql.types.StructType | ||
|
|
||
/**
 * Shim over Parquet file-format internals that moved between Spark versions.
 */
object ShimFileFormat {

  /**
   * Name of the temporary column that holds row indexes computed by the file
   * format reader until they can be placed in the `_metadata` struct.
   */
  val ROW_INDEX_TEMPORARY_COLUMN_NAME: String =
    ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME

  /**
   * Delegates to Spark's [[ParquetRowIndexUtil]] to locate the row-index
   * column inside `sparkSchema`.
   */
  def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
    ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.comet.shims | ||
|
|
||
| import org.apache.spark.executor.TaskMetrics | ||
| import org.apache.spark.util.AccumulatorV2 | ||
|
|
||
/**
 * Shim for accessing task-metrics internals. Lives under the
 * `org.apache.spark` package tree so it can read `private[spark]` members.
 */
object ShimTaskMetrics {

  /**
   * Returns the most recently registered external accumulator of
   * `taskMetrics`, or `None` when no external accumulators are present.
   */
  def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] = {
    val externalAccums = taskMetrics._externalAccums
    if (externalAccums.isEmpty) None else Some(externalAccums.last)
  }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also add shims for Spark 4.2 as part of this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually, yes. This PR is for 4.1.1 but we can go through the same process for 4.2 in a separate PR.