Skip to content

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented May 13, 2025

Which issue does this PR close?

Part of #1254

Closes #1252

Rationale for this change

The config COMET_SHUFFLE_FALLBACK_TO_COLUMNAR was added as a hack so that we could avoid fixing some failing Spark SQL tests. However, the failing tests are real issues and only show up now because more queries are running natively.

What changes are included in this PR?

This PR removes the hack and updates the diff files to either fix or ignore failing tests. Some of these tests will be enabled later as part of #1254.

  • Remove the hack
  • Update 3.4.3 diffs
  • Update 3.5.4 diffs
  • Update 3.5.5 diffs
  • Update 4.0.0-preview1 diffs

How are these changes tested?

@codecov-commenter
Copy link

codecov-commenter commented May 13, 2025

Codecov Report

Attention: Patch coverage is 0% with 4 lines in your changes missing coverage. Please review.

Project coverage is 59.40%. Comparing base (f09f8af) to head (c105513).
Report is 247 commits behind head on main.

Files with missing lines Patch % Lines
...he/comet/rules/EliminateRedundantTransitions.scala 0.00% 1 Missing and 2 partials ⚠️
...pache/spark/sql/comet/CometColumnarToRowExec.scala 0.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1736      +/-   ##
============================================
+ Coverage     56.12%   59.40%   +3.27%     
- Complexity      976     1151     +175     
============================================
  Files           119      130      +11     
  Lines         11743    12661     +918     
  Branches       2251     2375     +124     
============================================
+ Hits           6591     7521     +930     
+ Misses         4012     3930      -82     
- Partials       1140     1210      +70     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@andygrove
Copy link
Member Author

There are now many more test failures than when this PR was first created.

@coderfender
Copy link
Contributor

Working on this

1 similar comment
@coderfender
Copy link
Contributor

Working on this

@andygrove
Copy link
Member Author

The remaining failure is related to exchange reuse in TPC-DS q44.

2025-06-06T00:48:02.3079684Z OUTPUT: TakeOrderedAndProject(limit=100, orderBy=[rnk#18761 ASC NULLS FIRST], output=[rnk#18761,best_performing#18767,worst_performing#18768])
2025-06-06T00:48:02.3081088Z +- Project [rnk#18761, i_product_name#18894 AS best_performing#18767, i_product_name#18905 AS worst_performing#18768]
2025-06-06T00:48:02.3082152Z    +- BroadcastHashJoin [item_sk#18762], [i_item_sk#18796], Inner, BuildRight, false
2025-06-06T00:48:02.3082940Z       :- Project [rnk#18761, item_sk#18762, i_product_name#18894]
2025-06-06T00:48:02.3083811Z       :  +- BroadcastHashJoin [item_sk#18757], [i_item_sk#722], Inner, BuildRight, false
2025-06-06T00:48:02.3084580Z       :     :- Project [item_sk#18757, rnk#18761, item_sk#18762]
2025-06-06T00:48:02.3085314Z       :     :  +- BroadcastHashJoin [rnk#18761], [rnk#18766], Inner, BuildRight, false
2025-06-06T00:48:02.3086015Z       :     :     :- Project [item_sk#18757, rnk#18761]
2025-06-06T00:48:02.3086766Z       :     :     :  +- Filter ((rnk#18761 < 11) AND isnotnull(item_sk#18757))
2025-06-06T00:48:02.3088247Z       :     :     :     +- Window [rank(rank_col#18758) windowspecdefinition(rank_col#18758 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18761], [rank_col#18758 ASC NULLS FIRST]
2025-06-06T00:48:02.3089876Z       :     :     :        +- WindowGroupLimit [rank_col#18758 ASC NULLS FIRST], rank(rank_col#18758), 10, Final
2025-06-06T00:48:02.3090689Z       :     :     :           +- Sort [rank_col#18758 ASC NULLS FIRST], false, 0
2025-06-06T00:48:02.3091724Z       :     :     :              +- Filter (isnotnull(rank_col#18758) AND (cast(rank_col#18758 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3092900Z       :     :     :                 :  +- ReusedSubquery Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3093962Z       :     :     :                 +- HashAggregate(keys=[ss_item_sk#701], functions=[avg(UnscaledValue(ss_net_profit#721))], output=[item_sk#18757, rank_col#18758])
2025-06-06T00:48:02.3094946Z       :     :     :                    +- CometColumnarToRow
2025-06-06T00:48:02.3095489Z       :     :     :                       +- ShuffleQueryStage 0
2025-06-06T00:48:02.3096562Z       :     :     :                          +- CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3097956Z       :     :     :                             +- !CometHashAggregate [ss_item_sk#701, ss_net_profit#721], Partial, [ss_item_sk#701], [partial_avg(UnscaledValue(ss_net_profit#721))]
2025-06-06T00:48:02.3099165Z       :     :     :                                +- CometProject [ss_item_sk#701, ss_net_profit#721], [ss_item_sk#701, ss_net_profit#721]
2025-06-06T00:48:02.3100381Z       :     :     :                                   +- CometFilter [ss_item_sk#701, ss_store_sk#706, ss_net_profit#721], (isnotnull(ss_store_sk#706) AND (ss_store_sk#706 = 4))
2025-06-06T00:48:02.3103650Z       :     :     :                                      +- CometScan parquet spark_catalog.default.store_sales[ss_item_sk#701,ss_store_sk#706,ss_net_profit#721] Batched: true, DataFilters: [isnotnull(ss_store_sk#706), (ss_store_sk#706 = 4)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4)], ReadSchema: struct<ss_item_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3106642Z       :     :     +- BroadcastQueryStage 4
2025-06-06T00:48:02.3107602Z       :     :        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, false] as bigint)),false), [plan_id=147448]
2025-06-06T00:48:02.3108617Z       :     :           +- *(2) Project [item_sk#18762, rnk#18766]
2025-06-06T00:48:02.3109338Z       :     :              +- *(2) Filter ((rnk#18766 < 11) AND isnotnull(item_sk#18762))
2025-06-06T00:48:02.3110827Z       :     :                 +- Window [rank(rank_col#18763) windowspecdefinition(rank_col#18763 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#18766], [rank_col#18763 DESC NULLS LAST]
2025-06-06T00:48:02.3112485Z       :     :                    +- WindowGroupLimit [rank_col#18763 DESC NULLS LAST], rank(rank_col#18763), 10, Final
2025-06-06T00:48:02.3113319Z       :     :                       +- *(1) Sort [rank_col#18763 DESC NULLS LAST], false, 0
2025-06-06T00:48:02.3114412Z       :     :                          +- *(1) Filter (isnotnull(rank_col#18763) AND (cast(rank_col#18763 as decimal(13,7)) > (0.9 * Subquery subquery#18765, [id=#147283])))
2025-06-06T00:48:02.3115373Z       :     :                             :  +- Subquery subquery#18765, [id=#147283]
2025-06-06T00:48:02.3115975Z       :     :                             :     +- AdaptiveSparkPlan isFinalPlan=true
2025-06-06T00:48:02.3116693Z                                                    +- == Final Plan ==
2025-06-06T00:48:02.3117643Z                                                       *(1) HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3118606Z                                                       +- *(1) CometColumnarToRow
2025-06-06T00:48:02.3119179Z                                                          +- ShuffleQueryStage 0
2025-06-06T00:48:02.3120162Z                                                             +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=147498]
2025-06-06T00:48:02.3121750Z                                                                +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3123117Z                                                                   +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3124510Z                                                                      +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3128544Z                                                                         +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3131704Z                                                    +- == Initial Plan ==
2025-06-06T00:48:02.3132670Z                                                       HashAggregate(keys=[ss_store_sk#18830], functions=[avg(UnscaledValue(ss_net_profit#18845))], output=[rank_col#18759])
2025-06-06T00:48:02.3134048Z                                                       +- CometExchange hashpartitioning(ss_store_sk#18830, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146421]
2025-06-06T00:48:02.3135539Z                                                          +- !CometHashAggregate [ss_store_sk#18830, ss_net_profit#18845], Partial, [ss_store_sk#18830], [partial_avg(UnscaledValue(ss_net_profit#18845))]
2025-06-06T00:48:02.3139421Z                                                             +- CometProject [ss_store_sk#18830, ss_net_profit#18845], [ss_store_sk#18830, ss_net_profit#18845]
2025-06-06T00:48:02.3140970Z                                                                +- CometFilter [ss_addr_sk#18829, ss_store_sk#18830, ss_net_profit#18845], ((isnotnull(ss_store_sk#18830) AND (ss_store_sk#18830 = 4)) AND isnull(ss_addr_sk#18829))
2025-06-06T00:48:02.3144752Z                                                                   +- CometScan parquet spark_catalog.default.store_sales[ss_addr_sk#18829,ss_store_sk#18830,ss_net_profit#18845] Batched: true, DataFilters: [isnotnull(ss_store_sk#18830), (ss_store_sk#18830 = 4), isnull(ss_addr_sk#18829)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), EqualTo(ss_store_sk,4), IsNull(ss_addr_sk)], ReadSchema: struct<ss_addr_sk:int,ss_store_sk:int,ss_net_profit:decimal(7,2)>
2025-06-06T00:48:02.3148516Z       :     :                             +- *(1) HashAggregate(keys=[ss_item_sk#18775], functions=[avg(UnscaledValue(ss_net_profit#18795))], output=[item_sk#18762, rank_col#18763])
2025-06-06T00:48:02.3149536Z       :     :                                +- *(1) ColumnarToRow
2025-06-06T00:48:02.3150077Z       :     :                                   +- ShuffleQueryStage 1
2025-06-06T00:48:02.3151311Z       :     :                                      +- ReusedExchange [ss_item_sk#18775, sum#18915, count#18916L], CometExchange hashpartitioning(ss_item_sk#701, 1), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=146851]
2025-06-06T00:48:02.3152516Z       :     +- CometColumnarToRow
2025-06-06T00:48:02.3152984Z       :        +- BroadcastQueryStage 2
2025-06-06T00:48:02.3153616Z       :           +- CometBroadcastExchange [i_item_sk#722, i_product_name#18894]
2025-06-06T00:48:02.3155540Z       :              +- CometProject [i_item_sk#722, i_product_name#18894], [i_item_sk#722, staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, i_product_name#743, 50, true, false, true) AS i_product_name#18894]
2025-06-06T00:48:02.3157539Z       :                 +- CometFilter [i_item_sk#722, i_product_name#743], isnotnull(i_item_sk#722)
2025-06-06T00:48:02.3160081Z       :                    +- CometScan parquet spark_catalog.default.item[i_item_sk#722,i_product_name#743] Batched: true, DataFilters: [isnotnull(i_item_sk#722)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/__w/datafusion-comet/datafusion-comet/tpcds-sf-1/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:int,i_product_name:string>
2025-06-06T00:48:02.3162411Z       +- ColumnarToRow
2025-06-06T00:48:02.3162821Z          +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z             +- ReusedExchange [i_item_sk#18796, i_product_name#18905], CometBroadcastExchange [i_item_sk#722, i_product_name#18894]

Note the C2R around a reused exchange:

2025-06-06T00:48:02.3162411Z       +- ColumnarToRow
2025-06-06T00:48:02.3162821Z          +- BroadcastQueryStage 3
2025-06-06T00:48:02.3163722Z             +- ReusedExchange [i_item_sk#18796, i_product_name#18905], 

@andygrove
Copy link
Member Author

Test now pass.

@andygrove andygrove marked this pull request as ready for review June 6, 2025 22:56
@andygrove
Copy link
Member Author

replaced with #1865

@andygrove andygrove closed this Jun 8, 2025
@andygrove andygrove deleted the remove-shuffle-fallback-hack branch July 10, 2025 22:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Spark SQL test "test with low buffer spill threshold" fails when shuffle mode is jvm

4 participants