Fix Iceberg streaming resume, deletion detection, and no_action handling#284
Closed
tlee732 wants to merge 8 commits into
Closed
Fix Iceberg streaming resume, deletion detection, and no_action handling#284tlee732 wants to merge 8 commits into
tlee732 wants to merge 8 commits into
Conversation
Format-agnostic tests for 6 companion operations: - AggregatePushdownCorrectness: 11 tests — COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values) - BucketAggregation: 9 tests — Histogram/DateHistogram/Range with per-bucket verification - DropPartitions: 9 tests — equality/range/compound predicates, cumulative drops, re-sync - MergeSplits: 9 tests — companion field preservation, data/agg/filter correctness post-merge - Purge: 9 tests — orphan deletion, retention, DRY RUN, data integrity - TruncateTimeTravel: 9 tests — version cleanup, checkpoint creation, metadata preservation 3 tests intentionally fail, catching real bugs: - Merged Iceberg companion splits fail file:// parquet path resolution - Iceberg sync returns "success" instead of "no_action" for unchanged snapshots - Iceberg file deletion not detected by companion anti-join Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The native Rust layer received file:// URIs as parquetTableRoot for local Iceberg companions but only supports bare filesystem paths or cloud URIs. - ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths - SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern - Fix merge test data layout to use common data directory - Remove known-bug comments from test files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Five locations stripped file:// URIs using different approaches (URI.getPath, substring, stripPrefix), two with bugs: SyncTaskExecutor.downloadFile had unguarded URI.getPath (NPE risk), LocalCopyDownloader.normalizeLocalPath produced wrong result for file://hostname/path URIs. Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards and substring fallback for malformed URIs (e.g., unescaped spaces). All five call sites now delegate to this shared utility. Adds unit tests for both stripFileScheme and extractTableBasePath. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- IcebergRoundTripTest: 6 tests — scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity - IcebergBatchingSyncTest: 4 tests — batchSize/maxConcurrentBatches configuration - IcebergCompactStringTest: 4 tests — exact_only/text_uuid_exactonly/text_uuid_strip modes - IcebergPartitionEvolutionTest: 3 tests — unpartitioned→partitioned, add partition, mixed specs - IcebergMiscFeaturesTest: 8 tests — error handling, write guard, fast field modes, FROM SNAPSHOT 2 tests intentionally fail, catching real bugs: - Partitioned round-trip: file:// parquet path resolution failure - FROM SNAPSHOT: returns wrong snapshot ID in result row Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
appendSnapshot wrote each batch's parquet files to separate batch-N/ subdirectories, but extractTableBasePath() derives the storage root from the first file's path. Files from other batches couldn't be resolved relative to that root, causing "Failed to create parquet stream builder" on partitioned tests. Real Iceberg tables store all data files under a single data/ directory — mirror that layout so the test reflects production behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
FROM SNAPSHOT was incorrectly implemented as an incremental filter (get changes after this snapshot) instead of time-travel (build companion from the state at this snapshot). This caused the wrong files to be synced and the wrong snapshot ID reported in the result. Two changes: - SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel) instead of fromSnapshotId (incremental filter) - IcebergSourceReader.sourceVersion: return the requested snapshot ID when one was explicitly provided, instead of the first file entry's ID Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hods Six Iceberg test files each had their own copy of appendSnapshot with slight variations. Four still used the old direct-write pattern that breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper trait in IcebergTestSupport.scala with staging+move to a common data/ directory, optional partitionPath, and explicit schema parameter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Separates streaming resume from SQL FROM SNAPSHOT/FROM VERSION fields, adds incremental deletion detection via manifest set-difference, and fixes invalidation-only sync commits. Production changes: - Add lastSyncedVersion field to SyncToExternalCommand for format-agnostic streaming resume, keeping fromVersion (Delta SQL) and fromSnapshot (Iceberg SQL) as separate concerns - Enhance DistributedSourceScanner manifest-diff to read both old-only and new-only manifests, computing file-level adds AND deletes via set difference (previously only detected new files) - Add invalidation-only commit path for deletion-only syncs where no new files need indexing but stale splits must be removed - Auto-read lastSyncedVersion from companion transaction log for non-streaming one-shot re-syncs (fixes indextables#268 no_action) - Add Parquet format filter to incremental manifest reads - Handle GC'd old manifests gracefully with fallback to full scan Fixes indextables#267, indextables#268, indextables#270 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
lastSyncedVersionfield onSyncToExternalCommandkeeps streaming incremental resume independent fromfromVersion(Delta SQL) andfromSnapshot(Iceberg SQL). Streaming resume is now format-agnostic; format-specific routing happens at the scanner call site.DistributedSourceScannerreads entries from both old-only and new-only manifests, computing file-level adds AND deletes. Previously only detected new files, missing deletions entirely.lastSyncedVersionfrom the companion transaction log to enable incremental fast path and correctno_actiondetection.Supersedes #263 and #264. Fixes #267, #268, #270.
Test plan
All tests pass (8 new tests added).
🤖 Generated with Claude Code