Draft
Commits
25 commits
ae0741b
feat: add standalone shuffle benchmark binary for profiling
andygrove Mar 21, 2026
9b5b305
feat: add --limit option to shuffle benchmark (default 1M rows)
andygrove Mar 21, 2026
e1ab490
perf: apply limit during parquet read to avoid scanning all files
andygrove Mar 21, 2026
b7682f4
feat: move shuffle_bench binary into shuffle crate
andygrove Mar 23, 2026
ca36cbd
chore: add comment explaining parquet/rand deps in shuffle crate
andygrove Mar 23, 2026
7225afd
Merge remote-tracking branch 'apache/main' into shuffle-bench-binary
andygrove Mar 26, 2026
6e8bed2
perf: add max_buffered_batches config and stream shuffle bench from p…
andygrove Mar 26, 2026
16ce30f
merge apache/main, remove max_buffered_batches changes
andygrove Mar 27, 2026
2ef57e7
cargo fmt
andygrove Mar 27, 2026
9136e10
prettier
andygrove Mar 27, 2026
7e16819
machete
andygrove Mar 27, 2026
22fe804
feat: add --concurrent-tasks flag to shuffle benchmark
andygrove Mar 28, 2026
58ab927
show metrics
andygrove Mar 30, 2026
c469077
improve metrics
andygrove Mar 30, 2026
45b5e75
merge apache/main
andygrove Mar 30, 2026
8cccb45
refactor: address PR feedback, remove metrics and read-back
andygrove Apr 1, 2026
a56c201
style: apply cargo fmt and prettier
andygrove Apr 1, 2026
f60aedc
Merge remote-tracking branch 'apache/main' into shuffle-bench-binary
andygrove Apr 1, 2026
cd4e550
fix: default codec to lz4 to match Comet default, fix about string
andygrove Apr 1, 2026
cb0c88f
Refactor PartitionWriter and add SpillInfo for combined spill files
Shekharrajak Apr 6, 2026
dc5d494
Use single spill file per spill event in shuffle repartitioner
Shekharrajak Apr 6, 2026
9fa440b
Add unit tests for single spill file shuffle behavior
Shekharrajak Apr 6, 2026
74c404d
Merge branch 'main' into shuffle-bench-binary
mbutrovich Apr 6, 2026
e1279ce
Merge branch 'shuffle-bench-binary' into feature/single-spill-file-sh…
Shekharrajak Apr 6, 2026
989e7d1
Cache spill file handles during finalize to avoid N*S open() calls
Shekharrajak Apr 7, 2026
85 changes: 85 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions native/shuffle/Cargo.toml
@@ -32,6 +32,7 @@ publish = false
arrow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { version = "4", features = ["derive"], optional = true }
crc32c = "0.6.8"
crc32fast = "1.3.2"
datafusion = { workspace = true }
@@ -43,6 +44,8 @@ itertools = "0.14.0"
jni = "0.21"
log = "0.4"
lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] }
# parquet is only used by the shuffle_bench binary (shuffle-bench feature)
parquet = { workspace = true, optional = true }
simd-adler32 = "0.3.9"
snap = "1.1"
tokio = { version = "1", features = ["rt-multi-thread"] }
@@ -54,10 +57,18 @@ datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
itertools = "0.14.0"
tempfile = "3.26.0"

[features]
shuffle-bench = ["clap", "parquet"]

[lib]
name = "datafusion_comet_shuffle"
path = "src/lib.rs"

[[bin]]
name = "shuffle_bench"
path = "src/bin/shuffle_bench.rs"
required-features = ["shuffle-bench"]

[[bench]]
name = "shuffle_writer"
harness = false
41 changes: 41 additions & 0 deletions native/shuffle/README.md
@@ -23,3 +23,44 @@ This crate provides the shuffle writer and reader implementation for Apache Data
of the [Apache DataFusion Comet] subproject.

[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/

## Shuffle Benchmark Tool

A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write
performance outside of Spark. It streams input data directly from Parquet files.

### Basic usage

```sh
cargo run --release --features shuffle-bench --bin shuffle_bench -- \
--input /data/tpch-sf100/lineitem/ \
--partitions 200 \
--codec lz4 \
--hash-columns 0,3
```

### Options

| Option | Default | Description |
| --------------------- | -------------------------- | ------------------------------------------------------ |
| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files |
| `--partitions` | `200` | Number of output shuffle partitions |
| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` |
| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) |
| `--codec` | `lz4` | Compression codec: `none`, `lz4`, `zstd`, `snappy` |
| `--zstd-level` | `1` | Zstd compression level (1–22) |
| `--batch-size` | `8192` | Batch size for reading Parquet data |
| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded |
| `--write-buffer-size` | `1048576` | Write buffer size in bytes |
| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) |
| `--iterations` | `1` | Number of timed iterations |
| `--warmup` | `0` | Number of warmup iterations before timing |
| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files |
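
As a rough illustration of what `--partitioning hash` with `--hash-columns 0,3` and `--partitions 200` means, the sketch below assigns each row to an output partition by hashing only the selected columns. It is not taken from the crate: `partition_for_row` is a hypothetical helper, rows are simplified to slices of `i64`, and the standard library's `DefaultHasher` stands in for whatever hash function the shuffle writer actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not part of the shuffle crate): choose an output
/// partition for one row by hashing the values at `hash_columns`, which are
/// zero-based column indices as in `--hash-columns`.
/// Assumption: `DefaultHasher` is only a stand-in for the real hash function.
fn partition_for_row(row: &[i64], hash_columns: &[usize], partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    for &col in hash_columns {
        // Only the selected columns contribute to the hash.
        row[col].hash(&mut hasher);
    }
    // Reduce the 64-bit hash to a partition index in 0..partitions.
    (hasher.finish() % partitions as u64) as usize
}

fn main() {
    // Illustrative rows with four integer columns each.
    let rows: Vec<Vec<i64>> = vec![
        vec![1, 10, 100, 7],
        vec![1, 20, 200, 7], // same values as the first row in columns 0 and 3
        vec![2, 30, 300, 9],
    ];
    let hash_columns = [0, 3];
    let partitions = 200;

    for row in &rows {
        let p = partition_for_row(row, &hash_columns, partitions);
        println!("{:?} -> partition {}", row, p);
    }
}
```

Rows that agree on every hashed column always land in the same partition, whatever the other columns contain, which is why the choice of `--hash-columns` affects how evenly the benchmark's output is distributed.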

### Profiling with flamegraph

```sh
cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \
--input /data/tpch-sf100/lineitem/ \
--partitions 200 --codec lz4
```