Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ClickBench Q23 Go Faster #15177

Open
alamb opened this issue Mar 12, 2025 · 18 comments
Open

Make ClickBench Q23 Go Faster #15177

alamb opened this issue Mar 12, 2025 · 18 comments
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Mar 12, 2025

Is your feature request related to a problem or challenge?

Comparing ClickBench on DataFusion 45 and DuckDB (link)

You can see that for 23 DataFusion is almost 2x slower (around 10s where DuckDB is 5s)
Image

You can run this query like this:

cd datafusion
cd benchmarks
# download data
./bench.sh data clickbench_partitioned
# run query with datafusion-cli (note escapes
datafusion-cli -c "SELECT * FROM 'data/hits_partitioned' WHERE \"URL\" LIKE '%google%' ORDER BY \"EventTime\" LIMIT 10;"

Here is the explain plan

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion/benchmarks$ datafusion-cli -c "EXPLAIN SELECT * FROM 'data/hits_partitioned' WHERE \"URL\" LIKE '%google%' ORDER BY \"EventTime\" LIMIT 10;"
DataFusion CLI v46.0.0
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan|

| logical_plan  | Sort: data/hits_partitioned.EventTime ASC NULLS LAST, fetch|
|               |   Filter: CAST(data/hits_partitioned.URL AS Utf8View) LIKE Utf8View("%google|
|               |     TableScan: data/hits_partitioned projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], partial_filters=[CAST(data/hits_partitioned.URL AS Utf8View) LIKE Utf8View("%google|
| physical_plan | SortPreservingMergeExec: [EventTime@4 ASC NULLS LAST], fetch|
|               |   SortExec: TopK(fetch=10), expr=[EventTime@4 ASC NULLS LAST], preserve_partitioning=[true|
|               |     CoalesceBatchesExec: target_batch_size|
|               |       FilterExec: CAST(URL@13 AS Utf8View) LIKE %google|
|               |         DataSourceExec: file_groups={16 groups: [[Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_0.parquet:0..122446530, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_1.parquet:0..174965044, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_10.parquet:0..101513258, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_11.parquet:0..118419888, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_12.parquet:0..149514164, ...], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_14.parquet:108113265..151121699, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_15.parquet:0..103098894, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_16.parquet:0..101067219, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_17.parquet:0..116867853, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_18.parquet:0..133119589, ...], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_21.parquet:3887560..113455196, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_22.parquet:0..79775901, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_23.parquet:0..79631107, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_24.parquet:0..78257049, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_25.parquet:0..144169728, ...], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_28.parquet:106905624..162772407, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_29.parquet:0..79213288, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_3.parquet:0..192507052, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_30.parquet:0..124187913, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_31.parquet:0..123065410, ...], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_35.parquet:54087340..153632381, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_36.parquet:0..92487304, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_37.parquet:0..108247781, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_38.parquet:0..132005180, Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned/hits_39.parquet:0..103522954, ...], ...]}, projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], file_type=parquet, predicate=CAST(URL@13 AS Utf8View) LIKE %google% |
|               ||

2 row(s) fetched.
Elapsed 0.056 seconds.

Something that immediately jumps out at me in the explain plan is this line

|               |         DataSourceExec: file_groups={16 groups: ...}, projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], file_type=parquet, predicate=CAST(URL@13 AS Utf8View) LIKE %google% |

"Projection" I think means that all of those columns are being read/ decoded from parquet, which makes sense as the query has a SELECT * on it.

However, in this case all but the top 10 rows are returned (out of 100M rows in the file)

So this means that most of the decoded data is decoded and thrown away immediately

Describe the solution you'd like

I would like to close the gap with DuckDB with some general purpose improvement

Describe alternatives you've considered

I think the way to improve performance here is to defer decoding ("Materializing") the other columns until we know what the top 10 rows are.

some wacky ideas:

  1. Push the topk / ordering into the scan somehow
  2. implement "late materialization"

Late materialization would look something like

  1. decode only the EventTime column and a row_id
  2. determine the top 10 row_id by sorting by EventTime
  3. Decode only those 10 rows from the parquet file(s)

Additional context

No response

@alamb
Copy link
Contributor Author

alamb commented Mar 12, 2025

OOOO -- here is the duckdb plan and it shows what they are doing!

The key is this line:

│          Filters:         │
│ optional: Dynamic Filter  │
│        (EventTime)        │

What I think this is referring to is what @adriangb is describing in :

Specifically, the Top_N operator passes down a filter into the scan. The filter is "dynamic" in the sense that

  1. the TOP_N operator knows what the smallest maximum value currently is
  2. That means the scan can filter rows where the current timestamp is less than that number
┌─────────────────────────────┐
│┌───────────────────────────┐│
││       Physical Plan       ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐
│           TOP_N           │
│    ────────────────────   │
│          Top: 10          │
│                           │
│         Order By:         │
│ memory.main.hits.EventTime│
│             ASC           │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│           FILTER          │
│    ────────────────────   │
│  contains(URL, 'google')  │
│                           │
│       ~20000000 Rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│       PARQUET_SCAN        │
│    ────────────────────   │
│         Function:         │
│        PARQUET_SCAN       │
│                           │
│        Projections:       │
│          WatchID          │
│         JavaEnable        │
│           Title           │
│         GoodEvent         │
│         EventTime         │
│         EventDate         │
│         CounterID         │
│          ClientIP         │
│          RegionID         │
│           UserID          │
│        CounterClass       │
│             OS            │
│         UserAgent         │
│            URL            │
│            ...            │
│      ParamCurrencyID      │
│    OpenstatServiceName    │
│     OpenstatCampaignID    │
│        OpenstatAdID       │
│      OpenstatSourceID     │
│         UTMSource         │
│         UTMMedium         │
│        UTMCampaign        │
│         UTMContent        │
│          UTMTerm          │
│          FromTag          │
│          HasGCLID         │
│        RefererHash        │
│          URLHash          │
│            CLID           │
│                           │
│          Filters:         │
│ optional: Dynamic Filter  │
│        (EventTime)        │
│                           │
│      ~100000000 Rows      │
└───────────────────────────┘

@alamb
Copy link
Contributor Author

alamb commented Mar 12, 2025

The topk dynamic filtering is described here:
https://www.snowflake.com/en/blog/super-fast-top-k-queries/

@alamb
Copy link
Contributor Author

alamb commented Mar 12, 2025

BTW apparently DuckDB uses the "late materialization" technique with its own native format. Here is an explain courtesy of Joe Issacs and Robert Kruszewski

┌─────────────────────────────┐
│┌───────────────────────────┐│
││       Physical Plan       ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐
│         PROJECTION        │
│    ────────────────────   │
│             #0            │
│__internal_decompress_integ│
│    ral_smallint(#1, 0)    │
│             #2            │
│__internal_decompress_integ│
│    ral_smallint(#3, 1)    │
│             #4            │
│             #5            │
│             #6            │
│             #7            │
│             #8            │
│             #9            │
│__internal_decompress_integ│
│    ral_smallint(#10, 0)   │
│__internal_decompress_integ│
│    ral_smallint(#11, 0)   │
│__internal_decompress_integ│
│    ral_smallint(#12, 0)   │
│            #13            │
│            #14            │
│            ...            │
│            #90            │
│            #91            │
│            #92            │
│            #93            │
│            #94            │
│            #95            │
│            #96            │
│            #97            │
│            #98            │
│            #99            │
│            #100           │
│__internal_decompress_integ│
│   ral_smallint(#101, 0)   │
│            #102           │
│            #103           │
│            #104           │
│                           │
│       ~99997497 Rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│          ORDER_BY         │
│    ────────────────────   │
│    memory.main.hits_tb    │
│       .EventTime ASC      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│    ────────────────────   │
│             #0            │
│__internal_compress_integra│
│     l_utinyint(#1, 0)     │
│             #2            │
│__internal_compress_integra│
│     l_utinyint(#3, 1)     │
│             #4            │
│             #5            │
│             #6            │
│             #7            │
│             #8            │
│             #9            │
│__internal_compress_integra│
│     l_utinyint(#10, 0)    │
│__internal_compress_integra│
│     l_utinyint(#11, 0)    │
│__internal_compress_integra│
│     l_utinyint(#12, 0)    │
│            #13            │
│            #14            │
│            ...            │
│            #90            │
│            #91            │
│            #92            │
│            #93            │
│            #94            │
│            #95            │
│            #96            │
│            #97            │
│            #98            │
│            #99            │
│            #100           │
│__internal_compress_integra│
│    l_utinyint(#101, 0)    │
│            #102           │
│            #103           │
│            #104           │
│                           │
│       ~99997497 Rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│    ────────────────────   │
│          WatchID          │
│         JavaEnable        │
│           Title           │
│         GoodEvent         │
│         EventTime         │
│         EventDate         │
│         CounterID         │
│          ClientIP         │
│          RegionID         │
│           UserID          │
│        CounterClass       │
│             OS            │
│         UserAgent         │
│            URL            │
│          Referer          │
│            ...            │
│      ParamCurrencyID      │
│    OpenstatServiceName    │
│     OpenstatCampaignID    │
│        OpenstatAdID       │
│      OpenstatSourceID     │
│         UTMSource         │
│         UTMMedium         │
│        UTMCampaign        │
│         UTMContent        │
│          UTMTerm          │
│          FromTag          │
│          HasGCLID         │
│        RefererHash        │
│          URLHash          │
│            CLID           │
│                           │
│       ~99997497 Rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         HASH_JOIN         │
│    ────────────────────   │
│      Join Type: SEMI      │
│                           │
│        Conditions:        ├──────────────┐
│       rowid = rowid       │              │
│                           │              │
│       ~99997497 Rows      │              │
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│         SEQ_SCAN          ││           TOP_N           │
│    ────────────────────   ││    ────────────────────   │
│       Table: hits_tb      ││          Top: 10          │
│   Type: Sequential Scan   ││                           │
│                           ││         Order By:         │
│        Projections:       ││    memory.main.hits_tb    │
│            URL            ││       .EventTime ASC      │
│          WatchID          ││                           │
│         JavaEnable        ││                           │
│           Title           ││                           │
│         GoodEvent         ││                           │
│         EventTime         ││                           │
│         EventDate         ││                           │
│         CounterID         ││                           │
│          ClientIP         ││                           │
│          RegionID         ││                           │
│           UserID          ││                           │
│        CounterClass       ││                           │
│             OS            ││                           │
│         UserAgent         ││                           │
│            ...            ││                           │
│      ParamCurrencyID      ││                           │
│    OpenstatServiceName    ││                           │
│     OpenstatCampaignID    ││                           │
│        OpenstatAdID       ││                           │
│      OpenstatSourceID     ││                           │
│         UTMSource         ││                           │
│         UTMMedium         ││                           │
│        UTMCampaign        ││                           │
│         UTMContent        ││                           │
│          UTMTerm          ││                           │
│          FromTag          ││                           │
│          HasGCLID         ││                           │
│        RefererHash        ││                           │
│          URLHash          ││                           │
│            CLID           ││                           │
│                           ││                           │
│       ~99997497 Rows      ││                           │
└───────────────────────────┘└─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │         PROJECTION        │
                             │    ────────────────────   │
                             │             #1            │
                             │             #2            │
                             │                           │
                             │       ~19999499 Rows      │
                             └─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │           FILTER          │
                             │    ────────────────────   │
                             │  contains(URL, 'google')  │
                             │                           │
                             │       ~19999499 Rows      │
                             └─────────────┬─────────────┘
                             ┌─────────────┴─────────────┐
                             │         SEQ_SCAN          │
                             │    ────────────────────   │
                             │       Table: hits_tb      │
                             │   Type: Sequential Scan   │
                             │                           │
                             │        Projections:       │
                             │            URL            │
                             │         EventTime         │
                             │                           │
                             │          Filters:         │
                             │ optional: Dynamic Filter  │
                             │        (EventTime)        │
                             │                           │
                             │       ~99997497 Rows      │
                             └───────────────────────────┘

@suibianwanwank
Copy link
Contributor

This looks cool! Very interested in this.

@robert3005
Copy link

There's two optimizations here that go together, if you check clickbench results duckdb on their own format is significantly faster than parquet. The two optimizer rules that do this is 1) TopN https://github.com/duckdb/duckdb/blob/main/src/optimizer/topn_optimizer.cpp#L105 2) Late materialization https://github.com/duckdb/duckdb/blob/main/src/optimizer/late_materialization.cpp#L180 (join back the filter result to obtain rest of the columns)

@alamb
Copy link
Contributor Author

alamb commented Mar 12, 2025

Note that late materialization (the join / semi join rewrite) needs join operator support that DataFusion doesn't yet have (we could add it but it will take non trivial effort)

My suggested order of implementation is:

  1. Enable parquet filter pushdown by default #3463 with @XiangpengHao (so that we can actually evaluate the topk filter during scan)
  2. Then implement topk filtering Dynamic pruning filters from TopK state #15037

I actually think that will likely get us quite fast. I am not sure how much more improvement late materialized joins will get without a specialized file format.

I don't have time to help plan out late materializing joins at the moment, but I am quite interested in pushing along the predicate pushdown

@xudong963
Copy link
Member

There is a similar thought named prewhere: https://clickhouse.com/docs/sql-reference/statements/select/prewhere.

Even though it aims to filter, the idea is similar, for example:

Table t has 100 columns, one of them is a, for sql: select * from t where a = 1, it'll do the following steps:

  1. First, read only the data in column a
  2. Apply a = 1 filter to filter out matching rows.
  3. Read the remaining 99 columns only for those matching rows.

Back to topk, select * from t order by a limit 10

  1. First, read only the data in column a
  2. Perform a sort to find the row_id of the top 10 rows.
  3. Row identifiers as determined by 2 and selectively read only the other columns of these 10 rows.

We can spilt the idea to the query:

WITH ids AS (SELECT row_id, a FROM t ORDER BY a LIMIT 10)
SELECT t.* FROM t JOIN ids WHERE t.row_id IN (SELECT row_id FROM ids)

@alamb
Copy link
Contributor Author

alamb commented Mar 12, 2025

We can spilt the idea to the query:

I agree -- this is what I meant by "late materialization" . Your example / explanation is much better than mine @xudong963 🙏

@Dandandan
Copy link
Contributor

Dandandan commented Mar 20, 2025

Note that late materialization (the join / semi join rewrite) needs join operator support that DataFusion doesn't yet have (we could add it but it will take non trivial effort)

I did not fully get this part. DF has semi join support and some rewrites to utilize it in similar cases?
The query transformation in SQL as given by @xudong963 is optimized to a SEMI join + TopK, so I think it could be implemented as logical optimization rule (i.e. adding a filter with subquery on the ids).

> CREATE TABLE t (a int, b int, row_id int);
0 row(s) fetched. 
Elapsed 0.004 seconds.

> EXPLAIN (WITH ids AS (SELECT row_id, a FROM t ORDER BY a LIMIT 10)
SELECT t.* FROM t JOIN ids WHERE t.row_id IN (SELECT row_id FROM ids));
+---------------+--------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                       |
+---------------+--------------------------------------------------------------------------------------------+
| logical_plan  | LeftSemi Join: t.row_id = __correlated_sq_1.row_id                                         |
|               |   Cross Join:                                                                              |
|               |     TableScan: t projection=[a, b, row_id]                                                 |
|               |     SubqueryAlias: ids                                                                     |
|               |       Projection:                                                                          |
|               |         Sort: t.a ASC NULLS LAST, fetch=10                                                 |
|               |           TableScan: t projection=[a]                                                      |
|               |   SubqueryAlias: __correlated_sq_1                                                         |
|               |     SubqueryAlias: ids                                                                     |
|               |       Projection: t.row_id                                                                 |
|               |         Sort: t.a ASC NULLS LAST, fetch=10                                                 |
|               |           Projection: t.row_id, t.a                                                        |
|               |             TableScan: t projection=[a, row_id]                                            |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                                |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(row_id@2, row_id@0)]            |
|               |     CrossJoinExec                                                                          |
|               |       DataSourceExec: partitions=1, partition_sizes=[0]                                    |
|               |       ProjectionExec: expr=[]                                                              |
|               |         SortExec: TopK(fetch=10), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] |
|               |           DataSourceExec: partitions=1, partition_sizes=[0]                                |
|               |     ProjectionExec: expr=[row_id@0 as row_id]                                              |
|               |       SortExec: TopK(fetch=10), expr=[a@1 ASC NULLS LAST], preserve_partitioning=[false]   |
|               |         DataSourceExec: partitions=1, partition_sizes=[0]                                  |
|               |                                                                                            |
+---------------+--------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.005 seconds.

> 

@Dandandan
Copy link
Contributor

Dandandan commented Mar 20, 2025

Ah actually, the query given by @xudong963 is I think slightly off, I think it should be the following (without the explicit join).

This yields the same plan as DuckDB:

> EXPLAIN (WITH ids AS (SELECT row_id, a FROM t ORDER BY a LIMIT 10)
SELECT t.* FROM t WHERE t.row_id IN (SELECT row_id FROM ids));
+---------------+------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                     |
+---------------+------------------------------------------------------------------------------------------+
| logical_plan  | LeftSemi Join: t.row_id = __correlated_sq_1.row_id                                       |
|               |   TableScan: t projection=[a, b, row_id]                                                 |
|               |   SubqueryAlias: __correlated_sq_1                                                       |
|               |     SubqueryAlias: ids                                                                   |
|               |       Projection: t.row_id                                                               |
|               |         Sort: t.a ASC NULLS LAST, fetch=10                                               |
|               |           Projection: t.row_id, t.a                                                      |
|               |             TableScan: t projection=[a, row_id]                                          |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                              |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(row_id@2, row_id@0)]          |
|               |     DataSourceExec: partitions=1, partition_sizes=[0]                                    |
|               |     ProjectionExec: expr=[row_id@0 as row_id]                                            |
|               |       SortExec: TopK(fetch=10), expr=[a@1 ASC NULLS LAST], preserve_partitioning=[false] |
|               |         DataSourceExec: partitions=1, partition_sizes=[0]                                |
|               |                                                                                          |
+---------------+------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.004 seconds.

@adriangb
Copy link
Contributor

I think the difference is that DuckDB dynamically pushes down the current state of the TopK heap into file opening as described in #15037 and implemented in #15301

@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2025

I did not fully get this part. DF has semi join support and some rewrites to utilize it in similar cases?

The query transformation in SQL as given by @xudong963 is optimized to a SEMI join + TopK, so I think it could be implemented as logical optimization rule (i.e. adding a filter with subquery on the ids).

@Dandandan --

I think it would be interesting to try and rewrite q23 manually to that pattern and see how it goes fast

I suspect (but have not measured), if we implemented this rewrite we would find it runs much more slowly than the existing code because what would happen is that the entire input file (all columns) would be decoded and all but 10 rows are thrown away

To avoid this we need to push the join filters into the scan (and get predicate pushdown on by default)

Edit: although now I say this maybe it would be much better as we have to decode all the columns now....

@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2025

I tried the rewrite into a Semi join and indeed it is over 2x slower (5.3sec vs 12sec)

> SELECT * from 'hits_partitioned' WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
Elapsed 5.320 seconds.

Here is what I think the rewrite is

> SELECT * from 'hits_partitioned' WHERE "WatchID" IN (
  SELECT "WatchID" FROM 'hits_partitioned' WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10
);

Elapsed 12.023 seconds.

WatchID is a unique key

> select count(distinct "WatchID"), count(*) from 'hits_partitioned';
+------------------------------------------+----------+
| count(DISTINCT hits_partitioned.WatchID) | count(*) |
+------------------------------------------+----------+
| 99997493                                 | 99997497 |
+------------------------------------------+----------+

I also double checked the output

## orig
datafusion-cli -c "SELECT * FROM 'hits_partitioned' WHERE \"URL\" LIKE '%google%' ORDER BY \"EventTime\" LIMIT 10;" > orig.out

## rewrite
 datafusion-cli -c "SELECT * from 'hits_partitioned' WHERE \"WatchID\" IN (SELECT \"WatchID\" FROM 'hits_partitioned' WHERE \"URL\" LIKE '%google%' ORDER BY \"EventTime\" LIMIT 10);" > rewrite.out

## check
sort orig.out > orig.out.sort
sort rewrite.out > rewrite.out.sort
diff orig.out.sort rewrite.out.sort

7c7
< Elapsed 5.649 seconds.
---
> Elapsed 11.067 seconds.

@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2025

I am not really sure where the time is going 🤔
output of explain analyze: explain.txt

@Dandandan
Copy link
Contributor

Dandandan commented Mar 20, 2025

Thanks for checking @alamb !

I think a large portion is spent in the hash join (repartitioning the right side input) - I think because it runs as Partitioned hash join, instead of realizing it could use CollectLeft (see repartition_time=55.648133838s) for the small side (10 rows).

@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2025

Thanks for checking @alamb !

I think a large portion is spent in the hash join (repartitioning the right side input) - I think because it runs as Partitioned hash join, instead of realizing it could use CollectLeft (see repartition_time=55.648133838s) for the small side (10 rows).

I also think the Url, WatchID and EventDate columns will be decoded twice. To make that join go really fast we would need something like:

@alamb
Copy link
Contributor Author

alamb commented Mar 20, 2025

BTW combined with @adriangb's PR here

It will likely go crazy fast 🚀

@Dandandan
Copy link
Contributor

Dandandan commented Mar 20, 2025

I think a large portion is spent in the hash join (repartitioning the right side input) - I think because it runs as Partitioned hash join, instead of realizing it could use CollectLeft (see repartition_time=55.648133838s) for the small side (10 rows).

I traced this down to an issue in the planner, which uses PartitionMode::Auto iff stats are collected (datafusion.execution.collect_statistics)
We can however still use file and plan-derived statistics, so let's change that.
#15339

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

6 participants