Merged
14 changes: 14 additions & 0 deletions README.md
@@ -22,6 +22,19 @@ Dux.from_parquet("s3://data/sales/**/*.parquet")
|> Dux.to_rows()
```

## Performance

Dux pipelines compile to SQL and execute inside DuckDB — no data crosses into Elixir until you materialise. On a 10M-row dataset (Apple M3 Max, 36GB):

| Operation | Dux | Explorer (Polars) | Ratio |
|-----------|-----|-------------------|-------|
| Filter (10M rows) | 41ms | 13ms | 3.1x |
| Mutate (10M rows) | ~40ms | ~14ms | ~3x |
| Group + Summarise | ~12ms | ~21ms | **0.6x** |
| Memory per compute | 5-10 KB | 5-10 KB | ~same |

Dux is roughly 3x slower than Polars for single-node filter and mutate, and **faster for aggregations** (DuckDB's columnar engine). Beyond a single node, Dux can distribute a query across machines, while Polars is single-node.
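
The Ratio column is Dux time divided by Explorer time, so values below 1.0 mean Dux is faster. A minimal sketch that recomputes it from the timings in the table; the `Ratios` module is illustrative only, and the Mutate inputs are the approximate values shown above:

```elixir
defmodule Ratios do
  # Timings in milliseconds as {operation, dux_ms, explorer_ms},
  # copied from the table above.
  @timings [
    {"Filter", 41, 13},
    {"Mutate", 40, 14},
    {"Group + Summarise", 12, 21}
  ]

  # Ratio = Dux time / Explorer time, rounded to two decimals.
  def compute do
    for {op, dux_ms, explorer_ms} <- @timings do
      {op, Float.round(dux_ms / explorer_ms, 2)}
    end
  end
end

IO.inspect(Ratios.compute(), label: "dux / explorer")
```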

## Design

Dux is the successor to [Explorer](https://github.com/elixir-explorer/explorer). That means it borrows its verb design from dplyr and the tidyverse — constrained, composable operations that each do one thing well. If you've used `dplyr::filter()`, `mutate()`, `group_by() |> summarise()`, the Dux API will feel familiar.
@@ -180,6 +193,7 @@ Lazy pipelines render with source provenance, operations, and generated SQL. Com
- [Transformations](https://hexdocs.pm/dux/transformations.html) — filter, mutate, window functions
- [Joins & Reshape](https://hexdocs.pm/dux/joins-and-reshape.html) — join types, ASOF joins, pivots
- [Distributed Execution](https://hexdocs.pm/dux/distributed.html) — architecture, partitioning, distributed IO
- [FLAME Clusters](https://hexdocs.pm/dux/flame-clusters.html) — ad-hoc Spark-like clusters with Fly.io
- [Graph Analytics](https://hexdocs.pm/dux/graph-analytics.html) — PageRank, shortest paths, components
- [Cheatsheet](https://hexdocs.pm/dux/cheatsheet.html) — quick reference for all verbs

28 changes: 26 additions & 2 deletions guides/cheatsheet.cheatmd
@@ -35,6 +35,7 @@ Dux.drop_secret(:s3)
### From SQL
```elixir
Dux.from_query("SELECT * FROM range(100) t(x)")
Dux.exec("SET threads = 8") # raw DDL/DML
```

## Filtering
@@ -101,6 +102,13 @@ Dux.slice(df, 5, 10) # offset 5, take 10
Dux.distinct(df) # deduplicate all columns
```

### Grouping
```elixir
Dux.group_by(df, :region) # set groups
Dux.group_by(df, [:region, :year]) # multi-column
Dux.ungroup(df) # clear groups
```

## Aggregation

### Group + Summarise
@@ -223,6 +231,17 @@ Dux.sql_preview(df) # → SQL string
Dux.sql_preview(df, pretty: true) # → formatted SQL
```

## SQL Macros

```elixir
# Reusable SQL functions — fully lazy, zero overhead
Dux.define(:double, [:x], "x * 2")
Dux.define(:risk, [:score], "CASE WHEN score > 0.8 THEN 'high' ELSE 'low' END")
Dux.define_table(:date_spine, [:s, :e], "SELECT * FROM generate_series(s::DATE, e::DATE, INTERVAL 1 DAY) t(d)")
Dux.undefine(:double)
Dux.list_macros()
```

## Distributed

### Reads
@@ -260,8 +279,13 @@ df |> Dux.distribute(workers) |> Dux.collect()

### FLAME: elastic cloud compute
```elixir
-Dux.Flame.start_pool(backend: {FLAME.FlyBackend, ...}, max: 10)
-workers = Dux.Flame.spin_up(5)
+workers = Dux.Flame.spin_up(5,
+  pool: :dux_pool,
+  memory_limit: "4GB",
+  temp_directory: "/tmp/dux_spill"
+)
+Dux.distribute(df, workers) |> Dux.compute()
+Dux.local(df) # back to single-node
```

## Graph Analytics
272 changes: 272 additions & 0 deletions guides/flame-clusters.livemd
@@ -0,0 +1,272 @@
# FLAME Clusters: Ad-Hoc Spark on the BEAM

```elixir
Mix.install([
  {:dux, github: "elixir-dux/dux", branch: "docs/guides-and-flame-cluster", override: true},
  {:kino_dux, "~> 0.1"},
  {:flame, "~> 0.5"}
])
```

## Overview

This guide walks through building an ad-hoc distributed compute cluster
using [FLAME](https://github.com/phoenixframework/flame) and
[Fly.io](https://fly.io). We'll query the
[Ookla Speedtest](https://registry.opendata.aws/speedtest-global-performance/)
open dataset — ~20GB of global internet speed measurements stored as
Parquet on S3.

Each FLAME runner boots a fresh machine with its own DuckDB, reads S3
data directly, and auto-terminates when idle. Think of it as Spark-style
elastic compute, but on the BEAM — no JVM, no YARN, no cluster manager.

**Prerequisites:**

* A Fly.io account with a `FLY_API_TOKEN`
* This notebook running on a Fly.io Livebook instance

## The Dataset

[Ookla](https://www.ookla.com/ookla-for-good/open-data) publishes
quarterly internet speed test data as open Parquet files:

```
s3://ookla-open-data/parquet/performance/
type={fixed,mobile}/
year={2019..2025}/
quarter={1..4}/
*.parquet
```

~56 files, Hive-partitioned by connection type, year, and quarter.
Each file contains millions of tile-level measurements: download/upload
speeds, latency, test counts, and geographic quadkeys.
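
The "~56 files" figure follows from the layout: 2 connection types, 7 years, and 4 quarters. A small sketch that enumerates the partition prefixes, assuming every quarter in the range is published and each partition holds one file (the `OoklaLayout` module is illustrative only):

```elixir
defmodule OoklaLayout do
  @base "s3://ookla-open-data/parquet/performance"

  # Every type/year/quarter prefix implied by the layout above.
  def partitions do
    for type <- ["fixed", "mobile"], year <- 2019..2025, quarter <- 1..4 do
      "#{@base}/type=#{type}/year=#{year}/quarter=#{quarter}/"
    end
  end
end

OoklaLayout.partitions() |> length() |> IO.inspect(label: "partitions")
```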

The data is **public — no S3 credentials needed**.

```elixir
require Dux
```

## 1. Configure Anonymous S3 Access

DuckDB reads S3 via the `httpfs` extension. For public buckets,
we create a secret with empty credentials — DuckDB makes unsigned requests.

```elixir
Dux.exec("INSTALL httpfs; LOAD httpfs")
Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
```

## 2. Explore Locally First

Before spinning up a cluster, let's look at a single quarter to
understand the data.

```elixir
one_quarter =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=fixed/year=2024/quarter=4/*.parquet",
    hive_partitioning: true
  )

one_quarter
|> Dux.head(5)
|> Dux.to_rows()
```

```elixir
# How big is one quarter?
one_quarter |> Dux.n_rows()
```

```elixir
# Speed distribution
one_quarter
|> Dux.summarise(
  median_download: median(avg_d_kbps / 1000.0),
  median_upload: median(avg_u_kbps / 1000.0),
  median_latency: median(avg_lat_ms),
  total_tests: sum(tests),
  total_devices: sum(devices)
)
|> Dux.to_rows()
```

## 3. Start the FLAME Pool

Now let's scale out. The pool configuration controls the machines FLAME boots.

```elixir
Kino.start_child!(
  {FLAME.Pool,
   name: :dux_pool,
   code_sync: [
     start_apps: true,
     sync_beams: [Path.join(System.tmp_dir!(), "livebook_runtime")]
   ],
   min: 0,
   max: 10,
   max_concurrency: 1,
   backend: {FLAME.FlyBackend,
     cpu_kind: "performance",
     cpus: 4,
     memory_mb: 8192,
     token: System.fetch_env!("FLY_API_TOKEN"),
     env: %{"LIVEBOOK_COOKIE" => Atom.to_string(Node.get_cookie())}
   },
   boot_timeout: 120_000,
   idle_shutdown_after: :timer.minutes(5)}
)
```

Key settings:

* **`max_concurrency: 1`** — one DuckDB per machine. DuckDB saturates cores internally.
* **`memory_mb: 8192`** — 8GB per worker. DuckDB spills to `/tmp` if needed.
* **`idle_shutdown_after: 5 min`** — machines auto-terminate. You pay only for active compute.
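
To get a feel for what these settings add up to, here is a rough capacity calculation using the values from the pool configuration above (`cpus: 4`, `memory_mb: 8192`, `max: 10`). The `ClusterBudget` module is illustrative only and is not part of Dux or FLAME:

```elixir
defmodule ClusterBudget do
  # Total cores and memory (in GB) for `machines` runners of a given size.
  def totals(machines, cpus, memory_mb) do
    %{cores: machines * cpus, memory_gb: div(machines * memory_mb, 1024)}
  end
end

# Three runners (the number this guide spins up) and the pool ceiling (max: 10).
IO.inspect(ClusterBudget.totals(3, 4, 8192), label: "3 workers")
IO.inspect(ClusterBudget.totals(10, 4, 8192), label: "pool max")
```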

## 4. Spin Up Workers

```elixir
# Start with 3 workers — each takes ~30s to boot (driver download + setup).
# Scale up with more if needed.
workers = Dux.Flame.spin_up(3,
  pool: :dux_pool,
  memory_limit: "4GB",
  setup: fn ->
    # Each worker needs httpfs + anonymous S3 access
    Dux.exec("INSTALL httpfs; LOAD httpfs")
    Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "", region: "us-west-2")
  end
)

IO.puts("#{length(workers)} workers ready")
```

## 5. Query the Full Dataset

Now read **all years of fixed broadband data** across the cluster.
Each worker reads its assigned Parquet files directly from S3 —
no data flows through your machine.
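
How files are assigned to workers is not specified in this guide. As a mental model only, a round-robin split might look like the sketch below; the `AssignSketch` module and the file names are hypothetical, and Dux's actual partitioning strategy may differ:

```elixir
defmodule AssignSketch do
  # Round-robin: the file at index i goes to worker rem(i, n).
  # Hypothetical strategy, for illustration only.
  def round_robin(files, workers) do
    n = length(workers)

    files
    |> Enum.with_index()
    |> Enum.group_by(
      fn {_file, i} -> Enum.at(workers, rem(i, n)) end,
      fn {file, _i} -> file end
    )
  end
end

# Made-up file names standing in for the Parquet files matched by the glob.
files = for q <- 1..4, do: "year=2024/quarter=#{q}/data.parquet"
IO.inspect(AssignSketch.round_robin(files, [:w1, :w2, :w3]))
```

With four files and three workers, one worker receives two files and the others one each, so the slowest worker bounds the query time.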

```elixir
all_fixed =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=fixed/year=*/quarter=*/*.parquet",
    hive_partitioning: true
  )

# Global broadband trends by year
trends =
  all_fixed
  |> Dux.distribute(workers)
  |> Dux.group_by(:year)
  |> Dux.summarise(
    median_download: median(avg_d_kbps / 1000.0),
    median_upload: median(avg_u_kbps / 1000.0),
    median_latency: median(avg_lat_ms),
    total_tests: sum(tests),
    total_devices: sum(devices)
  )
  |> Dux.sort_by(:year)
  |> Dux.collect()
  |> Dux.to_rows()
```

## 6. Compare Fixed vs Mobile

Query both connection types in one pipeline using SQL macros.

```elixir
Dux.define(:speed_tier, [:mbps], """
  CASE
    WHEN mbps >= 100 THEN 'fast (100+ Mbps)'
    WHEN mbps >= 25 THEN 'moderate (25-100 Mbps)'
    WHEN mbps >= 10 THEN 'slow (10-25 Mbps)'
    ELSE 'very slow (<10 Mbps)'
  END
""")

all_data =
  Dux.from_parquet(
    "s3://ookla-open-data/parquet/performance/type=*/year=2024/quarter=*/*.parquet",
    hive_partitioning: true
  )

speed_distribution =
  all_data
  |> Dux.distribute(workers)
  |> Dux.mutate_with(tier: "speed_tier(avg_d_kbps / 1000.0)")
  |> Dux.group_by([:type, :tier])
  |> Dux.summarise(
    tiles: count(tier),
    total_tests: sum(tests)
  )
  |> Dux.sort_by([:type, desc: :tiles])
  |> Dux.collect()
  |> Dux.to_rows()
```
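
The tier boundaries in the `speed_tier` macro can be sanity-checked in plain Elixir before running them on the cluster. This sketch mirrors the CASE expression for numeric input only; the `SpeedTier` module is illustrative, and unlike SQL it does not handle NULL (a NULL speed falls through to the ELSE branch in the macro):

```elixir
defmodule SpeedTier do
  # Mirrors the CASE expression in the speed_tier macro; input is in Mbps.
  def tier(mbps) when is_number(mbps) and mbps >= 100, do: "fast (100+ Mbps)"
  def tier(mbps) when is_number(mbps) and mbps >= 25, do: "moderate (25-100 Mbps)"
  def tier(mbps) when is_number(mbps) and mbps >= 10, do: "slow (10-25 Mbps)"
  def tier(mbps) when is_number(mbps), do: "very slow (<10 Mbps)"

  # The dataset stores kbps, so divide by 1000.0 as the pipeline does.
  def from_kbps(kbps), do: tier(kbps / 1000.0)
end

# Check the values on either side of each boundary.
for kbps <- [100_000, 99_999, 25_000, 24_999, 10_000, 9_999] do
  IO.puts("#{kbps} kbps -> #{SpeedTier.from_kbps(kbps)}")
end
```

Each boundary value belongs to the faster tier because the comparisons use `>=`.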

## 7. Heavy Aggregation: Latency by Quadkey Prefix

Quadkeys encode geographic tiles. The first few characters identify
the region. Let's find the areas with the worst latency.
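
To see why a prefix identifies a region, the sketch below decodes a quadkey into tile coordinates, assuming the standard Bing Maps quadkey encoding (digits 0 to 3, one per zoom level). Under that encoding a 6-character prefix names the zoom-6 tile that contains the full tile. The `Quadkey` module is illustrative only and is not part of Dux:

```elixir
defmodule Quadkey do
  import Bitwise

  # Equivalent of SQL LEFT(quadkey, len) for ASCII digit strings.
  def region(quadkey, len \\ 6), do: String.slice(quadkey, 0, len)

  # Decode a quadkey into {tile_x, tile_y, zoom}.
  # Each digit contributes one bit to x (value 1) and one bit to y (value 2).
  # Raises on characters other than 0..3.
  def to_tile(quadkey) do
    digits = String.to_charlist(quadkey)
    zoom = length(digits)

    {x, y} =
      digits
      |> Enum.with_index()
      |> Enum.reduce({0, 0}, fn {digit, i}, {x, y} ->
        mask = 1 <<< (zoom - i - 1)

        case digit do
          ?0 -> {x, y}
          ?1 -> {x ||| mask, y}
          ?2 -> {x, y ||| mask}
          ?3 -> {x ||| mask, y ||| mask}
        end
      end)

    {x, y, zoom}
  end
end

IO.inspect(Quadkey.region("0231012301"))
IO.inspect(Quadkey.to_tile("213"))
```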

```elixir
worst_latency =
  all_fixed
  |> Dux.distribute(workers)
  |> Dux.filter(tests >= 10)
  |> Dux.mutate_with(region: "LEFT(quadkey, 6)")
  |> Dux.group_by(:region)
  |> Dux.summarise(
    avg_latency: avg(avg_lat_ms),
    total_tests: sum(tests),
    n_tiles: count(region)
  )
  |> Dux.filter(total_tests > 1000)
  |> Dux.sort_by(desc: :avg_latency)
  |> Dux.head(20)
  |> Dux.collect()
  |> Dux.to_rows()
```

## 8. Writing Results

Distributed writes go directly from workers to S3.

```elixir
# Write the fixed-broadband data, with a derived download_mbps column,
# to your own bucket, partitioned by year (uncomment and set your bucket)

# all_fixed
# |> Dux.distribute(workers)
# |> Dux.mutate(download_mbps: avg_d_kbps / 1000.0)
# |> Dux.to_parquet("s3://your-bucket/ookla-processed/", partition_by: [:year])
```

## 9. Cleanup

Workers auto-terminate after the idle timeout. To shut down immediately:

```elixir
Enum.each(workers, &GenServer.stop/1)
IO.puts("Workers stopped. FLAME runners will terminate shortly.")
```

## What Just Happened

You built a 3-machine compute cluster from a Livebook notebook.
Each machine:

1. Booted in ~30s via FLAME + Fly.io
2. Got a full copy of your notebook's compiled code
3. Started its own DuckDB on a machine with 4 cores and 8GB RAM (DuckDB itself capped at 4GB by `memory_limit`)
4. Read its assigned Parquet files directly from S3
5. Executed filter + group + aggregate locally
6. Sent small aggregated results back to the coordinator
7. Auto-terminated after 5 minutes idle
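
Steps 5 and 6 work because many aggregates can be computed per worker and then combined. The sketch below shows the general technique for an average, where each worker returns a `{sum, count}` pair per group. It is an illustration only, not Dux's actual merge logic; the `PartialAgg` module and the sample numbers are made up, and aggregates such as medians cannot be merged this simply:

```elixir
defmodule PartialAgg do
  # Each worker returns %{group => {sum, count}}.
  # Sums and counts merge by addition.
  def merge(partials) do
    Enum.reduce(partials, %{}, fn partial, acc ->
      Map.merge(acc, partial, fn _group, {s1, c1}, {s2, c2} ->
        {s1 + s2, c1 + c2}
      end)
    end)
  end

  # Final average per group, computed once on the coordinator.
  def averages(merged) do
    Map.new(merged, fn {group, {sum, count}} -> {group, sum / count} end)
  end
end

# Made-up partial results from two workers, keyed by year.
worker_a = %{2023 => {10, 2}}
worker_b = %{2023 => {20, 3}, 2024 => {8, 4}}

[worker_a, worker_b]
|> PartialAgg.merge()
|> PartialAgg.averages()
|> IO.inspect()
```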

No infrastructure to manage. No cluster to maintain. Just notebooks and queries.
23 changes: 19 additions & 4 deletions lib/dux.ex
@@ -1740,13 +1740,28 @@ defmodule Dux do
...> |> Dux.n_rows()
42
"""
-  def n_rows(%Dux{} = dux) do
-    computed = compute(dux)
-    {:table, ref} = computed.source
-    conn = computed.conn || Dux.Connection.get_conn()
+  def n_rows(%Dux{source: {:table, ref}, ops: []} = dux) do
+    # Already materialized with no ops: count the table directly
+    conn = dux.conn || Dux.Connection.get_conn()
     Dux.Backend.table_n_rows(conn, ref)
   end
+
+  def n_rows(%Dux{} = dux) do
+    # Compile the pipeline to SQL and wrap it in COUNT(*) so DuckDB
+    # can count without materializing all rows. For a plain Parquet
+    # scan, DuckDB can often answer from file metadata instead of scanning.
+    conn = dux.conn || Dux.Connection.get_conn()
+    {sql, source_setup} = Dux.QueryBuilder.build(dux, conn)
+
+    Enum.each(source_setup, fn setup_sql ->
+      Dux.Backend.execute(conn, setup_sql)
+    end)
+
+    result = Adbc.Connection.query!(conn, "SELECT count(*) AS n FROM (#{sql}) __cnt")
+    %{"n" => [n]} = Adbc.Result.to_map(result)
+    Dux.Backend.normalize_count(n)
+  end

# ---------------------------------------------------------------------------
# Nx interop
# ---------------------------------------------------------------------------
3 changes: 3 additions & 0 deletions lib/dux/backend.ex
@@ -87,6 +87,9 @@ defmodule Dux.Backend do
# Metadata
# ---------------------------------------------------------------------------

@doc false
def normalize_count(n), do: normalize_value(n)

@doc false
def table_names(conn, %TableRef{name: name}) do
{names, _types} = describe_table(conn, name)