Skip to content

Commit 19c4836

Browse files
committed
Implementation of over, partition by and window for PostgreSQL
1 parent 32e1a18 commit 19c4836

File tree

11 files changed

+374
-9
lines changed

11 files changed

+374
-9
lines changed

lib/ecto/adapters/postgres/connection.ex

+38-2
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ if Code.ensure_loaded?(Postgrex) do
109109
end
110110

111111
alias Ecto.Query
112-
alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr}
112+
alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr, PartitionByExpr}
113113

114114
def all(query) do
115115
sources = create_names(query)
@@ -121,12 +121,13 @@ if Code.ensure_loaded?(Postgrex) do
121121
where = where(query, sources)
122122
group_by = group_by(query, sources)
123123
having = having(query, sources)
124+
window = window(query, sources)
124125
order_by = order_by(query, order_by_distinct, sources)
125126
limit = limit(query, sources)
126127
offset = offset(query, sources)
127128
lock = lock(query.lock)
128129

129-
[select, from, join, where, group_by, having, order_by, limit, offset | lock]
130+
[select, from, join, where, group_by, having, window, order_by, limit, offset | lock]
130131
end
131132

132133
def update_all(%{from: %{source: source}} = query, prefix \\ nil) do
@@ -380,6 +381,22 @@ if Code.ensure_loaded?(Postgrex) do
380381
end)]
381382
end
382383

384+
defp window(%Query{windows: []}, _sources), do: []
385+
defp window(%Query{windows: windows} = query, sources) do
386+
[" WINDOW " |
387+
intersperse_map(windows, ", ", fn
388+
{name, definition} ->
389+
[Atom.to_string(name), " AS ", partition_by(definition, sources, query)] end)]
390+
end
391+
392+
defp partition_by(%PartitionByExpr{fields: fields, order_bys: order_bys}, sources, %Query{} = query) do
393+
partition_query = %Query{ query | order_bys: order_bys }
394+
["(PARTITION BY ",
395+
intersperse_map(fields, ", ", &expr(&1, sources, query)),
396+
order_by(partition_query, [], sources),
397+
?)]
398+
end
399+
383400
defp order_by(%Query{order_bys: []}, _distinct, _sources), do: []
384401
defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do
385402
order_bys = Enum.flat_map(order_bys, & &1.expr)
@@ -509,6 +526,25 @@ if Code.ensure_loaded?(Postgrex) do
509526
[aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)]
510527
end
511528

529+
defp expr({:over, _, [agg, %PartitionByExpr{} = window]}, sources, query) do
530+
aggregate = expr(agg, sources, query)
531+
[aggregate, " OVER ", partition_by(window, sources, query)]
532+
end
533+
534+
defp expr({:over, _, [agg, nil]}, sources, query) do
535+
aggregate = expr(agg, sources, query)
536+
[aggregate, " OVER ()"]
537+
end
538+
539+
defp expr({:over, _, [agg, name]}, sources, %Ecto.Query{ windows: windows } = query) when is_atom(name) do
540+
if Keyword.has_key?(windows, name) do
541+
aggregate = expr(agg, sources, query)
542+
[aggregate, " OVER ", Atom.to_string(name)]
543+
else
544+
error!(query, "window #{name} is undefined")
545+
end
546+
end
547+
512548
defp expr({:{}, _, elems}, sources, query) do
513549
[?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
514550
end

lib/ecto/query.ex

+21-4
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ defmodule Ecto.Query do
354354

355355
defstruct [prefix: nil, sources: nil, from: nil, joins: [], aliases: %{}, wheres: [], select: nil,
356356
order_bys: [], limit: nil, offset: nil, group_bys: [], updates: [],
357-
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil]
357+
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil, windows: []]
358358

359359
defmodule FromExpr do
360360
@moduledoc false
@@ -366,6 +366,11 @@ defmodule Ecto.Query do
366366
defstruct [:fun, :binding, :file, :line]
367367
end
368368

369+
defmodule PartitionByExpr do
370+
@moduledoc false
371+
defstruct [:fields, :file, :line, order_bys: []]
372+
end
373+
369374
defmodule QueryExpr do
370375
@moduledoc false
371376
defstruct [:expr, :file, :line, params: []]
@@ -398,8 +403,8 @@ defmodule Ecto.Query do
398403
@opaque dynamic :: %DynamicExpr{}
399404

400405
alias Ecto.Query.Builder
401-
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join,
402-
LimitOffset, Lock, OrderBy, Preload, Select, Update}
406+
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join, PartitionBy,
407+
Window, LimitOffset, Lock, OrderBy, Preload, Select, Update}
403408

404409
@doc """
405410
Builds a dynamic query expression.
@@ -464,6 +469,18 @@ defmodule Ecto.Query do
464469
Dynamic.build(binding, expr, __CALLER__)
465470
end
466471

472+
defmacro partition_by(binding \\ [], expr, opts \\ []) do
473+
partition = PartitionBy.build(binding, expr, __CALLER__)
474+
case Keyword.get(opts, :order_by) do
475+
nil -> partition
476+
fields -> OrderBy.build(partition, binding, fields, __CALLER__)
477+
end
478+
end
479+
480+
defmacro window(query, binding \\ [], expr) do
481+
Window.build(query, binding, expr, __CALLER__)
482+
end
483+
467484
@doc """
468485
Converts a query into a subquery.
469486
@@ -648,7 +665,7 @@ defmodule Ecto.Query do
648665

649666
@from_join_opts [:as, :prefix, :hints]
650667
@no_binds [:lock]
651-
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by] ++
668+
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by, :window] ++
652669
[:having, :or_having, :limit, :offset, :preload, :update, :select_merge]
653670

654671
defp from([{type, expr}|t], env, count_bind, quoted, binds) when type in @binds do

lib/ecto/query/api.ex

+5
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ defmodule Ecto.Query.API do
176176
"""
177177
def coalesce(value, expr), do: doc! [value, expr]
178178

179+
## FIXME doc
180+
@doc """
181+
"""
182+
def over(value, window), do: doc! [value, window]
183+
179184
@doc """
180185
Applies the given expression as a FILTER clause against an
181186
aggregate. This is currently only supported by Postgres.

lib/ecto/query/builder.ex

+32
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,12 @@ defmodule Ecto.Query.Builder do
266266
{expr, params_acc}
267267
end
268268

269+
def escape({:row_number, _, [arg]}, type, params_acc, vars, env) do
270+
{arg, params_acc} = escape(arg, type, params_acc, vars, env)
271+
expr = {:{}, [], [:row_number, [], [arg]]}
272+
{expr, params_acc}
273+
end
274+
269275
def escape({:filter, _, [aggregate]}, type, params_acc, vars, env) do
270276
escape(aggregate, type, params_acc, vars, env)
271277
end
@@ -282,6 +288,32 @@ defmodule Ecto.Query.Builder do
282288
{{:{}, [], [:coalesce, [], [left, right]]}, params_acc}
283289
end
284290

291+
def escape({:over, _, [aggregate]}, type, params_acc, vars, env) do
292+
{aggregate, params_acc} = escape(aggregate, type, params_acc, vars, env)
293+
{{:{}, [], [:over, [], [aggregate, nil]]}, params_acc}
294+
end
295+
296+
def escape({:over, _, [aggregate, {:^, _, [window]}]}, type, params_acc, vars, env) do
297+
{aggregate, params_acc} = escape(aggregate, type, params_acc, vars, env)
298+
{{:{}, [], [:over, [], [aggregate, window]]}, params_acc}
299+
end
300+
301+
def escape({:over, _, [aggregate, {:partition_by, meta, fields}]}, type, params_acc, vars, env) do
302+
{aggregate, params_acc} = escape(aggregate, type, params_acc, vars, env)
303+
window = {:partition_by, meta, [{:vars, vars} | fields]}
304+
{{:{}, [], [:over, [], [aggregate, window]]}, params_acc}
305+
end
306+
307+
def escape({:over, _, [aggregate, {window_name, _, nil}]}, type, params_acc, vars, env) when is_atom(window_name) do
308+
{aggregate, params_acc} = escape(aggregate, type, params_acc, vars, env)
309+
{{:{}, [], [:over, [], [aggregate, window_name]]}, params_acc}
310+
end
311+
312+
def escape({:over, _, [aggregate, window_name]}, type, params_acc, vars, env) when is_atom(window_name) do
313+
{aggregate, params_acc} = escape(aggregate, type, params_acc, vars, env)
314+
{{:{}, [], [:over, [], [aggregate, window_name]]}, params_acc}
315+
end
316+
285317
def escape({:=, _, _} = expr, _type, _params_acc, _vars, _env) do
286318
error! "`#{Macro.to_string(expr)}` is not a valid query expression. " <>
287319
"The match operator is not supported: `=`. " <>

lib/ecto/query/builder/order_by.ex

+9
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,18 @@ defmodule Ecto.Query.Builder.OrderBy do
138138
params: unquote(params),
139139
file: unquote(env.file),
140140
line: unquote(env.line)}
141+
141142
Builder.apply_query(query, __MODULE__, [order_by], env)
142143
end
143144

145+
@doc """
146+
The callback applied by `build/4` to build the partition.
147+
"""
148+
@spec apply(Ecto.Query.PartitionByExpr.t, term) :: Ecto.Query.PartitionByExpr.t
149+
def apply(%Ecto.Query.PartitionByExpr{order_bys: order_bys} = partition, expr) do
150+
%{partition | order_bys: order_bys ++ [expr]}
151+
end
152+
144153
@doc """
145154
The callback applied by `build/4` to build the query.
146155
"""
+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import Kernel, except: [apply: 2]
2+
3+
defmodule Ecto.Query.Builder.PartitionBy do
4+
@moduledoc false
5+
6+
alias Ecto.Query.Builder
7+
8+
@doc false
9+
@spec build(Tuple.t, [Macro.t], Macro.Env.t) :: Macro.t
10+
def build({:vars, vars}, fields, env) when is_list(fields) do
11+
{fields, {_, :acc}} = Builder.escape(fields, :any, {%{}, :acc}, vars, env)
12+
13+
quote do
14+
%Ecto.Query.PartitionByExpr{
15+
fields: unquote(fields),
16+
file: unquote(env.file),
17+
line: unquote(env.line)}
18+
end
19+
end
20+
21+
@doc false
22+
@spec build(Tuple.t, Macro.t, Macro.Env.t) :: Macro.t
23+
def build({:vars, _} = vars, field, env) do
24+
build(vars, [field], env)
25+
end
26+
27+
@doc """
28+
Builds a partition by expression.
29+
"""
30+
@spec build([Macro.t], Macro.t, Macro.Env.t) :: Macro.t
31+
def build(binding, expr, env) do
32+
{_, vars} = Builder.escape_binding(quote(do: query), binding, env)
33+
build({:vars, vars}, expr, env)
34+
end
35+
end

lib/ecto/query/builder/window.ex

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import Kernel, except: [apply: 3]
2+
3+
defmodule Ecto.Query.Builder.Window do
4+
@moduledoc false
5+
6+
alias Ecto.Query.Builder
7+
8+
@doc """
9+
"""
10+
@spec escape(Macro.t, Keyword.t, Macro.Env.t) :: {Atom.t, Macro.t}
11+
def escape({name, _, [{:as, _, [expr]}]}, binding, env) when is_atom(name) do
12+
escape(name, expr, binding, env)
13+
end
14+
15+
@spec escape(Atom.t, Macro.t, Keyword.t, Macro.Env.t) :: {String.t, Macro.t}
16+
def escape(name, {:^, _, [expr]}, _binding, _env) do
17+
{name, expr}
18+
end
19+
20+
def escape(name, {:partition_by, meta, [fields, opts]}, binding, _env) when is_list(opts) do
21+
expr = {:partition_by, meta, [binding, fields, opts]}
22+
{name, expr}
23+
end
24+
25+
def escape(name, {:partition_by, meta, [fields]}, binding, _env) when is_list(fields) do
26+
expr = {:partition_by, meta, [binding, fields]}
27+
{name, expr}
28+
end
29+
30+
def escape(name, {:partition_by, meta, [field]}, binding, _env) do
31+
expr = {:partition_by, meta, [binding, [field]]}
32+
{name, expr}
33+
end
34+
35+
@doc """
36+
Builds a quoted expression.
37+
38+
The quoted expression should evaluate to a query at runtime.
39+
If possible, it does all calculations at compile time to avoid
40+
runtime work.
41+
"""
42+
@spec build(Macro.t, [Macro.t], Macro.t, Macro.Env.t) :: Macro.t
43+
def build(query, binding, expr, env) do
44+
{name, partition} = escape(expr, binding, env)
45+
46+
Builder.apply_query(query, __MODULE__, [partition, name], env)
47+
end
48+
49+
@doc """
50+
The callback applied by `build/4` to build the query.
51+
"""
52+
@spec apply(Ecto.Queryable.t, Macro.t, term) :: Ecto.Query.t
53+
def apply(%Ecto.Query{windows: windows} = query, expr, name) do
54+
if Keyword.has_key?(windows, name) do
55+
Builder.error! "window with name #{name} is already defined"
56+
else
57+
%{query | windows: windows ++ [{name, expr}]}
58+
end
59+
end
60+
def apply(query, expr, name) do
61+
apply(Ecto.Queryable.to_query(query), expr, name)
62+
end
63+
end

lib/ecto/query/inspect.ex

+31-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Inspect.Algebra
22
import Kernel, except: [to_string: 1]
33

4-
alias Ecto.Query.{DynamicExpr, JoinExpr, QueryExpr}
4+
alias Ecto.Query.{DynamicExpr, JoinExpr, QueryExpr, PartitionByExpr}
55

66
defimpl Inspect, for: Ecto.Query.DynamicExpr do
77
def inspect(%DynamicExpr{binding: binding} = dynamic, opts) do
@@ -52,6 +52,7 @@ defimpl Inspect, for: Ecto.Query do
5252
joins = joins(query.joins, names)
5353
preloads = preloads(query.preloads)
5454
assocs = assocs(query.assocs, names)
55+
windows = windows(query.windows, names)
5556

5657
wheres = bool_exprs(%{and: :where, or: :or_where}, query.wheres, names)
5758
group_bys = kw_exprs(:group_by, query.group_bys, names)
@@ -65,7 +66,7 @@ defimpl Inspect, for: Ecto.Query do
6566
select = kw_expr(:select, query.select, names)
6667
distinct = kw_expr(:distinct, query.distinct, names)
6768

68-
Enum.concat [from, joins, wheres, group_bys, havings, order_bys,
69+
Enum.concat [from, joins, wheres, group_bys, havings, windows, order_bys,
6970
limit, offset, lock, distinct, updates, select, preloads, assocs]
7071
end
7172

@@ -121,6 +122,30 @@ defimpl Inspect, for: Ecto.Query do
121122
end
122123
end
123124

125+
defp windows(windows, names) do
126+
Enum.map(windows, &window(&1, names))
127+
end
128+
129+
defp window({name, definition}, names) do
130+
{:window, "(#{name} as " <> partition_by(definition, names) <> ")"}
131+
end
132+
133+
defp window({name, %{ order_bys: order_bys } = definition}, names) do
134+
{:window, string} = window({name, %{ definition | order_bys: [] }}, names)
135+
[order_by: order_string] = kw_exprs(:order_by, order_bys, names)
136+
{:window, string <> " order_by " <> order_string}
137+
end
138+
139+
defp partition_by(%{ order_bys: order_bys } = definition, names) do
140+
partition = "partition_by " <> expr(definition, names)
141+
case order_bys do
142+
[] -> partition
143+
fields ->
144+
[order_by: order_string] = kw_exprs(:order_by, fields, names)
145+
partition <> ", order_by: " <> order_string
146+
end
147+
end
148+
124149
defp bool_exprs(keys, exprs, names) do
125150
Enum.map exprs, fn %{expr: expr, op: op} = part ->
126151
{Map.fetch!(keys, op), expr(expr, names, part)}
@@ -141,6 +166,10 @@ defimpl Inspect, for: Ecto.Query do
141166
kw_inspect(:as, as) ++ kw_inspect(:prefix, prefix)
142167
end
143168

169+
defp expr(%PartitionByExpr{fields: fields} = part, names) do
170+
Macro.to_string(fields, &expr_to_string(&1, &2, names, part))
171+
end
172+
144173
defp expr(%{expr: expr} = part, names) do
145174
expr(expr, names, part)
146175
end

lib/ecto/query/planner.ex

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Ecto.Query.Planner do
44

55
alias Ecto.Query.{BooleanExpr, DynamicExpr, JoinExpr, QueryExpr, SelectExpr}
66

7-
if map_size(%Ecto.Query{}) != 18 do
7+
if map_size(%Ecto.Query{}) != 19 do
88
raise "Ecto.Query match out of date in builder"
99
end
1010

@@ -1022,6 +1022,11 @@ defmodule Ecto.Query.Planner do
10221022
{type, [expr | fields], from}
10231023
end
10241024

1025+
defp collect_fields({:over, _, [call, _]} = expr, fields, from, query, take) do
1026+
{type, _, _} = collect_fields(call, fields, from, query, take)
1027+
{type, [expr | fields], from}
1028+
end
1029+
10251030
defp collect_fields({{:., _, [{:&, _, [ix]}, field]}, _, []} = expr,
10261031
fields, from, %{select: select} = query, _take) do
10271032
type = source_type!(:select, query, select, ix, field)

0 commit comments

Comments
 (0)