Draft
57 commits
d170c04
feat: Add Grace Hash Join operator with spill-to-disk support
andygrove Feb 21, 2026
3a77b6c
Add join microbenchmark comparing Sort Merge, Hash, and Grace Hash joins
andygrove Feb 21, 2026
f01a852
chore: Apply formatting (cargo fmt + spotless)
andygrove Feb 21, 2026
02809ec
feat: Remove join type restrictions in RewriteJoin when grace hash jo…
andygrove Feb 21, 2026
05ab761
Revert join type restriction removal in RewriteJoin
andygrove Feb 21, 2026
af71b50
perf: Optimize Grace Hash Join partitioning with prefix-sum algorithm
andygrove Feb 21, 2026
60400c2
perf: Optimize Grace Hash Join with take() kernel and whole-batch pas…
andygrove Feb 21, 2026
d6ab6b0
fix: Improve Grace Hash Join repartition heuristic to prevent OOM
andygrove Feb 21, 2026
3e5c2d9
fix: Track probe-side memory in Grace Hash Join and spill on pressure
andygrove Feb 21, 2026
c4a662f
fix: Spill probe-side partitions aggressively to prevent OOM
andygrove Feb 21, 2026
f521b41
fix: Spill ALL partitions on first probe-side memory pressure
andygrove Feb 21, 2026
d3bc4bf
fix: Stream spilled probe data during join phase to prevent OOM
andygrove Feb 21, 2026
5749096
perf: Skip probe partitioning when build side is small in Grace Hash …
andygrove Feb 21, 2026
572eabf
Revert "perf: Skip probe partitioning when build side is small in Gra…
andygrove Feb 22, 2026
92d3818
perf: Increase spill I/O buffer size from 8KB to 1MB in Grace Hash Join
andygrove Feb 22, 2026
41a0333
perf: Parallelize spill file reads with async I/O in Grace Hash Join
andygrove Feb 22, 2026
5f40ba0
fix: Keep spill file handle alive in spawn_blocking reader
andygrove Feb 22, 2026
23d3d6f
perf: Coalesce small spill batches into 8192-row chunks before joining
andygrove Feb 22, 2026
b6bc859
perf: Parallelize join phase across cores with tokio::spawn per parti…
andygrove Feb 22, 2026
ffa4104
perf: Add fast-path streaming join when build side is small
andygrove Feb 22, 2026
fa8e471
fix: Shrink reservation before fast-path HashJoinExec to avoid double…
andygrove Feb 22, 2026
5580296
docs: Move Grace Hash Join design doc to contributor guide
andygrove Feb 22, 2026
54350c7
add doc
andygrove Feb 22, 2026
e0e819e
docs: Add ASF license header to Grace Hash Join design doc
andygrove Feb 22, 2026
7834ae3
perf: Parallelize Phase 3 build-side reads and join execution in GHJ
andygrove Feb 22, 2026
b687402
fix: Restore fast-path reservation shrink and limit Phase 3 concurrency
andygrove Feb 22, 2026
069a799
fix: Remove internal parallelism from GHJ Phase 3
andygrove Feb 22, 2026
1b7ef63
fix: Keep GHJ reservation alive for FairSpillPool headroom
andygrove Feb 22, 2026
bb4a08b
fix: Free GHJ reservation before Phase 3 to avoid double-counting
andygrove Feb 22, 2026
2653645
fix: Use actual batch sizes for fast-path memory check
andygrove Feb 22, 2026
40df743
fix: Remove fast path that creates massive non-spillable hash tables
andygrove Feb 22, 2026
6727829
docs: Update GHJ design doc to explain fast path removal
andygrove Feb 22, 2026
8222a94
perf: Add bounded parallelism (3 concurrent) to Phase 3 partition joins
andygrove Feb 22, 2026
c4dbb7b
perf: Restore fast path for tiny build sides (< 10 MB actual)
andygrove Feb 22, 2026
6d1ec99
feat: Make GHJ fast path threshold configurable
andygrove Feb 22, 2026
50b6161
fix: Divide GHJ fast path threshold by executor cores
andygrove Feb 22, 2026
9b1902b
perf: Enable LZ4 compression for GHJ spill files
andygrove Feb 22, 2026
ac13fb3
perf: Coalesce tiny sub-batches before partition joins
andygrove Feb 22, 2026
0eb8c24
perf: Merge adjacent GHJ partitions to reduce HashJoinExec calls
andygrove Feb 22, 2026
0ed953d
fix: Use spill writer bytes for merge size calculation
andygrove Feb 22, 2026
d4641ab
fix: Force repartitioning when build side exceeds 32 MB target
andygrove Feb 22, 2026
2c38c20
debug: Add logging to all HashJoinExec creation paths + add memory-co…
andygrove Feb 22, 2026
9bf90c1
debug: Add logging to planner fallback HashJoinExec path
andygrove Feb 22, 2026
03d31e4
debug: Add plan logging to all GHJ HashJoinExec creation paths
andygrove Feb 22, 2026
eea613d
debug: Add row emission logging to detect exploding joins
andygrove Feb 22, 2026
e9b90a3
debug: Add GHJ instance IDs and pool state tracing
andygrove Feb 22, 2026
b8d0e1d
fix: Prevent batch splitting in GHJ Phase 3 to fix phantom OOM
andygrove Feb 23, 2026
bef66d6
fix: Use actual row count for batch_size instead of usize::MAX
andygrove Feb 23, 2026
024c6f2
fix: Use StreamSourceExec for build side to avoid Arrow i32 offset ov…
andygrove Feb 23, 2026
88a15b5
style: Run cargo fmt
andygrove Feb 23, 2026
534b39f
style: Run prettier on markdown files
andygrove Feb 23, 2026
cba9146
fix: Use StreamSourceExec for probe side too to avoid splitting overhead
andygrove Feb 23, 2026
48f1712
fix: Use larger output batch_size for HashJoinExec to avoid per-batch…
andygrove Feb 23, 2026
f707825
fix: Remove concat_batches and use usize::MAX output batch_size
andygrove Feb 23, 2026
e8d8b59
fix: Use 10M output batch_size instead of usize::MAX
andygrove Feb 23, 2026
906e963
fix: Reduce GHJ output batch_size from 10M to 1M to prevent OOM
andygrove Feb 23, 2026
cf2f920
fix: Restore concat_batches in per-partition paths for performance
andygrove Feb 23, 2026
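Several commits above mention partitioning rows with a prefix-sum pass and gathering them with the `take()` kernel. The PR's own Rust code is not rendered on this page, so the following is only a minimal sketch of that general technique, not the author's implementation. The names `partition_of` and `partition_rows` are hypothetical, keys are assumed to be plain `i64` values, and the standard library's `DefaultHasher` stands in for whatever hasher the PR uses (it adds `ahash` as a dependency).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: map a join key to a partition id in `0..num_partitions`.
/// Uses the std hasher for illustration only (assumption, not the PR's hasher).
fn partition_of(key: i64, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Group row indices by partition with a counting pass and a prefix sum.
/// Returns `(offsets, indices)`: the rows of partition `p` are
/// `indices[offsets[p]..offsets[p + 1]]`, in original row order.
fn partition_rows(keys: &[i64], num_partitions: usize) -> (Vec<usize>, Vec<u32>) {
    assert!(num_partitions > 0, "num_partitions must be positive");

    // Pass 1: compute each row's partition and count rows per partition.
    let ids: Vec<usize> = keys
        .iter()
        .map(|&k| partition_of(k, num_partitions))
        .collect();
    let mut offsets = vec![0usize; num_partitions + 1];
    for &p in &ids {
        offsets[p + 1] += 1;
    }

    // Prefix sum: turn per-partition counts into start offsets.
    for p in 0..num_partitions {
        offsets[p + 1] += offsets[p];
    }

    // Pass 2: scatter each row index into the next free slot of its partition.
    let mut cursor = offsets[..num_partitions].to_vec();
    let mut indices = vec![0u32; keys.len()];
    for (row, &p) in ids.iter().enumerate() {
        indices[cursor[p]] = row as u32;
        cursor[p] += 1;
    }
    (offsets, indices)
}
```

Calling `partition_rows(&keys, 16)` yields one contiguous index slice per partition. A slice of row indices like this is the kind of input a gather kernel such as Arrow's `take` consumes, which avoids building one small array per row group.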
33 changes: 33 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -294,6 +294,39 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("localTableScan", defaultValue = false)

val COMET_EXEC_GRACE_HASH_JOIN_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.enabled")
.category(CATEGORY_EXEC)
.doc(
"Whether to enable Grace Hash Join. When enabled, Comet will use a Grace Hash Join " +
"operator that partitions both sides into buckets and can spill to disk when memory " +
"is tight. Supports all join types. This is an experimental feature.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_GRACE_HASH_JOIN_NUM_PARTITIONS: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.numPartitions")
.category(CATEGORY_EXEC)
.doc("The number of partitions (buckets) to use for Grace Hash Join. A higher number " +
"reduces the size of each partition but increases overhead.")
.intConf
.checkValue(v => v > 0, "The number of partitions must be positive.")
.createWithDefault(16)

val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
.category(CATEGORY_EXEC)
.doc(
"Total memory budget in bytes for Grace Hash Join fast-path hash tables across " +
"all concurrent tasks. This is divided by spark.executor.cores to get the per-task " +
"threshold. When a build side fits in memory and is smaller than the per-task " +
"threshold, the join executes as a single HashJoinExec without spilling. " +
"Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
"creates non-spillable hash tables.")
.intConf
.checkValue(v => v >= 0, "The fast path threshold must be non-negative.")
.createWithDefault(10 * 1024 * 1024) // 10 MB

val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
.category(CATEGORY_EXEC)
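The `fastPathThreshold` doc string describes a total budget that is divided by `spark.executor.cores` to get a per-task threshold, with `0` disabling the fast path. A minimal sketch of that arithmetic follows, assuming sizes are tracked as `usize` bytes. The function names are hypothetical and this is not the PR's native code. The doc string also requires that the build side fit in memory, which this sketch does not model.

```rust
/// Hypothetical helper: the per-task share of the total fast-path budget.
fn per_task_threshold(total_budget_bytes: usize, executor_cores: usize) -> usize {
    // Treat a core count of zero as one so the division cannot panic.
    total_budget_bytes / executor_cores.max(1)
}

/// Hypothetical helper: decide whether a build side is small enough for the
/// single-HashJoinExec fast path described in the config doc string.
fn use_fast_path(
    build_side_bytes: usize,
    total_budget_bytes: usize,
    executor_cores: usize,
) -> bool {
    let threshold = per_task_threshold(total_budget_bytes, executor_cores);
    // A threshold of zero means the fast path is disabled.
    threshold > 0 && build_side_bytes < threshold
}
```

With the default budget of `10 * 1024 * 1024` bytes and 4 executor cores, the per-task threshold is 2,621,440 bytes, so a 1 MiB build side would take the fast path and a 3 MiB build side would not.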
333 changes: 333 additions & 0 deletions docs/source/contributor-guide/grace-hash-join-design.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion native/Cargo.toml
@@ -34,7 +34,7 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
-arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz", "ipc_compression"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.2.0", default-features = false, features = ["experimental"] }
1 change: 1 addition & 0 deletions native/core/Cargo.toml
@@ -35,6 +35,7 @@ include = [
publish = false

[dependencies]
ahash = "0.8"
arrow = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental", "arrow"] }
futures = { workspace = true }
6 changes: 5 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -173,6 +173,8 @@ struct ExecutionContext {
pub memory_pool_config: MemoryPoolConfig,
/// Whether to log memory usage on each call to execute_plan
pub tracing_enabled: bool,
/// Spark configuration map for comet-specific settings
pub spark_conf: HashMap<String, String>,
}

/// Accept serialized query plan and return the address of the native query plan.
Expand Down Expand Up @@ -320,6 +322,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
explain_native,
memory_pool_config,
tracing_enabled,
spark_conf: spark_config,
});

Ok(Box::into_raw(exec_context) as i64)
Expand Down Expand Up @@ -531,7 +534,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let start = Instant::now();
let planner =
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
-.with_exec_id(exec_context_id);
+.with_exec_id(exec_context_id)
+.with_spark_conf(exec_context.spark_conf.clone());
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
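The hunks above thread a `HashMap<String, String>` of Spark settings from `ExecutionContext` into the planner. How the planner reads individual settings is not shown on this page, so the following is only a sketch of one way to pull a typed value out of such a map with a fallback. The helper name `conf_or` is hypothetical, and the full key strings used in the usage note assume that `COMET_EXEC_CONFIG_PREFIX` expands to `spark.comet.exec`, which this diff does not confirm.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical helper: read `key` from a string-valued config map and parse
/// it as `T`, falling back to `default` when the key is missing or malformed.
fn conf_or<T: FromStr>(conf: &HashMap<String, String>, key: &str, default: T) -> T {
    conf.get(key)
        .and_then(|raw| raw.trim().parse::<T>().ok())
        .unwrap_or(default)
}
```

For example, `conf_or(&spark_conf, "spark.comet.exec.graceHashJoin.numPartitions", 16usize)` would return the configured partition count, or the default of 16 from `CometConf` when the key is absent. Silently falling back on a malformed value is a design choice of this sketch; a planner could equally return an error instead.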