
Dynamic pruning filters from TopK state #15037

Open
adriangb opened this issue Mar 5, 2025 · 8 comments · May be fixed by #15301
Labels: enhancement (New feature or request)

Comments

@adriangb
Contributor

adriangb commented Mar 5, 2025

Is your feature request related to a problem or challenge?

From a discussion with @alamb yesterday, the idea came up of optimizing queries like `select * from data order by timestamp desc limit 10` for the case where the data is not perfectly sorted by timestamp but mostly follows a sorted pattern.
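
For context, this is roughly how the scenario looks with DataFusion's Rust API (a minimal sketch; the `./data/` path and the `data` table name are just placeholders):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register a directory of Parquet files as a single table.
    ctx.register_parquet("data", "./data/", ParquetReadOptions::default())
        .await?;
    // Today this reads the timestamp column of every file and feeds it into
    // the TopK operator; the idea here is to prune whole files instead.
    ctx.sql("SELECT * FROM data ORDER BY timestamp DESC LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```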

You can imagine this data getting created when multiple sources with clock skews, network delays, etc. are writing data and you don't do anything fancy to guarantee perfect sorting by timestamp (i.e. you naively write the data out to Parquet, maybe do some compaction, etc.). The point is that 99% of yesterday's files have timestamps smaller than 99% of today's files, but there may be a couple of seconds of overlap between files. To be concrete, let's say this is our data:

| file | min | max |
|------|-----|-----|
| 1    | 1   | 10  |
| 2    | 9   | 19  |
| 3    | 20  | 31  |
| 4    | 30  | 35  |

Currently DataFusion will exhaustively open each file, read the timestamp column and feed it into a TopK.
I think we can do a lot better if we:

  • Use file stats to decide which files to work on first. In this case it makes sense to start with files 4 and 3 (assuming we have a parallelism of 2).
  • Let's say that between those two we have 10 rows, so we've already filled up our TopK. The only way more rows can get added to our TopK is if they are greater than the smallest item already seen (let's say that's 20, the smallest value in file 3).
  • Now we know from statistics alone that we can skip files 2 and 1, because neither of them can contain any timestamp > 20 (a sketch of this pruning rule follows the list).
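
A minimal sketch of that pruning rule, assuming per-file min/max statistics for the sort column; `FileStats` and `top_k_threshold` are made-up names for illustration, not DataFusion code:

```rust
// Per-file statistics, mirroring the table above.
#[allow(dead_code)]
struct FileStats {
    name: &'static str,
    min: i64, // unused by the rule below; kept to mirror the stats table
    max: i64,
}

fn main() {
    let mut files = vec![
        FileStats { name: "1", min: 1, max: 10 },
        FileStats { name: "2", min: 9, max: 19 },
        FileStats { name: "3", min: 20, max: 31 },
        FileStats { name: "4", min: 30, max: 35 },
    ];

    // Step 1: visit files with the largest max first (descending sort).
    files.sort_by_key(|f| std::cmp::Reverse(f.max));

    // Step 2: suppose files 4 and 3 filled the heap with 10 rows; the
    // smallest value retained in the heap becomes the dynamic threshold.
    let top_k_threshold: i64 = 20;

    // Step 3: a remaining file whose max is <= the threshold cannot contain
    // any timestamp that would displace a heap entry, so skip it entirely.
    for f in &files {
        if f.max <= top_k_threshold {
            println!("skip file {} (max {} <= {})", f.name, f.max, top_k_threshold);
        } else {
            println!("scan file {} (max {} > {})", f.name, f.max, top_k_threshold);
        }
    }
}
```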

Extrapolating this to scenarios where you have years' worth / TBs of data and want a `limit 5`, I think this would yield orders-of-magnitude improvements.

@alamb mentioned this sounds similar to Dynamic Filters. I assume this must be a known technique (or my analysis may be completely wrong 😆), but I don't know what it would be called.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

adriangb added the enhancement (New feature or request) label Mar 5, 2025
@alamb
Contributor

alamb commented Mar 5, 2025

> @alamb mentioned this sounds similar to Dynamic Filters. I assume this must be a known technique (or my analysis may be completely wrong 😆), but I don't know what it would be called.

There was a talk at CIDR this year that mentioned this:

Sponsor Talk 3: The Fine Art of Work Skipping
Stefan Mandl, Snowflake

It seems they wrote a blog about it too here: https://www.snowflake.com/en/engineering-blog/optimizing-top-k-aggregation-snowflake/

@adriangb
Contributor Author

adriangb commented Mar 5, 2025

Nice to know I'm not totally off on the idea 😄

@alamb
Contributor

alamb commented Mar 5, 2025

> Nice to know I'm not totally off on the idea 😄

Not at all!

@alamb
Contributor

alamb commented Mar 12, 2025

BTW I am pretty sure DuckDB is using this technique, and that it is why they are so much faster on ClickBench Q23.

@adriangb
Contributor Author

Does anyone have a handle on how we might implement this? I was thinking we’d need to add a method to exec operators called apply_filter that basically sends down the additional filter; by default it gets forwarded to children until it hits an exec that knows what to do with it (e.g. DataSourceExec). But I’m not very clear beyond that. Very roughly, something like the sketch below.
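
A very hand-wavy sketch of the `apply_filter` idea; all of these traits and types are made up for illustration and are not DataFusion's actual ExecutionPlan API:

```rust
use std::sync::Arc;

/// Stand-in for a dynamic filter expression pushed down at runtime.
#[derive(Clone)]
struct DynamicFilter {
    description: String,
}

trait ExecNode {
    fn children(&self) -> Vec<Arc<dyn ExecNode>>;

    /// Default: operators that don't understand the filter just forward it.
    fn apply_filter(&self, filter: &DynamicFilter) {
        for child in self.children() {
            child.apply_filter(filter);
        }
    }
}

/// A scan-like leaf (think DataSourceExec) that knows what to do with it.
struct DataSource;

impl ExecNode for DataSource {
    fn children(&self) -> Vec<Arc<dyn ExecNode>> {
        vec![]
    }

    fn apply_filter(&self, filter: &DynamicFilter) {
        // A real scan would use this to prune files / row groups.
        println!("scan received dynamic filter: {}", filter.description);
    }
}

fn main() {
    let scan = DataSource;
    scan.apply_filter(&DynamicFilter { description: "timestamp > 20".to_string() });
}
```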

@alamb
Contributor

alamb commented Mar 18, 2025

> Does anyone have a handle on how we might implement this? I was thinking we’d need to add a method to exec operators called apply_filter that basically sends down the additional filter; by default it gets forwarded to children until it hits an exec that knows what to do with it (e.g. DataSourceExec). But I’m not very clear beyond that.

To begin with I would suggest:

  1. Make a new PhysicalExpr named something like TopKRuntimeFilter
  2. Add a physical optimizer pass that runs after all other passes (so the plan structure doesn't change) and that finds TopK nodes and tries to find the scans connected to them (start with some basic rules; don't try to go past joins, etc.)
  3. Add TopKRuntimeFilter to those scans

Then the trick will be to figure out how to share the TopKHeap created in the TopK operator with the TopKRuntimeFilter, and then to orchestrate concurrent access to it somehow. A sketch of one possible shape for that sharing is below.
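
Illustration only: a minimal way the shared state could work, assuming a descending sort on a single i64 column and an `Arc<RwLock<...>>` for the concurrent access (SharedThreshold and its methods are made-up names, not the actual TopKRuntimeFilter):

```rust
use std::sync::{Arc, RwLock};

/// State written by the TopK operator and read by the scan's filter.
#[derive(Default)]
struct SharedThreshold {
    /// Smallest value currently in the heap, once the heap is full.
    min_in_heap: RwLock<Option<i64>>,
}

impl SharedThreshold {
    /// Called by the TopK operator whenever its heap changes.
    fn update(&self, new_min: i64) {
        *self.min_in_heap.write().unwrap() = Some(new_min);
    }

    /// Called by the scan: can a file with this max value be skipped?
    fn can_skip(&self, file_max: i64) -> bool {
        match *self.min_in_heap.read().unwrap() {
            Some(threshold) => file_max <= threshold,
            None => false, // heap not full yet: every file must be read
        }
    }
}

fn main() {
    let shared = Arc::new(SharedThreshold::default());

    // TopK fills up after files 4 and 3; the smallest retained value is 20.
    shared.update(20);

    // The scan consults the shared state before opening each remaining file.
    assert!(shared.can_skip(19)); // file 2 can be pruned
    assert!(shared.can_skip(10)); // file 1 can be pruned
    assert!(!shared.can_skip(31)); // a file with max 31 must still be scanned
}
```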

adriangb added a commit to pydantic/datafusion that referenced this issue Mar 19, 2025
adriangb linked a pull request Mar 19, 2025 that will close this issue
@adriangb
Contributor Author

@alamb I implemented something like that in #15301

@alamb
Contributor

alamb commented Mar 20, 2025

Thanks @adriangb -- I will try to review it asap (hopefully tomorrow afternoon)

adriangb added a commit to pydantic/datafusion that referenced this issue Mar 20, 2025