docs: FLAME cluster guide + fixes for distributed AVG rewrite, macro replay, n_rows #47
Merged
New guide: flame-clusters.livemd
- Full walkthrough from zero to 5-machine cluster with FLAME + Fly.io
- Uses Ookla Speedtest open dataset (~20GB public Parquet on S3)
- Covers: anonymous S3 access, FLAME pool config, spin_up with memory limits and setup callbacks, distributed queries, joins, SQL macros on workers, distributed writes, monitoring, cleanup
- Runnable as a Livebook on Fly.io

Cheatsheet updates:
- Added SQL macros section (define, define_table, undefine, list_macros)
- Added grouping section (group_by, ungroup)
- Added exec/1 to SQL section
- Updated FLAME section with memory_limit, temp_directory, local/1

README:
- Added performance section with Dux vs Explorer (Polars) benchmarks
- Added FLAME clusters guide to guides list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
n_rows optimization:
- For pipelines with pending ops, compile to SQL and wrap in SELECT count(*) instead of materializing the full result set. DuckDB can push down the count and use Parquet metadata.
- For already-materialized tables, count directly (no change).

FLAME guide:
- Add `require Dux` cell (needed for macro expressions like sum(), median(), etc. to compile)
- The missing require was causing "undefined function" errors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
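The pending-ops branch can be pictured as a small sketch. This is not Dux's actual implementation: `pending_ops?/1`, `to_sql/1`, `query!/1`, and the `table_name` field are hypothetical helpers standing in for the real internals.

```elixir
# Illustrative sketch of the n_rows rewrite; helper names are hypothetical.
def n_rows(df) do
  if pending_ops?(df) do
    # Compile the lazy pipeline to SQL and wrap it in a count. DuckDB can
    # push the count down and answer from Parquet row-group metadata
    # without materializing the full result set.
    [[count]] = query!("SELECT count(*) FROM (#{to_sql(df)}) AS t")
    count
  else
    # Already materialized: count the backing table directly (unchanged).
    [[count]] = query!("SELECT count(*) FROM #{df.table_name}")
    count
  end
end
```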
create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "") works for public buckets. Cleaner than three SET calls and consistent with how authenticated secrets are created. Also consolidate require Dux into a single early cell. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
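For reference, the call from the commit, with the `SET` statements it presumably replaces shown as comments (the exact DuckDB option names on the old path are an assumption):

```elixir
# Anonymous access to a public S3 bucket: empty credentials, config provider.
Dux.create_secret(:ookla, type: :s3, provider: :config, key_id: "", secret: "")

# Replaces three separate SET statements along the lines of:
#   SET s3_access_key_id = '';
#   SET s3_secret_access_key = '';
#   SET s3_region = '...';
```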
sort_by on Hive partition columns during distributed execution can fail when the coordinator merge doesn't preserve all column names. Moving sort after collect (where data is local) avoids the issue. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
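A sketch of the workaround, using pipeline functions named in the cheatsheet; the dataset, column, and `count()` aggregate are illustrative, not from the guide:

```elixir
# Fragile: sort_by on a Hive partition column (e.g. :quarter) runs during
# distributed execution, where the coordinator merge may not preserve it:
#   speedtests |> Dux.group_by(:quarter) |> Dux.summarise(...) |> Dux.sort_by(:quarter)

# Robust: collect first so the sort runs on local, fully merged data.
speedtests
|> Dux.group_by(:quarter)
|> Dux.summarise(n: count())
|> Dux.collect()
|> Dux.sort_by(:quarter)
```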
SQL macros defined via Dux.define/3 were stored in :persistent_term which is node-local. Workers on remote BEAM nodes (FLAME runners) couldn't see them, causing "undefined function" errors for macros in distributed pipelines. Fix: QueryBuilder.build/2 now prepends macro setup SQLs to the source_setup list. Workers execute source_setup before the main query, so macros are available regardless of which node the worker runs on. CREATE OR REPLACE is idempotent — safe to replay on every build. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Each FLAME runner takes ~30s to boot (DuckDB driver download + setup). 5 sequential boots on a small coordinator is 2.5+ minutes. 3 workers is more practical for getting started. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
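The trade-off shows up directly in the pool config. A sketch using FLAME's documented `FLAME.Pool` and `FLAME.FlyBackend` options; the pool name and Fly machine size are placeholders:

```elixir
# Pool sizing: boots are sequential on a small coordinator, so max matters.
{FLAME.Pool,
 name: MyApp.DuxPool,
 min: 0,
 max: 3,                 # 3 × ~30s boots ≈ 1.5 min, vs 2.5+ min for 5
 boot_timeout: 120_000,  # allow time for the DuckDB driver download + setup
 backend: {FLAME.FlyBackend, memory_mb: 8192, cpus: 4}}
```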
The previous fix attempted to send macros through QueryBuilder.build, but build runs on the worker node where persistent_term is empty. New approach: the coordinator reads macro SQLs from its local persistent_term and sends them to each worker via execute_sqls/2 before the fan-out. execute_sqls runs the SQL directly on the worker's private DuckDB connection (GenServer state), not the app-level Dux.Connection. This ensures macros defined on the coordinator are available on remote FLAME workers regardless of which node they run on. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
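The coordinator-side replay roughly looks like this. `Worker.execute_sqls/2` is the entry point described above; the `workers` list and the shape of `Dux.list_macros/0`'s return value are assumptions for illustration:

```elixir
# Read macro SQLs from the coordinator's local persistent_term
# (assuming list_macros/0 yields {name, sql} pairs).
macro_sqls =
  for {_name, sql} <- Dux.list_macros(), do: sql
  # e.g. "CREATE OR REPLACE MACRO p50(x) AS quantile_cont(x, 0.5)"

for worker <- workers do
  # Runs directly on the worker's private DuckDB connection (GenServer
  # state), so the macros exist before the fan-out query arrives.
  :ok = Dux.Worker.execute_sqls(worker, macro_sqls)
end
```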
The view path in compute/1 could fail for coordinator post-merge processing because the merged temp table's ref may be GC'd before the view is queried. The AVG rewrite columns (__avg_sum_*, etc.) would be missing, causing "column not found" errors. Fix: finalize bypasses Dux.compute and uses Backend.query directly (CREATE TABLE AS). This is safe because post-merge data is small (aggregation results) — CTAS cost is negligible. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When summarise contains AVG (or STDDEV/COUNT DISTINCT), the pipeline splitter rewrites it to SUM+COUNT on workers. But sort_by, filter, and head ops after the summarise still reference the original column names (e.g. avg_latency), which don't exist on workers — they only exist after the coordinator reconstructs them from __avg_sum/__avg_count. Fix: when summarise has rewrites, all subsequent ops go to the coordinator only. Workers execute up to and including the rewritten summarise; the coordinator handles filter, sort, head after the AVG reconstruction. Also reverts the CTAS workaround in finalize — the real fix is in the pipeline splitter, not bypassing views. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
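The split can be pictured as the SQL each side runs. The `__avg_sum` / `__avg_count` column naming follows the commit message; table and column names (`shard`, `merged`, `latency`, `region`) are illustrative:

```sql
-- Each worker, on its shard. The rewritten summarise is the LAST worker op.
SELECT region,
       sum(latency)   AS __avg_sum_latency,
       count(latency) AS __avg_count_latency
FROM shard
GROUP BY region;

-- Coordinator, after merging shard results: reconstruct the AVG first,
-- then run the trailing filter/sort_by/head, which reference avg_latency.
SELECT region,
       sum(__avg_sum_latency) * 1.0 / sum(__avg_count_latency) AS avg_latency
FROM merged
GROUP BY region
ORDER BY avg_latency DESC
LIMIT 10;
```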
Summary
New FLAME cluster guide using the Ookla Speedtest dataset, plus several bugs found and fixed while testing it end-to-end on Fly.io.
New: FLAME cluster guide (guides/flame-clusters.livemd)

Full walkthrough for building ad-hoc Spark-like clusters with FLAME + Fly.io, using the Ookla Speedtest open dataset (~20GB public Parquet on S3). Covers:
- `create_secret` with `provider: :config`
- `spin_up` with `memory_limit` and `setup` callbacks

Fix: pipeline splitter no longer pushes post-AVG ops to workers
When `summarise` contains `avg()`, the pipeline splitter rewrites it to `SUM + COUNT` on workers. But `sort_by`, `filter`, and `head` ops after the summarise were still pushed to workers referencing the original column names (e.g. `avg_latency`), which don't exist there; they are only reconstructed on the coordinator from `__avg_sum` / `__avg_count`.

Fix: when summarise has AVG/STDDEV rewrites, all subsequent ops go to the coordinator only.
Fix: macro replay on remote FLAME workers
SQL macros defined via `Dux.define/3` are stored in `:persistent_term`, which is node-local. Remote FLAME workers couldn't see them. Fix: the coordinator reads macros from its local persistent_term and sends them to each worker via `Worker.execute_sqls/2` (which runs on the worker's private DuckDB connection) before fan-out.

Fix: n_rows skips materialization
`n_rows/1` previously called `compute()` (materializing the full result) and then ran `SELECT count(*)`. Now it compiles the pipeline to SQL and wraps it in `SELECT count(*) FROM (...)`, so DuckDB can push down the count and use Parquet metadata without scanning rows.

Updated: cheatsheet + README
`exec/1`, `local/1`, updated FLAME section

Test plan
- `mix docs` builds with new guide

🤖 Generated with Claude Code