Skip to content

Commit eb00b50

Browse files
committed
Implementation of over, partition by and window
1 parent 32e1a18 commit eb00b50

File tree

14 files changed

+877
-12
lines changed

14 files changed

+877
-12
lines changed

integration_test/cases/windows.exs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
defmodule Ecto.Integration.WindowsTest do
2+
use Ecto.Integration.Case, async: Application.get_env(:ecto, :async_integration_tests, true)
3+
4+
alias Ecto.Integration.TestRepo
5+
import Ecto.Query
6+
7+
alias Ecto.Integration.Comment
8+
alias Ecto.Integration.User
9+
10+
test "count over partition" do
11+
u1 = TestRepo.insert!(%User{name: "Tester"})
12+
u2 = TestRepo.insert!(%User{name: "Developer"})
13+
c1 = TestRepo.insert!(%Comment{text: "1", author_id: u1.id})
14+
c2 = TestRepo.insert!(%Comment{text: "2", author_id: u1.id})
15+
c3 = TestRepo.insert!(%Comment{text: "3", author_id: u1.id})
16+
c4 = TestRepo.insert!(%Comment{text: "4", author_id: u2.id})
17+
18+
query = from(c in Comment, select: [c, count(c.id) |> over(partition_by(c.author_id))])
19+
20+
assert [[^c1, 3], [^c2, 3], [^c3, 3], [^c4, 1]] = TestRepo.all(query)
21+
end
22+
23+
test "last 2 of each author" do
24+
u1 = TestRepo.insert!(%User{name: "Tester"})
25+
u2 = TestRepo.insert!(%User{name: "Developer"})
26+
TestRepo.insert!(%Comment{text: "1", author_id: u1.id})
27+
TestRepo.insert!(%Comment{text: "2", author_id: u1.id})
28+
TestRepo.insert!(%Comment{text: "3", author_id: u1.id})
29+
TestRepo.insert!(%Comment{text: "4", author_id: u2.id})
30+
31+
subquery = from(c in Comment,
32+
windows: [rw: partition_by(c.author_id, order_by: :id)],
33+
select: %{
34+
comment: c.text,
35+
row: row_number() |> over(:rw),
36+
total: count(c.id) |> over(partition_by(c.author_id))
37+
},
38+
where: c.author_id in [^u1.id, ^u2.id]
39+
)
40+
41+
query = from(r in subquery(subquery),
42+
select: r.comment,
43+
where: (r.total - r.row) < 2
44+
)
45+
46+
assert ["2", "3", "4"] = TestRepo.all(query)
47+
end
48+
end

lib/ecto/adapters/mysql/connection.ex

+42-5
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,13 @@ if Code.ensure_loaded?(Mariaex) do
8282
where = where(query, sources)
8383
group_by = group_by(query, sources)
8484
having = having(query, sources)
85+
window = window(query, sources)
8586
order_by = order_by(query, sources)
8687
limit = limit(query, sources)
8788
offset = offset(query, sources)
8889
lock = lock(query.lock)
8990

90-
[select, from, join, where, group_by, having, order_by, limit, offset | lock]
91+
[select, from, join, where, group_by, having, window, order_by, limit, offset | lock]
9192
end
9293

9394
def update_all(query, prefix \\ nil) do
@@ -320,13 +321,34 @@ if Code.ensure_loaded?(Mariaex) do
320321
end)]
321322
end
322323

324+
defp window(%Query{windows: []}, _sources), do: []
325+
defp window(%Query{windows: windows} = query, sources) do
326+
[" WINDOW " |
327+
intersperse_map(windows, ", ", fn
328+
{name, definition} ->
329+
[quote_name(name), " AS ", partition_by(definition, sources, query)] end)]
330+
end
331+
332+
defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do
333+
fields = Keyword.get(opts, :fields)
334+
order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat
335+
fields = fields |> intersperse_map(", ", &expr(&1, sources, query))
336+
["(PARTITION BY ",
337+
fields,
338+
order_by(order_bys, query, sources),
339+
?)]
340+
end
341+
323342
defp order_by(%Query{order_bys: []}, _sources), do: []
324343
defp order_by(%Query{order_bys: order_bys} = query, sources) do
344+
order_bys = Enum.flat_map(order_bys, & &1.expr)
345+
order_by(order_bys, query, sources)
346+
end
347+
348+
defp order_by([], _query, _sources), do: []
349+
defp order_by(order_bys, query, sources) do
325350
[" ORDER BY " |
326-
intersperse_map(order_bys, ", ", fn
327-
%QueryExpr{expr: expr} ->
328-
intersperse_map(expr, ", ", &order_by_expr(&1, sources, query))
329-
end)]
351+
intersperse_map(order_bys, ", ", &order_by_expr(&1, sources, query))]
330352
end
331353

332354
defp order_by_expr({dir, expr}, sources, query) do
@@ -458,6 +480,21 @@ if Code.ensure_loaded?(Mariaex) do
458480
error!(query, "ilike is not supported by MySQL")
459481
end
460482

483+
defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do
484+
aggregate = expr(agg, sources, query)
485+
[aggregate, " OVER ", partition_by(window, sources, query)]
486+
end
487+
488+
defp expr({:over, _, [agg, nil]}, sources, query) do
489+
aggregate = expr(agg, sources, query)
490+
[aggregate, " OVER ()"]
491+
end
492+
493+
defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
494+
aggregate = expr(agg, sources, query)
495+
[aggregate, " OVER ", quote_name(name)]
496+
end
497+
461498
defp expr({:{}, _, elems}, sources, query) do
462499
[?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
463500
end

lib/ecto/adapters/postgres/connection.ex

+40-1
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,13 @@ if Code.ensure_loaded?(Postgrex) do
121121
where = where(query, sources)
122122
group_by = group_by(query, sources)
123123
having = having(query, sources)
124+
window = window(query, sources)
124125
order_by = order_by(query, order_by_distinct, sources)
125126
limit = limit(query, sources)
126127
offset = offset(query, sources)
127128
lock = lock(query.lock)
128129

129-
[select, from, join, where, group_by, having, order_by, limit, offset | lock]
130+
[select, from, join, where, group_by, having, window, order_by, limit, offset | lock]
130131
end
131132

132133
def update_all(%{from: %{source: source}} = query, prefix \\ nil) do
@@ -380,9 +381,32 @@ if Code.ensure_loaded?(Postgrex) do
380381
end)]
381382
end
382383

384+
defp window(%Query{windows: []}, _sources), do: []
385+
defp window(%Query{windows: windows} = query, sources) do
386+
[" WINDOW " |
387+
intersperse_map(windows, ", ", fn
388+
{name, definition} ->
389+
[quote_name(name), " AS ", partition_by(definition, sources, query)] end)]
390+
end
391+
392+
defp partition_by(%QueryExpr{expr: opts}, sources, %Query{} = query) do
393+
fields = Keyword.get(opts, :fields)
394+
order_bys = Keyword.get_values(opts, :order_by) |> Enum.concat
395+
fields = fields |> intersperse_map(", ", &expr(&1, sources, query))
396+
["(PARTITION BY ",
397+
fields,
398+
order_by(order_bys, query, [], sources),
399+
?)]
400+
end
401+
383402
defp order_by(%Query{order_bys: []}, _distinct, _sources), do: []
384403
defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do
385404
order_bys = Enum.flat_map(order_bys, & &1.expr)
405+
order_by(order_bys, query, distinct, sources)
406+
end
407+
408+
defp order_by([], _query, _distinct, _sources), do: []
409+
defp order_by(order_bys, query, distinct, sources) do
386410
[" ORDER BY " |
387411
intersperse_map(distinct ++ order_bys, ", ", &order_by_expr(&1, sources, query))]
388412
end
@@ -509,6 +533,21 @@ if Code.ensure_loaded?(Postgrex) do
509533
[aggregate, " FILTER (WHERE ", expr(filter, sources, query), ?)]
510534
end
511535

536+
defp expr({:over, _, [agg, %QueryExpr{} = window]}, sources, query) do
537+
aggregate = expr(agg, sources, query)
538+
[aggregate, " OVER ", partition_by(window, sources, query)]
539+
end
540+
541+
defp expr({:over, _, [agg, nil]}, sources, query) do
542+
aggregate = expr(agg, sources, query)
543+
[aggregate, " OVER ()"]
544+
end
545+
546+
defp expr({:over, _, [agg, name]}, sources, query) when is_atom(name) do
547+
aggregate = expr(agg, sources, query)
548+
[aggregate, " OVER ", quote_name(name)]
549+
end
550+
512551
defp expr({:{}, _, elems}, sources, query) do
513552
[?(, intersperse_map(elems, ?,, &expr(&1, sources, query)), ?)]
514553
end

lib/ecto/query.ex

+23-3
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ defmodule Ecto.Query do
354354

355355
defstruct [prefix: nil, sources: nil, from: nil, joins: [], aliases: %{}, wheres: [], select: nil,
356356
order_bys: [], limit: nil, offset: nil, group_bys: [], updates: [],
357-
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil]
357+
havings: [], preloads: [], assocs: [], distinct: nil, lock: nil, windows: []]
358358

359359
defmodule FromExpr do
360360
@moduledoc false
@@ -398,7 +398,7 @@ defmodule Ecto.Query do
398398
@opaque dynamic :: %DynamicExpr{}
399399

400400
alias Ecto.Query.Builder
401-
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join,
401+
alias Ecto.Query.Builder.{Distinct, Dynamic, Filter, From, GroupBy, Join, Windows,
402402
LimitOffset, Lock, OrderBy, Preload, Select, Update}
403403

404404
@doc """
@@ -464,6 +464,26 @@ defmodule Ecto.Query do
464464
Dynamic.build(binding, expr, __CALLER__)
465465
end
466466

467+
@doc """
468+
Defines windows which can be used with `Ecto.Query.API.over/2`.
469+
470+
Receives a keyword list where keys are names of the windows
471+
and values are `Ecto.Query.API.partition_by/2` expression.
472+
473+
## Examples
474+
475+
# Compare each employee's salary with the average salary in his or her department
476+
from e in Employee,
477+
select: {e.depname, e.empno, e.salary, avg(e.salary) |> over(:department)},
478+
windows: [department: partition_by(e.depname)]
479+
480+
Note: MySQL older than 8.0 doesn't support window functions,
481+
so you can use it only with MySQL newer than 8.0 or with any version of PostgreSQL.
482+
"""
483+
defmacro windows(query, binding \\ [], expr) do
484+
Windows.build(query, binding, expr, __CALLER__)
485+
end
486+
467487
@doc """
468488
Converts a query into a subquery.
469489
@@ -648,7 +668,7 @@ defmodule Ecto.Query do
648668

649669
@from_join_opts [:as, :prefix, :hints]
650670
@no_binds [:lock]
651-
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by] ++
671+
@binds [:where, :or_where, :select, :distinct, :order_by, :group_by, :windows] ++
652672
[:having, :or_having, :limit, :offset, :preload, :update, :select_merge]
653673

654674
defp from([{type, expr}|t], env, count_bind, quoted, binds) when type in @binds do

0 commit comments

Comments
 (0)