Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 150 additions & 1 deletion lib/dux/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,83 @@ defmodule Dux.Query do
For the full list, see the
[DuckDB Functions reference](https://duckdb.org/docs/sql/functions/overview).

For anything the macro doesn't support (window functions, subqueries),
## Window functions

Use `over/2` inside `mutate` to apply window functions. Each column can
have its own window specification:

# Ranking within groups
Dux.mutate(df,
rank: over(row_number(), partition_by: :dept, order_by: [desc: :salary])
)

# Running totals
Dux.mutate(df, running: over(sum(amount), order_by: :date))

# Percentage of group
Dux.mutate(df, pct: salary * 100.0 / over(sum(salary), partition_by: :dept))

# Lag / lead
Dux.mutate(df,
prev: over(lag(amount, 1), order_by: :date),
next: over(lead(amount, 1), order_by: :date)
)

### `over/2` options

* `:partition_by` — column or list of columns to partition by
* `:order_by` — column, or keyword list with `:asc`/`:desc` directions
* `:frame` — frame specification (see below)

### Frame specifications

Frames control which rows are included in the window. Pass a tuple of
`{type, start, end}` where:

* **type** — `:rows`, `:range`, or `:groups`
* **start** — negative integer (N PRECEDING), `:unbounded` (UNBOUNDED PRECEDING),
`:current` or `0` (CURRENT ROW)
* **end** — positive integer (N FOLLOWING), `:unbounded` (UNBOUNDED FOLLOWING),
`:current` or `0` (CURRENT ROW)

Common patterns:

# 3-row moving average (2 preceding + current)
over(avg(x), order_by: :date, frame: {:rows, -2, :current})

# Cumulative sum (all preceding rows)
over(sum(x), order_by: :date, frame: {:rows, :unbounded, :current})

# Centered 5-row window (2 preceding + current + 2 following)
over(avg(x), order_by: :date, frame: {:rows, -2, 2})

# All rows in the partition
over(sum(x), frame: {:rows, :unbounded, :unbounded})

# Range-based window (value range, not row count)
over(sum(x), order_by: :date, frame: {:range, :unbounded, :current})

Add an `exclude:` option for EXCLUDE clauses:

# Exclude current row from the window
over(avg(x), order_by: :val, frame: {:rows, -2, 2, exclude: :current})

# Exclude tied rows
over(avg(x), order_by: :val, frame: {:rows, :unbounded, :unbounded, exclude: :ties})

Exclude options: `:current`, `:group`, `:ties`, `:no_others`.

Raw SQL strings still work as a fallback for complex frames:

over(sum(x), frame: "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING")

### Bare window

`over/1` with no options applies the function over the entire result set:

Dux.mutate(df, total: over(sum(amount)))

For anything the macro doesn't support (subqueries, etc.),
use the `_with` variants (`mutate_with/2`, `filter_with/2`) which accept
raw DuckDB SQL strings.

Expand Down Expand Up @@ -267,6 +343,23 @@ defmodule Dux.Query do
{{:concat, l_ast, r_ast}, pins}
end

# over(expr, opts) — window function: expr OVER (PARTITION BY ... ORDER BY ...)
defp traverse({:over, _meta, [expr, opts]}, pins) when is_list(opts) do
{expr_ast, pins} = traverse(expr, pins)
{partition_asts, pins} = traverse_partition_by(Keyword.get(opts, :partition_by), pins)
{order_asts, pins} = traverse_order_by(Keyword.get(opts, :order_by), pins)
# Resolve frame at macro expansion time — it's always a literal (tuple, string, or nil).
# Elixir AST represents 3+ element tuples as {:{}, meta, elements}, so we unwrap.
frame = opts |> Keyword.get(:frame) |> resolve_frame_ast()
{{:over, expr_ast, partition_asts, order_asts, frame}, pins}
end

# over(expr) — bare window (OVER ())
defp traverse({:over, _meta, [expr]}, pins) do
{expr_ast, pins} = traverse(expr, pins)
{{:over, expr_ast, [], [], nil}, pins}
end

# Function calls — aggregations and other functions
defp traverse({func, _meta, args}, pins) when is_atom(func) and is_list(args) do
{arg_asts, pins} =
Expand Down Expand Up @@ -323,4 +416,60 @@ defmodule Dux.Query do
defp translate_op(:/), do: :div
defp translate_op(:and), do: :and
defp translate_op(:or), do: :or

# --- Window frame AST resolution ---
# At macro expansion time, tuples with 3+ elements are represented as
# {:{}, meta, elements} in the AST. We unwrap them to plain tuples
# so the compiler receives runtime-like values.

defp resolve_frame_ast(nil), do: nil
defp resolve_frame_ast(s) when is_binary(s), do: s
defp resolve_frame_ast({a, b}), do: {a, b}

defp resolve_frame_ast({:{}, _meta, elements}) do
elements |> Enum.map(&resolve_frame_element/1) |> List.to_tuple()
end

defp resolve_frame_ast(other), do: other

# Negative literals are {:-, meta, [n]} in the AST
defp resolve_frame_element({:-, _meta, [n]}) when is_integer(n), do: -n
defp resolve_frame_element(other), do: other

# --- Window function helpers ---

defp traverse_partition_by(nil, pins), do: {[], pins}

defp traverse_partition_by(cols, pins) when is_list(cols),
do: Enum.map_reduce(cols, pins, &traverse/2)

defp traverse_partition_by(col, pins) do
{ast, pins} = traverse(col, pins)
{[ast], pins}
end

defp traverse_order_by(nil, pins), do: {[], pins}

defp traverse_order_by(specs, pins) when is_list(specs),
do: Enum.map_reduce(specs, pins, &traverse_order_spec/2)

defp traverse_order_by(col, pins) do
{ast, pins} = traverse(col, pins)
{[{:asc, ast}], pins}
end

defp traverse_order_spec({:asc, col}, pins) do
{ast, pins} = traverse(col, pins)
{{:asc, ast}, pins}
end

defp traverse_order_spec({:desc, col}, pins) do
{ast, pins} = traverse(col, pins)
{{:desc, ast}, pins}
end

defp traverse_order_spec(col, pins) do
{ast, pins} = traverse(col, pins)
{{:asc, ast}, pins}
end
end
79 changes: 79 additions & 0 deletions lib/dux/query/compiler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,22 @@ defmodule Dux.Query.Compiler do
{"#{sql_name}(#{Enum.join(arg_sqls, ", ")})", all_params, idx}
end

# --- Window functions: OVER (PARTITION BY ... ORDER BY ...) ---

defp compile({:over, expr, partition_by, order_by, frame}, pins, idx) do
{expr_sql, expr_params, idx} = compile(expr, pins, idx)
{partition_clause, partition_params, idx} = compile_partition_by(partition_by, pins, idx)
{order_clause, order_params, idx} = compile_order_by(order_by, pins, idx)
frame_clause = compile_frame(frame)

window_parts =
[partition_clause, order_clause, frame_clause]
|> Enum.reject(&(&1 == ""))
|> Enum.join(" ")

{"#{expr_sql} OVER (#{window_parts})", expr_params ++ partition_params ++ order_params, idx}
end

# --- CASE WHEN ---

defp compile({:case_when, pairs, else_expr}, pins, idx) do
Expand Down Expand Up @@ -202,6 +218,69 @@ defmodule Dux.Query.Compiler do
{"#{sql} DESC", params, idx}
end

# --- Frame clause compilation ---

defp compile_frame(nil), do: ""
defp compile_frame(s) when is_binary(s), do: s

defp compile_frame({type, start_bound, end_bound}) when type in [:rows, :range, :groups] do
type_str = type |> to_string() |> String.upcase()
"#{type_str} BETWEEN #{frame_bound(start_bound, :start)} AND #{frame_bound(end_bound, :end)}"
end

defp compile_frame({type, start_bound, end_bound, opts})
when type in [:rows, :range, :groups] and is_list(opts) do
base = compile_frame({type, start_bound, end_bound})

case Keyword.get(opts, :exclude) do
nil -> base
:current -> "#{base} EXCLUDE CURRENT ROW"
:group -> "#{base} EXCLUDE GROUP"
:ties -> "#{base} EXCLUDE TIES"
:no_others -> "#{base} EXCLUDE NO OTHERS"
end
end

# Frame bounds: negative = PRECEDING, positive = FOLLOWING, 0 = CURRENT ROW.
# :unbounded in start position = UNBOUNDED PRECEDING
# :unbounded in end position = UNBOUNDED FOLLOWING
# We compile start and end separately to handle :unbounded direction.
defp frame_bound(n, _position) when is_integer(n) and n < 0, do: "#{abs(n)} PRECEDING"
defp frame_bound(n, _position) when is_integer(n) and n > 0, do: "#{n} FOLLOWING"
defp frame_bound(0, _position), do: "CURRENT ROW"
defp frame_bound(:current, _position), do: "CURRENT ROW"
defp frame_bound(:unbounded, :start), do: "UNBOUNDED PRECEDING"
defp frame_bound(:unbounded, :end), do: "UNBOUNDED FOLLOWING"

# --- Window function helpers ---

defp compile_partition_by([], _pins, idx), do: {"", [], idx}

defp compile_partition_by(cols, pins, idx) do
{col_sqls, all_params, idx} = compile_list(cols, pins, idx)
{"PARTITION BY #{Enum.join(col_sqls, ", ")}", all_params, idx}
end

defp compile_order_by([], _pins, idx), do: {"", [], idx}

defp compile_order_by(specs, pins, idx) do
{spec_sqls, all_params, idx} =
Enum.reduce(specs, {[], [], idx}, fn {dir, col_ast}, {sqls, params, idx} ->
{sql, new_params, idx} = compile(col_ast, pins, idx)
dir_str = if dir == :desc, do: "DESC", else: "ASC"
{sqls ++ ["#{sql} #{dir_str}"], params ++ new_params, idx}
end)

{"ORDER BY #{Enum.join(spec_sqls, ", ")}", all_params, idx}
end

defp compile_list(items, pins, idx) do
Enum.reduce(items, {[], [], idx}, fn item, {sqls, params, idx} ->
{sql, new_params, idx} = compile(item, pins, idx)
{sqls ++ [sql], params ++ new_params, idx}
end)
end

# --- Helpers ---

defp quote_ident(name) do
Expand Down
Loading
Loading