In the system I'm working on I want to perform multiple aggregations over large data sets, each with different group-by criteria.
I don't think grouping sets are an option, since those compute a single set of aggregates over multiple groupings. What I'm trying to achieve instead is multiple sets of aggregates that each have their own group-by strategy.
A simple way to do this is, of course, to just run multiple queries. That works, but it scans the data multiple times, which becomes prohibitive pretty quickly as the number of aggregate sets grows.
While I was experimenting with the multiple-query approach and combining the queries into a single one using `union all`, I started wondering if I couldn't write an operator to have my cake and eat it too. So rather than this:
```sql
select 1 as setid, k1 as groupkey1, count(1) as agg1, null as groupkey2, null as agg2
from table group by k1
union all
select 2 as setid, null as groupkey1, null as agg1, k2 as groupkey2, sum(col1) as agg2
from table group by k2
```
which results in a logical plan that sort of looks like this (edited for brevity/clarity)
```
Union
  Projection: 1 AS setid, k1 AS groupkey1, count(1) AS agg1, NULL AS groupkey2, NULL AS agg2
    Aggregate: groupBy=[k1], aggr=[[count(1)]]
      TableScan: table
  Projection: 2 AS setid, NULL AS groupkey1, NULL AS agg1, k2 AS groupkey2, sum(col1) AS agg2
    Aggregate: groupBy=[k2], aggr=[[sum(col1)]]
      TableScan: table
```
what I would want to do instead is something like this
```
Unify
  Projection: 1 AS setid, k1 AS groupkey1, count(1) AS agg1, NULL AS groupkey2, NULL AS agg2
    Aggregate: groupBy=[k1], aggr=[[count(1)]]
      CommonInputPlaceholder
  Projection: 2 AS setid, NULL AS groupkey1, NULL AS agg1, k2 AS groupkey2, sum(col1) AS agg2
    Aggregate: groupBy=[k2], aggr=[[sum(col1)]]
      CommonInputPlaceholder
  CommonInput
    TableScan: table
```
`CommonInputPlaceholder` is a stub node with the same schema as the `CommonInput` child.
The `Unify` operator works by setting up a queue for each `CommonInputPlaceholder`. It polls the `CommonInput` child and places a duplicate of each record batch it receives onto every queue. This is similar to how `RepartitionExec` does its thing, except that instead of assigning each record batch to a single output, we duplicate it and assign it to all of them.
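To make the fan-out concrete, here is a std-only sketch of the duplication step (not DataFusion code; `fan_out` is a hypothetical name, and a `Vec<i64>` stands in for an Arrow `RecordBatch`): one producer reads the common input once and clones each batch onto a queue per consumer, so every downstream branch sees the full stream.

```rust
use std::sync::mpsc;
use std::thread;

// Toy stand-in for the Unify fan-out: duplicate every "record batch"
// onto one queue per downstream consumer, so each branch sees the full
// input while the source is scanned only once.
fn fan_out(batches: Vec<Vec<i64>>, n_consumers: usize) -> Vec<Vec<Vec<i64>>> {
    // One queue per CommonInputPlaceholder-like consumer.
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..n_consumers).map(|_| mpsc::channel::<Vec<i64>>()).unzip();

    // Producer: poll the common input once, clone each batch onto
    // every queue (RepartitionExec-style, but duplicating each batch
    // instead of routing it to a single partition).
    let producer = thread::spawn(move || {
        for batch in batches {
            for tx in &senders {
                tx.send(batch.clone()).expect("receiver dropped");
            }
        }
        // Senders are dropped here, which closes every queue.
    });

    // Consumers: drain their queue; in the real operator each of these
    // streams would feed its own Aggregate.
    let outputs = receivers
        .into_iter()
        .map(|rx| rx.into_iter().collect())
        .collect();

    producer.join().expect("producer panicked");
    outputs
}
```

In a real implementation the queues would be bounded async channels and, since Arrow record batches hold `Arc`-ed column arrays, "duplicating" a batch should amount to a cheap shallow clone rather than copying the data.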
With quite some trial and error I've been able to get something up and running, but I have a feeling I'm going against the grain of the framework. Getting the optimizer to do the right thing, for instance, proved to be a challenge, since it expects plans to be trees rather than DAGs.
My question for the group: has someone else tried to implement something like this before? Can what I'm trying to accomplish be done in some other way? Or does anyone have advice on how best to go about implementing it?
I realize this colors outside the lines of what you can express in SQL (as far as I know at least). I'm creating my queries by directly instantiating logical plans so for now that's not an issue for the system I'm working on.
Edit: I accidentally wrote an example that could be done with grouping sets, since the sets of aggregates were identical. I've updated the example to use different aggregates.