Draft
19 commits
cb7cb51
docs: design spec for one IPC stream per partition shuffle format
andygrove Apr 2, 2026
8b05f48
docs: add validation skip requirement to shuffle stream reader spec
andygrove Apr 2, 2026
018f97d
docs: implementation plan for IPC stream per partition shuffle format
andygrove Apr 2, 2026
bc5c1b3
feat: enable Arrow IPC compression feature for shuffle format
andygrove Apr 2, 2026
afc2ef7
feat: replace custom shuffle block format with Arrow IPC streams
andygrove Apr 2, 2026
c3b354c
feat: add JniInputStream and ShuffleStreamReader for shuffle read path
andygrove Apr 2, 2026
3931952
feat: update JVM read side to use streaming shuffle decode API
andygrove Apr 2, 2026
a48791f
fix: resolve clippy warnings and update shuffle_scan tests for new IP…
andygrove Apr 2, 2026
fc1839c
feat: update ShuffleScanExec to use ShuffleStreamReader for Arrow IPC…
andygrove Apr 2, 2026
24056ad
fix: apply spotless formatting
andygrove Apr 2, 2026
f186d7e
fix: handle empty streams and concatenated IPC streams in shuffle reader
andygrove Apr 2, 2026
924f53f
chore: remove unrelated files accidentally committed
andygrove Apr 2, 2026
36c43e5
refactor: clean up shuffle format migration dead code and review find…
andygrove Apr 2, 2026
7e3a24e
format
andygrove Apr 2, 2026
8dbbe59
fix: add write_time metric to SinglePartitionShufflePartitioner
andygrove Apr 8, 2026
a56db2e
fix: adapt IPC stream shuffle code for jni 0.22 and arrow 58.1
andygrove Apr 8, 2026
7b45660
chore: apply rustfmt formatting
andygrove Apr 8, 2026
c7f2e74
fix: skip IPC stream for empty shuffle partitions and ignore Miri test
andygrove Apr 8, 2026
d30780b
feat: wrap shuffle output files in BufWriter and reduce default buffe…
andygrove Apr 9, 2026
13 changes: 6 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -434,11 +434,11 @@ object CometConf extends ShimCometConf {
  conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
  .category(CATEGORY_SHUFFLE)
  .doc(
- "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
- "snappy are supported. Compression can be disabled by setting " +
+ "The codec of Comet native shuffle used to compress shuffle data. " +
+ "Supported codecs: lz4, zstd. Compression can be disabled by setting " +
  "spark.shuffle.compress=false.")
  .stringConf
- .checkValues(Set("zstd", "lz4", "snappy"))
+ .checkValues(Set("zstd", "lz4"))
  .createWithDefault("lz4")

  val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
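
To make the effect of the narrowed `checkValues` set concrete, here is a minimal standalone sketch in plain Scala with no Spark or Comet dependency. `CodecCheck` and `validateCodec` are hypothetical names introduced for illustration and are not part of `CometConf`; only the allowed set and the default value are taken from the hunk above.

```scala
// Hypothetical standalone sketch; this is not Comet's actual validation code.
object CodecCheck {
  // Values accepted after this change, copied from the new checkValues call.
  val SupportedCodecs: Set[String] = Set("zstd", "lz4")

  // Default copied from createWithDefault("lz4") in the hunk.
  val DefaultCodec: String = "lz4"

  // Returns Right(codec) when the value is accepted, Left(message) otherwise.
  def validateCodec(value: String): Either[String, String] =
    if (SupportedCodecs.contains(value)) Right(value)
    else
      Left(
        s"Unsupported shuffle codec '$value'; expected one of: " +
          SupportedCodecs.toSeq.sorted.mkString(", "))
}
```

Under these assumptions, `CodecCheck.validateCodec("snappy")` returns a `Left`, which suggests that jobs currently configured with snappy would need to switch to lz4 or zstd once this change lands.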
@@ -528,11 +528,10 @@ object CometConf extends ShimCometConf {
  .category(CATEGORY_SHUFFLE)
  .doc("Size of the write buffer in bytes used by the native shuffle writer when writing " +
  "shuffle data to disk. Larger values may improve write performance by reducing " +
- "the number of system calls, but will use more memory. " +
- "The default is 1MB which provides a good balance between performance and memory usage.")
- .bytesConf(ByteUnit.MiB)
+ "the number of system calls, but will use more memory.")
+ .bytesConf(ByteUnit.KiB)
  .checkValue(v => v > 0, "Write buffer size must be positive")
- .createWithDefault(1)
+ .createWithDefault(8)

  val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
  "spark.comet.shuffle.preferDictionary.ratio")
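
The write buffer hunk changes both the unit (`ByteUnit.MiB` to `ByteUnit.KiB`) and the numeric default (1 to 8), so the raw numbers are not directly comparable. The sketch below works out the two defaults in bytes. `BufferSizeMath` is a hypothetical helper for illustration only and is not part of `CometConf` or Spark; it assumes the default is expressed in the unit passed to `bytesConf`.

```scala
// Hypothetical helper for illustration; not part of CometConf or Spark.
object BufferSizeMath {
  val BytesPerKiB: Long = 1024L
  val BytesPerMiB: Long = 1024L * 1024L

  // Old default: createWithDefault(1) under bytesConf(ByteUnit.MiB).
  val oldDefaultBytes: Long = 1L * BytesPerMiB

  // New default: createWithDefault(8) under bytesConf(ByteUnit.KiB).
  val newDefaultBytes: Long = 8L * BytesPerKiB

  // How many times smaller the new default is than the old one.
  val reductionFactor: Long = oldDefaultBytes / newDefaultBytes
}
```

By this arithmetic the default drops from 1,048,576 bytes to 8,192 bytes, a 128x reduction. If Spark's `bytesConf` treats unsuffixed values as being in the declared unit, as I believe it does, then an existing setting written without a suffix would now be read as KiB rather than MiB, so it may be worth stating that in the doc string or release notes.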