Skip to content

feat: enable CometLocalTableScanExec by default#4393

Draft
mbutrovich wants to merge 24 commits into
apache:mainfrom
mbutrovich:enable_localtablescan
Draft

feat: enable CometLocalTableScanExec by default#4393
mbutrovich wants to merge 24 commits into
apache:mainfrom
mbutrovich:enable_localtablescan

Conversation

@mbutrovich
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4347.

Rationale for this change

Currently CometLocalTableScanExec is disabled by default. A lot of Spark SQL tests (UDFs, expressions) don't write their input to sources that Comet reads natively (e.g., Parquet, Iceberg) so they are likely not being exercised through Comet.

What changes are included in this PR?

Enable localTableScan translation to Comet by default.

How are these changes tested?

Existing tests.

@mbutrovich mbutrovich self-assigned this May 22, 2026
@mbutrovich mbutrovich marked this pull request as draft May 22, 2026 11:18
@mbutrovich
Copy link
Copy Markdown
Contributor Author

Failure counts

Log archive Job set Failures Notes
logs_70206724473 Comet (Linux, Spark 3.4–4.2) 5 All same test
logs_70206740518 Comet (macOS, Spark 4.0–4.2) 3 Same test; no platform-specific issue
logs_70206737291 Upstream Spark SQL (3.4.3–4.1.1) ~691 reported + 5 jobs hit 6h timeout/OOM True total likely 800–1000
logs_70206724472 Iceberg integration (Spark 3.4/3.5 × Iceberg 1.8/1.9/1.10) 220 Silent assertion mismatches; no Comet stack frames

Clean: catalyst modules, all iceberg-spark-runtime runs, TPC-DS, TPC-H, Rust tests, lint, native builds, sql_hive-2.

Root-cause buckets (ranked by blast radius)

B1 — NullType rejected by Utils.toArrowType (~600 failures, ~75–85% of Spark SQL)

Stack identical across occurrences:

java.lang.UnsupportedOperationException: Unsupported data type: NullType
  at org.apache.spark.sql.comet.util.Utils$.toArrowType(Utils.scala:155)
  at CometArrowConverters$ArrowBatchIterBase.<init>(54)
  at CometLocalTableScanExec.doExecuteColumnar(78)

Triggers any time Seq(...).toDF / values (..., null) yields a void column, including nested MapType(_, NullType) (e.g. DatasetPrimitiveSuite "special floating point values").

Fix options:

  • (a) Reject schemas containing NullType (including nested) in the rule that builds CometLocalTableScanExec, so plan falls back. Smallest blast radius.
  • (b) Wire NullType → Arrow Null in Utils.toArrowType (+ corresponding ArrowWriter case).

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala:155
  • spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/CometArrowConverters.scala:54
  • spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:66-81

B2 — TimeType rejected (~25 failures, Spark 4.1.x only)

SparkException: not support type: TimeType(6)
  at ArrowWriter$.createFieldWriter(ArrowWriters.scala:89)

Hits Spark 4.1 TIME-type tests (SPARK-51402, SPARK-52883, SPARK-53929, SPARK-53109, SPARK-53107, SPARK-53108, SPARK-52626, SPARK-52660, etc.). Same fix shape as B1; fallback is the immediate move since the rest of Comet does not yet support TIME.

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/execution/arrow/ArrowWriters.scala:89

B3 — Nested-type nullability mismatch (~20 Spark SQL + likely root cause of all 220 Iceberg failures)

DataFusion error shape:

Incorrect datatype for StructArray field "nested",
  expected Struct("a": Int32, "b": Int64),
  got      Struct("a": non-null Int32, "b": non-null Int64)

CometArrowConverters derives child nullability from the Spark schema; downstream native operators were planned against schemas with different child-nullability inference. One side has to normalize.

Iceberg connection:

  • Every failing Iceberg test seeds rows via spark.createDataFrame(input, Employee.class) (TestDelete.java:1526) — a LocalRelation that now becomes CometLocalTableScanExec.
  • Failures concentrate on the conjunction branch=test, distributionMode=none, fanout=false, vectorized=true (other parameterizations of the same test methods pass).
  • Symptoms are silent: assertion failures like expected:<199> but was:<200>, [Snapshot property added-data-files has unexpected value], View should have correct data: expected:<2> but was:<0>. No exception, no Comet stack frame.
  • CometLocalTableScanExec.isFfiSafe = false (line 110) due to array reuse in CometArrowConverters. Safe for Comet-native consumers (the native side copies based on the proto flag). Unsafe for non-Comet consumers that buffer batches across next() calls. The Iceberg branch-write path likely buffers, which combined with B3-style nullability mismatches could produce the silent loss.

Investigate first:

  • Run TestCopyOnWriteDelete.testSkewDelete with the failing parameterization locally; dump physical plan and per-batch row counts.
  • Determine whether Iceberg's DSv2 columnar branch write honors isFfiSafe=false, or whether the gap is purely nullability.

Refs:

  • spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala:48-81,110
  • spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala:44,68
  • Iceberg spark-extensions TestDelete.append(String, Employee...) (TestDelete.java:1520-1528) and SparkRowLevelOperationsTestBase.createAndInitTable / append / commitTarget (SparkRowLevelOperationsTestBase.java:282,294,433)

B4 — Stale CometWindowExecSuite assertion (8 failures: Comet's own suites, all platforms)

Test: window function: partition and order expressions, all 5 Spark versions on Linux + 3 on macOS. Inverted assertion:

// CometWindowExecSuite.scala:111-119
} else {
  // we fall back to Spark for shuffle because we do not support
  // native shuffle with a LocalTableScan input, and we do not fall
  // back to Comet columnar shuffle due to
  // https://github.com/apache/datafusion-comet/issues/1248
  assert(cometShuffles.isEmpty)
}

The premise is no longer true; native shuffle now composes with the new local scan.

Action: update the assertion to cometShuffles.length == 1, drop the comment, re-evaluate whether #1248 is still relevant. Grep for siblings: "LocalTableScan input", "issues/1248", "fall back to Spark for shuffle".

Refs:

  • spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala:108-119

B5 — Plan-shape / explain / metrics assertions in upstream Spark SQL (~40 failures)

Tests grep the physical plan for Spark-only nodes or check WholeStageCodegen / SQL metrics that Comet doesn't emit. Examples:

  • ExplainSuite "Support ExplainMode in Dataset.explain"
  • SQLAppStatusListenerSuite "SPARK-29894 test Codegen Stage Id", "SPARK-32615,SPARK-33016"
  • BatchEvalPythonExecSuite "Python UDF: push down deterministic FilterExec predicates"
  • JoinHintSuite "broadcast", "shuffle-replicate-nl"
  • DataFrameSetOperationsSuite SPARK-37371
  • OptimizeLocalRelationsSuite SPARK-25860 / SPARK-33847

Not Comet defects. Either extend the upstream-test skip list or add per-test config disable for localTableScan.

B6 — Subquery not found when plan root is CometLocalTableScan (~8 failures)

CometRuntimeException: Subquery NNN not found for plan MMM

Hits CTEHintSuite "subquery in repartition", SPARK-36447, CTE Predicate push-down and column pruning. Real Comet bug — subquery registration path doesn't handle the new root operator.

B7 — Long tail (~20 failures total)

  • ULP-level float math diffs in MathFunctionsSuite (asinh, acosh, cosh, tan, cot, cbrt, pow, atan2, etc.). Known JVM-libm vs Rust-libm gap; no Comet carve-out for upstream suite.
  • bit_length / octet_length on BinaryType rejected by DataFusion's string-only UDF (SPARK-36751).
  • null IN () returns false in Comet, should be null (EmptyInSuite).
  • RuntimeNullChecksV2Writes expects SparkRuntimeException, gets SparkException (wrapper depth).
  • to_binary/unhex error class differs from Spark's CONVERSION_INVALID_INPUT.
  • collect_set element order differs.
  • 5 Spark SQL shards hit GitHub Actions 6h wall clock / exit 137 — likely cascade from B1 stack-trace volume; should resolve once B1 is fixed.

Suggested fix order

Order Action Eliminates Risk
1 B1 fallback (NullType in schema) ~600 + likely unblocks the 5 timed-out shards Low
2 B2 fallback (TimeType in schema) ~25 Low
3 B3 investigation + fix (Iceberg silent corruption) 220 + ~20 Spark SQL Medium (data correctness)
4 B4 stale assertion update + sibling grep 8 Trivial
5 B6 subquery registration ~8 Low-medium
6 B5 skip list (or per-test config) ~40 Low
7 B7 individually ~20 Varies

After (1)–(2), failure count should drop from ~1,200 to ~250 and most remaining failures will be the substantive ones.

Open questions

  • B3: does Iceberg's columnar DSv2 branch-write path honor setArrowFfiSafe(false), or does the contract only apply to native consumers? If only native, CometLocalTableScanExec needs to copy batches before exposing them to non-Comet consumers (or be disallowed as a direct columnar input to non-Comet writers).
  • B5: skip list vs per-test disable — preference? A skip list is easier to maintain; per-test config keeps Comet exercised.
  • Whether to keep the default flip in PR feat: enable CometLocalTableScanExec by default #4393 and land fixes incrementally, or to gate the flip behind B1–B3 landing first.

@mbutrovich
Copy link
Copy Markdown
Contributor Author

mbutrovich commented May 27, 2026

Branch status

JVM and native input now use the canonical Arrow C Stream Interface: Data.exportArrayStream on the JVM, ArrowArrayStreamReader on the native side. The bespoke CometBatchIterator JNI machinery and the arrow_ffi_safe flag are gone. Closes #3770. Net diff vs main: ~1070 insertions, ~660 deletions across 35 files.

JVM-side input is unified behind three ArrowReader subclasses, replacing the per-shape conversion paths that funneled into CometBatchIterator:

  • RowArrowReader for InternalRow input
  • SparkColumnarArrowReader for non-Comet ColumnarBatch input
  • ColumnarBatchArrowReader for Comet ColumnarBatch input

CometExecRDD / CometExecIterator / operators.scala route input slots from the protobuf (findShuffleScanIndices) instead of a conf flag, so JVM and native agree on which slot is ShuffleScan vs Scan. Allocator/reader/stream lifecycle is bound to TaskContext with rollback on partial-setup failure.

Native side: ScanExec.input_source: Option<Arc<Mutex<AlignedArrowStreamReader>>>. AlignedArrowStreamReader is a thin Comet-side replacement for arrow::ffi_stream::ArrowArrayStreamReader that calls ArrayData::align_buffers() between FFI import and typed-array construction (rationale below). The arrow_ffi_safe-gated deep-copy branch is removed. copy_or_unpack_array is preserved as the boundary that strips dictionary encoding before downstream DataFusion operators see it.

Bucket status

Bucket Status
B1 NullType end-to-end Landed
B2 TimeType fallback via DataTypeSupport Landed
B3b Iceberg silent corruption Resolved (consumer-owned reader lifecycle)
B4 Stale window-suite assertion Landed
B6 Subquery under CometLocalTableScan Expected transitive on B1, pending CI
B3a Nested-type nullability residual Deferred
B5 Plan-shape / explain / metrics Deferred, needs skip-list discussion
B7 Long tail Deferred, individual fixes

Recent fixes since the last CI sweep

Dictionary-encoded ColumnarBatch input

Native HashAggregate's row converter emits Dictionary<Int32, T> columns for string group keys. CometColumnarShuffle round-trips them, so the next stage's input is a CometDictionaryVector. ColumnarBatchArrowReader builds its stable VSR from the logical (non-dict) Spark schema; the unload/load step then trips inside VectorLoader.loadBuffers with no more buffers for field ...: Utf8. Expected 3 before any data reaches the C Stream. Fix: decode dictionary source columns via DictionaryEncoder.decode before VectorUnloader. Native still unpacks downstream via copy_or_unpack_array, so end-to-end semantics are unchanged. Caught by CometAggregateSuite "multiple column distinct count".

Shaded Arrow C Data Interface IllegalAccessError

Data.exportArrayStream (Data.java:231) does new ArrayStreamExporter(...), a package-private constructor. ArrayStreamExporter constructs a package-private inner ExportedArrayStreamPrivateData that Arrow's JNI looks up by literal classname (jni_wrapper.cc:341). Shading any subset of these splits package-private access across the shaded and original packages and surfaces as IllegalAccessError. Fix: exclude the entire org.apache.arrow.c.** from relocation. Spark does not ship arrow-c-data, so no classpath clash is possible. The other Arrow modules (vector, memory, format) are still shaded as before. Caught by ParquetEncryptionITCase and Iceberg TestRoundTrip.

Decimal128 alignment panic on JVM-Arrow imports

CometFuzzTestSuite "join (jvm shuffle, nativeC2R=true)" panics with Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type when a batch contains array<decimal(10,2)>. Since Rust 1.77 / LLVM 18, align_of::<i128>() == 16 on every platform we run on, so arrow-rs's ScalarBuffer::<i128>::from(Buffer) panics on Decimal128 buffers landing on offsets that are 8-byte but not 16-byte aligned. The Arrow C Data Interface only recommends 8-byte alignment, and arrow-java's VectorUnloader and NettyAllocationManager only guarantee 8-byte. The producer is spec-conformant, the consumer-side check is unilateral. Stock arrow::ffi_stream::ArrowArrayStreamReader::next does not call ArrayData::align_buffers() before constructing typed arrays, so it panics before the caller ever sees a RecordBatch. The old bespoke import path called align_buffers() explicitly in from_spark; the canonical reader switch dropped that step.

Comet-side fix: AlignedArrowStreamReader, a 70-line replacement that drives FFI_ArrowArrayStream::get_next directly, runs from_ffi_and_data_type, calls align_buffers(), then builds the RecordBatch. Once apache/arrow-rs#10030 lands (calls align_buffers() inside from_ffi itself, mirroring the IPC reader's default and matching the existing arrow-pyarrow workaround), AlignedArrowStreamReader can be deleted in favor of ArrowArrayStreamReader. Tracking issue: apache/arrow-rs#10028.

In-flight

Latest commit pushed; CI sweep running. Will update once it settles.

Deferred (won't block this PR)

  • B3a, nested struct nullability residual. Reproducer: DataFrameSetOperationsSuite "SPARK-35756: unionByName support struct having same col names but different sequence". unionByName reorders the right side via named_struct(...) (children become nullable); the left passes through (children stay non-null); DataFusion strict-validates Union/Project schemas. Only reproduces under the spark.plugins=...CometPlugin path; no CometTestBase reproducer yet.
  • B5, plan-shape / explain / metrics assertions in upstream Spark SQL. ~40 tests grep for Spark-only nodes (WholeStageCodegen, FilterExec, BroadcastHashJoinExec) or codegen-stage IDs Comet operators don't emit. Needs an upstream-test skip list or per-test spark.comet.exec.localTableScan.enabled=false.
  • B7, long tail. ULP-level float math (asinh / acosh / cosh / tan / cot / cbrt / pow / atan2), bit_length/octet_length on BinaryType, null IN () returning false instead of null, RuntimeNullChecksV2Writes error-class wrapper depth, to_binary/unhex error class, collect_set ordering. Each is a separate fix.

Splittable into separate PRs

Once CI is clean these can peel off independently from the localTableScan default flip:

  1. NullType end-to-end (B1): Utils.scala, CometShuffleExchangeExec, native/shuffle/src/spark_unsafe/row.rs, three test suites.
  2. TimeType fallback via DataTypeSupport (B2): self-contained type-check addition on CometLocalTableScanExec.
  3. Stale window-suite assertion fix (B4): one-line test fix.
  4. Arrow C Stream Interface input + close of Remove mutable buffer use from CometArrowConverters #3770: the bulk of this PR. Doesn't strictly require the default flip but unblocks it.
  5. Default flip for localTableScan.enabled = true: lands last, after B3a residuals are resolved or accepted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enable spark.comet.exec.localTableScan.enabled when running Spark SQL tests

1 participant