Fix Iceberg streaming resume, deletion detection, and no_action handling #284

Closed
tlee732 wants to merge 8 commits into indextables:main from tlee732:fix/iceberg-streaming-and-deletion

tlee732 (Contributor) commented Apr 1, 2026

Summary

  • Separate streaming resume from SQL fields: a new lastSyncedVersion field on SyncToExternalCommand keeps streaming incremental resume independent of fromVersion (Delta SQL) and fromSnapshot (Iceberg SQL). Streaming resume is now format-agnostic; format-specific routing happens at the scanner call site.
  • Incremental deletion detection: the manifest set-difference in DistributedSourceScanner now reads entries from both old-only and new-only manifests and computes file-level adds and deletes. Previously it detected only new files and missed deletions entirely.
  • Invalidation-only commit path: when files are deleted from the source but nothing new needs indexing, remove actions are now committed directly instead of being silently dropped by the empty batch loop.
  • Auto-resume for one-shot syncs: non-streaming re-syncs now read lastSyncedVersion from the companion transaction log, which enables the incremental fast path and correct no_action detection.
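
The manifest set-difference described in the second bullet can be illustrated with a minimal sketch. This is not the code in DistributedSourceScanner: the `Manifest` and `FileChanges` types and the `diff` function are hypothetical, and the model assumes each manifest simply lists live data file paths (real Iceberg manifest entries also carry a status, which is ignored here).

```scala
// Hypothetical, simplified model of a manifest-level set difference.
// None of these names come from the project's actual API.
object ManifestDiffSketch {
  final case class Manifest(path: String, dataFiles: Set[String])
  final case class FileChanges(added: Set[String], deleted: Set[String])

  def diff(oldManifests: Seq[Manifest], newManifests: Seq[Manifest]): FileChanges = {
    val oldPaths = oldManifests.map(_.path).toSet
    val newPaths = newManifests.map(_.path).toSet

    // Manifests present in both snapshots are unchanged, so only the
    // old-only and new-only manifests need to be read.
    val oldOnlyFiles =
      oldManifests.filterNot(m => newPaths.contains(m.path)).flatMap(_.dataFiles).toSet
    val newOnlyFiles =
      newManifests.filterNot(m => oldPaths.contains(m.path)).flatMap(_.dataFiles).toSet

    // A file that appears on both sides was merely rewritten into a new
    // manifest; it is neither an add nor a delete.
    FileChanges(
      added = newOnlyFiles -- oldOnlyFiles,
      deleted = oldOnlyFiles -- newOnlyFiles
    )
  }
}
```

Reading only the new-only manifests, as the old behavior is described, would yield `added` but could never produce `deleted`, which is why deletions went undetected.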

Supersedes #263 and #264. Fixes #267, #268, #270.

Test plan

export JAVA_HOME=/opt/homebrew/opt/openjdk@11

# Iceberg streaming E2E (incremental resume + FROM SNAPSHOT + invalidation-only):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.StreamingCompanionIcebergEndToEndTest'

# Iceberg incremental scanner (no-op, append, delete detection):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sync.DistributedSourceScannerTest'

# FROM SNAPSHOT time-travel:
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sync.IcebergMiscFeaturesTest'

# SQL parsing (FROM SNAPSHOT + WITH STREAMING):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.SyncToExternalParsingTest'

# Delta streaming (unchanged behavior):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.StreamingCompanionEndToEndTest'

All tests pass (8 new tests added).

🤖 Generated with Claude Code

tlee732 and others added 8 commits March 26, 2026 15:57
Format-agnostic tests for 6 companion operations:
- AggregatePushdownCorrectness: 11 tests — COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values)
- BucketAggregation: 9 tests — Histogram/DateHistogram/Range with per-bucket verification
- DropPartitions: 9 tests — equality/range/compound predicates, cumulative drops, re-sync
- MergeSplits: 9 tests — companion field preservation, data/agg/filter correctness post-merge
- Purge: 9 tests — orphan deletion, retention, DRY RUN, data integrity
- TruncateTimeTravel: 9 tests — version cleanup, checkpoint creation, metadata preservation

3 tests intentionally fail, catching real bugs:
- Merged Iceberg companion splits fail file:// parquet path resolution
- Iceberg sync returns "success" instead of "no_action" for unchanged snapshots
- Iceberg file deletion not detected by companion anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

The native Rust layer received file:// URIs as parquetTableRoot for local
Iceberg companions but only supports bare filesystem paths or cloud URIs.

- ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths
- SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern
- Fix merge test data layout to use common data directory
- Remove known-bug comments from test files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Five locations stripped file:// URIs using different approaches (URI.getPath,
substring, stripPrefix), and two of them had bugs: SyncTaskExecutor.downloadFile
called URI.getPath unguarded (NPE risk), and LocalCopyDownloader.normalizeLocalPath
produced the wrong result for file://hostname/path URIs.

Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards
and substring fallback for malformed URIs (e.g., unescaped spaces). All five
call sites now delegate to this shared utility. Adds unit tests for both
stripFileScheme and extractTableBasePath.
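
As a rough illustration of the approach described above (URI.getPath with null and exception guards, plus a substring fallback), a helper of this kind might look like the following. This is a sketch under assumptions, not the actual CloudPathUtils.stripFileScheme: the enclosing object name and the exact fallback rules are made up for the example.

```scala
import java.net.URI
import scala.util.Try

// Illustrative only; the real CloudPathUtils.stripFileScheme may differ.
object FileSchemeSketch {
  def stripFileScheme(path: String): String =
    if (path == null || !path.startsWith("file:")) path // leave bare paths and cloud URIs alone
    else
      Try(new URI(path).getPath).toOption // URISyntaxException -> None (e.g. unescaped spaces)
        .flatMap(Option(_))               // getPath returns null for opaque URIs
        .filter(_.nonEmpty)
        .getOrElse(path.stripPrefix("file://").stripPrefix("file:")) // substring fallback
}
```

With this sketch, `file:///tmp/data` and `file:/tmp/data` both become `/tmp/data`, `file://hostname/path` becomes `/path`, and a malformed `file:///tmp/my dir/x` falls through to the substring branch and becomes `/tmp/my dir/x`.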

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

- IcebergRoundTripTest: 6 tests — scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity
- IcebergBatchingSyncTest: 4 tests — batchSize/maxConcurrentBatches configuration
- IcebergCompactStringTest: 4 tests — exact_only/text_uuid_exactonly/text_uuid_strip modes
- IcebergPartitionEvolutionTest: 3 tests — unpartitioned→partitioned, add partition, mixed specs
- IcebergMiscFeaturesTest: 8 tests — error handling, write guard, fast field modes, FROM SNAPSHOT

2 tests intentionally fail, catching real bugs:
- Partitioned round-trip: file:// parquet path resolution failure
- FROM SNAPSHOT: returns wrong snapshot ID in result row

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

appendSnapshot wrote each batch's parquet files to separate batch-N/
subdirectories, but extractTableBasePath() derives the storage root from
the first file's path. Files from other batches couldn't be resolved
relative to that root, causing "Failed to create parquet stream builder"
on partitioned tests. Real Iceberg tables store all data files under a
single data/ directory — mirror that layout so the test reflects
production behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

FROM SNAPSHOT was incorrectly implemented as an incremental filter (get
changes after this snapshot) instead of time-travel (build companion from
the state at this snapshot). This caused the wrong files to be synced and
the wrong snapshot ID reported in the result.

Two changes:
- SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel)
  instead of fromSnapshotId (incremental filter)
- IcebergSourceReader.sourceVersion: return the requested snapshot ID
  when one was explicitly provided, instead of the first file entry's ID
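
The second change can be pictured with a small sketch. The `FileEntry` type and this signature are hypothetical, and it assumes the reader still falls back to the first file entry's snapshot ID when no snapshot was requested; the actual IcebergSourceReader.sourceVersion may be shaped differently.

```scala
// Hypothetical model; not the real IcebergSourceReader.
object SnapshotVersionSketch {
  final case class FileEntry(path: String, snapshotId: Long)

  // Prefer the explicitly requested snapshot (FROM SNAPSHOT time-travel);
  // otherwise fall back to the snapshot ID of the first file entry, if any.
  def sourceVersion(requestedSnapshotId: Option[Long], entries: Seq[FileEntry]): Option[Long] =
    requestedSnapshotId.orElse(entries.headOption.map(_.snapshotId))
}
```

The point of the fix is the first branch: a file that was added in an earlier snapshot carries that earlier ID, so reading the ID off the first entry can report a snapshot other than the one the user asked for.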

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

…hods

Six Iceberg test files each had their own copy of appendSnapshot with
slight variations. Four still used the old direct-write pattern that
breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper
trait in IcebergTestSupport.scala with staging+move to a common data/
directory, optional partitionPath, and explicit schema parameter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Separates streaming resume from SQL FROM SNAPSHOT/FROM VERSION fields,
adds incremental deletion detection via manifest set-difference, and
fixes invalidation-only sync commits.
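
To show why a deletion-only sync needs its own commit path, here is a minimal planning sketch. All names (`Plan`, `InvalidateOnly`, `plan`) are hypothetical and the real SyncToExternalCommand is certainly more involved. The idea is that when removes are committed only inside the per-batch loop, an empty list of added files means the loop body never runs and the removes are lost.

```scala
// Hypothetical planning step; names do not come from the project.
object SyncPlanSketch {
  sealed trait Plan
  case object NoAction extends Plan
  final case class InvalidateOnly(removed: Set[String]) extends Plan
  final case class IndexAndInvalidate(batches: List[Seq[String]], removed: Set[String]) extends Plan

  def plan(added: Seq[String], deleted: Set[String], batchSize: Int): Plan = {
    require(batchSize > 0, "batchSize must be positive")
    if (added.isEmpty && deleted.isEmpty) NoAction
    // Nothing to index, but stale splits must still be removed:
    // commit the remove actions directly instead of relying on a batch loop.
    else if (added.isEmpty) InvalidateOnly(deleted)
    else IndexAndInvalidate(added.grouped(batchSize).toList, deleted)
  }
}
```

Making the three outcomes explicit also keeps the no_action case distinct from a deletion-only sync, which a simple "no batches" check would conflate.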

Production changes:
- Add lastSyncedVersion field to SyncToExternalCommand for format-agnostic
  streaming resume, keeping fromVersion (Delta SQL) and fromSnapshot
  (Iceberg SQL) as separate concerns
- Enhance DistributedSourceScanner manifest-diff to read both old-only and
  new-only manifests, computing file-level adds AND deletes via set
  difference (previously only detected new files)
- Add invalidation-only commit path for deletion-only syncs where no new
  files need indexing but stale splits must be removed
- Auto-read lastSyncedVersion from companion transaction log for
  non-streaming one-shot re-syncs (fixes indextables#268 no_action)
- Add Parquet format filter to incremental manifest reads
- Handle GC'd old manifests gracefully with fallback to full scan
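
The auto-resume and fallback items above suggest a decision of roughly the following shape. This is a sketch under assumptions: `ScanMode`, `chooseScanMode`, and the `oldManifestsAvailable` callback are invented for illustration, and versions are compared only for equality because Iceberg snapshot IDs are not ordered.

```scala
// Hypothetical decision logic; not the project's actual implementation.
object ResumeDecisionSketch {
  sealed trait ScanMode
  case object NoAction extends ScanMode
  case object FullScan extends ScanMode
  final case class Incremental(fromVersion: Long, toVersion: Long) extends ScanMode

  def chooseScanMode(
      lastSyncedVersion: Option[Long],       // read from the companion transaction log
      currentVersion: Long,                  // current source version or snapshot ID
      oldManifestsAvailable: Long => Boolean // false if the old snapshot's manifests were GC'd
  ): ScanMode =
    lastSyncedVersion match {
      case None                                => FullScan    // first sync, nothing to resume from
      case Some(v) if v == currentVersion      => NoAction    // source unchanged since last sync
      case Some(v) if oldManifestsAvailable(v) => Incremental(v, currentVersion)
      case Some(_)                             => FullScan    // old manifests gone, fall back
    }
}
```

Without a recorded lastSyncedVersion, a one-shot re-sync has nothing to compare against, which matches the earlier symptom of returning "success" instead of "no_action" for an unchanged snapshot.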

Fixes indextables#267, indextables#268, indextables#270

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
tlee732 closed this Apr 1, 2026
tlee732 deleted the fix/iceberg-streaming-and-deletion branch April 1, 2026 19:19
Development

Successfully merging this pull request may close these issues.

Iceberg scoped invalidation does not detect deleted files

1 participant