Fix Iceberg streaming resume, deletion detection, and no_action handling#286
tlee732 wants to merge 18 commits into
Format-agnostic tests for 6 companion operations:
- AggregatePushdownCorrectness: 11 tests covering COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values)
- BucketAggregation: 9 tests covering Histogram/DateHistogram/Range with per-bucket verification
- DropPartitions: 9 tests covering equality/range/compound predicates, cumulative drops, re-sync
- MergeSplits: 9 tests covering companion field preservation, data/agg/filter correctness post-merge
- Purge: 9 tests covering orphan deletion, retention, DRY RUN, data integrity
- TruncateTimeTravel: 9 tests covering version cleanup, checkpoint creation, metadata preservation

3 tests intentionally fail, catching real bugs:
- Merged Iceberg companion splits fail file:// parquet path resolution
- Iceberg sync returns "success" instead of "no_action" for unchanged snapshots
- Iceberg file deletion not detected by companion anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Closes the 5:1 Delta-to-Iceberg test gap for sync operations:
- IcebergMutationSyncTest: 10 tests covering append/delete/overwrite/mixed ops, partitioned tables
- IcebergDistributedSyncTest: 12 tests covering WHERE clause (=, IN, !=, range, AND, OR), scoped invalidation
- IcebergWhereClauseExtendedTest: 5 tests covering compound AND, OR, strict >, BETWEEN, range filters
- IcebergSchemaEvolutionSyncTest: 4 tests covering add/drop/rename column, new column indexing

7 tests intentionally fail, catching real Iceberg bugs:
- Scoped invalidation doesn't detect deleted files within WHERE range
- Sync returns "success" instead of "no_action" for unchanged snapshots
- Sync after add/drop/rename column returns "error" (schema evolution not handled)
- Partitioned delete not detected by anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The native Rust layer received file:// URIs as parquetTableRoot for local Iceberg companions but only supports bare filesystem paths or cloud URIs.
- ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths
- SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern
- Fix merge test data layout to use common data directory
- Remove known-bug comments from test files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Schema evolution support doesn't exist yet (indextables#269), so these tests shouldn't be part of this PR. They can be reintroduced alongside the feature implementation. Addresses reviewer feedback from @schenksj. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Five locations stripped file:// URIs using different approaches (URI.getPath, substring, stripPrefix), two with bugs: SyncTaskExecutor.downloadFile had unguarded URI.getPath (NPE risk), LocalCopyDownloader.normalizeLocalPath produced wrong result for file://hostname/path URIs. Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards and substring fallback for malformed URIs (e.g., unescaped spaces). All five call sites now delegate to this shared utility. Adds unit tests for both stripFileScheme and extractTableBasePath. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
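The guard-plus-fallback shape this commit describes can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the project's `CloudPathUtils` code: the object name `FileSchemeSketch` and the exact fallback rule are hypothetical, and only `java.net.URI` and `scala.util.control.NonFatal` are real library APIs.

```scala
import java.net.URI
import scala.util.control.NonFatal

object FileSchemeSketch {
  // Returns a bare filesystem path for file: URIs; other inputs pass through unchanged.
  def stripFileScheme(path: String): String =
    if (path == null || !path.startsWith("file:")) path
    else
      try {
        // URI.getPath drops the scheme and any authority (e.g. a hostname).
        val parsed = new URI(path).getPath
        if (parsed != null && parsed.nonEmpty) parsed else substringFallback(path)
      } catch {
        // Malformed input such as an unescaped space makes URI parsing throw.
        case NonFatal(_) => substringFallback(path)
      }

  // Assumed fallback: drop "file:" and collapse a leading run of slashes to one.
  // It does not try to recognise an authority component.
  private def substringFallback(path: String): String = {
    val rest = path.stripPrefix("file:")
    if (rest.startsWith("//")) "/" + rest.dropWhile(_ == '/') else rest
  }
}
```

With this shape, `file:///tmp/a.parquet`, `file:/tmp/a.parquet`, and `file://hostname/tmp/a.parquet` all resolve to `/tmp/a.parquet`, while cloud URIs such as `s3://bucket/key` are returned untouched.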
- IcebergRoundTripTest: 6 tests covering scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity
- IcebergBatchingSyncTest: 4 tests covering batchSize/maxConcurrentBatches configuration
- IcebergCompactStringTest: 4 tests covering exact_only/text_uuid_exactonly/text_uuid_strip modes
- IcebergPartitionEvolutionTest: 3 tests covering unpartitioned→partitioned, add partition, mixed specs
- IcebergMiscFeaturesTest: 8 tests covering error handling, write guard, fast field modes, FROM SNAPSHOT

2 tests intentionally fail, catching real bugs:
- Partitioned round-trip: file:// parquet path resolution failure
- FROM SNAPSHOT: returns wrong snapshot ID in result row

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
appendSnapshot wrote each batch's parquet files to separate batch-N/ subdirectories, but extractTableBasePath() derives the storage root from the first file's path. Files from other batches couldn't be resolved relative to that root, causing "Failed to create parquet stream builder" on partitioned tests. Real Iceberg tables store all data files under a single data/ directory — mirror that layout so the test reflects production behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ndextables#268)
Three changes:
1. Read lastSyncedVersion from companion metadata for non-streaming Iceberg syncs, enabling the snapshot ID comparison that returns "no_action" when the source hasn't changed. (fixes indextables#268)
2. Replace the incremental manifest-diff path with a full-scan fallback when the Iceberg snapshot has changed. The manifest-diff only detected new files; it never populated removedSourcePaths, so file deletions were invisible. The full-scan anti-join already handles both new and deleted files correctly. (fixes indextables#267)
3. Handle invalidation-only syncs (splits to remove, no new files). Previously, remove actions were only committed during batch processing: if there were no files to index, no batch ran and the stale splits were never removed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
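The full-scan anti-join this commit relies on can be pictured as two set differences. The sketch below is an assumption-laden illustration: `AntiJoinSketch`, `Plan`, and the use of plain path sets are hypothetical stand-ins for whatever the sync command actually compares.

```scala
object AntiJoinSketch {
  // toIndex: source files the companion has not indexed yet.
  // toInvalidate: files the companion recorded that no longer exist in the source.
  final case class Plan(toIndex: Set[String], toInvalidate: Set[String])

  def plan(sourceFiles: Set[String], companionSourceFiles: Set[String]): Plan =
    Plan(
      toIndex = sourceFiles -- companionSourceFiles,
      toInvalidate = companionSourceFiles -- sourceFiles
    )
}
```

A deletion-only change yields an empty `toIndex` and a non-empty `toInvalidate`, which is the invalidation-only case that item 3 handles.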
FROM SNAPSHOT was incorrectly implemented as an incremental filter (get changes after this snapshot) instead of time-travel (build companion from the state at this snapshot). This caused the wrong files to be synced and the wrong snapshot ID reported in the result.

Two changes:
- SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel) instead of fromSnapshotId (incremental filter)
- IcebergSourceReader.sourceVersion: return the requested snapshot ID when one was explicitly provided, instead of the first file entry's ID

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace hand-built Row in invalidation-only path with buildResultRow
- Hoist val externalStorageRoot before step 8b to eliminate duplicate icebergStorageRoot.orElse(resolvedStorageLocation) expression
- Trim verbose comments: effectiveFromSnapshot (4→1 line), remove comment duplicating log message, simplify scanner block comment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hods Six Iceberg test files each had their own copy of appendSnapshot with slight variations. Four still used the old direct-write pattern that breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper trait in IcebergTestSupport.scala with staging+move to a common data/ directory, optional partitionPath, and explicit schema parameter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…/iceberg-streaming-and-deletion # Conflicts: # src/main/scala/io/indextables/spark/sql/SyncToExternalCommand.scala
Separates streaming resume from SQL FROM SNAPSHOT/FROM VERSION fields, adds incremental deletion detection via manifest set-difference, and fixes invalidation-only sync commits.

Production changes:
- Add lastSyncedVersion field to SyncToExternalCommand for format-agnostic streaming resume, keeping fromVersion (Delta SQL) and fromSnapshot (Iceberg SQL) as separate concerns
- Enhance DistributedSourceScanner manifest-diff to read both old-only and new-only manifests, computing file-level adds AND deletes via set difference (previously only detected new files)
- Add invalidation-only commit path for deletion-only syncs where no new files need indexing but stale splits must be removed
- Auto-read lastSyncedVersion from companion transaction log for non-streaming one-shot re-syncs (fixes indextables#268 no_action)
- Add Parquet format filter to incremental manifest reads
- Handle GC'd old manifests gracefully with fallback to full scan

Fixes indextables#267, indextables#268, indextables#270

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
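The manifest set-difference named in this commit can be sketched as below. This is a simplified model, not the scanner's code: `ManifestDiffSketch`, the `Manifests` map (manifest path to the data file paths it lists), and `Changes` are hypothetical, and real Iceberg manifest entries also carry a status field that this sketch ignores.

```scala
object ManifestDiffSketch {
  // Hypothetical stand-in: manifest path -> data file paths listed in that manifest.
  type Manifests = Map[String, Set[String]]

  final case class Changes(added: Set[String], removed: Set[String])

  def diff(oldSnapshot: Manifests, newSnapshot: Manifests): Changes = {
    // Manifests shared by both snapshots are skipped without being read.
    val oldOnly = oldSnapshot.keySet -- newSnapshot.keySet
    val newOnly = newSnapshot.keySet -- oldSnapshot.keySet

    val oldOnlyFiles = oldOnly.flatMap(m => oldSnapshot(m))
    val newOnlyFiles = newOnly.flatMap(m => newSnapshot(m))

    // A file rewritten into a new manifest appears on both sides and cancels out.
    Changes(added = newOnlyFiles -- oldOnlyFiles, removed = oldOnlyFiles -- newOnlyFiles)
  }
}
```

Reading only the new-only side, as the earlier manifest-diff did, would produce `added` but leave `removed` permanently empty.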
schenksj
left a comment
PR #286 Review — Fix Iceberg streaming resume, deletion detection, and no_action handling
Summary
Solid architectural work. The lastSyncedVersion separation cleanly decouples streaming state from SQL clauses, the manifest set-difference deletion detection is the right approach for Iceberg, and the CloudPathUtils.stripFileScheme consolidation removes three duplicate implementations while upgrading to proper java.net.URI parsing with substring fallback.
Issues
1. Duplicate invalidation-only commit path (dead code)
There are two blocks handling the same condition, and the first one makes the second unreachable.
Block 1 (~line 698 in SyncToExternalCommand.scala):
if (parquetFilesToIndex.isEmpty && splitsToInvalidate.nonEmpty) {
  ...
  return Seq(Row(destPath, sourcePath, "success", ..., s"Removed ${splitsToInvalidate.size} invalidated splits (no new files)"))
}
Block 2 (~line 737):
if (groups.isEmpty && splitsToInvalidate.nonEmpty) {
  transactionLog.invalidateCache()
  return buildResultRow(Seq.empty, splitsToInvalidate.size, ...)
}
Since groups = planIndexingGroups(parquetFilesToIndex, ...), when parquetFilesToIndex is empty, groups is also empty, so Block 1 always returns first and Block 2 can never fire. Block 2 is the better implementation (uses buildResultRow, calls invalidateCache()), while Block 1 manually constructs the Row inline and skips invalidateCache(). Remove Block 1.
2. return inside foreach lambda in scanIcebergTable
oldOnlyEntriesOpt.foreach { oldOnlyEntries =>
  ...
  return DistributedScanResult(...) // NonLocalReturnControl
}
// fall through to full scan
return inside a Scala closure uses NonLocalReturnControl (a Throwable subclass) to unwind to the enclosing method. This works but generates a compiler warning, has allocator overhead on the hot path, and is easy to misread. The fall-through behavior is better expressed as:
oldOnlyEntriesOpt match {
  case Some(oldOnlyEntries) =>
    ...
    return DistributedScanResult(...)
  case None =>
    logger.warn("...falling back to full scan")
    // fall through
}
Or restructure so the full-scan path is in the None branch rather than relying on fall-through.
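The second option, putting the full-scan path in the None branch, could look roughly like this. The types here (`ScanResult`, `Incremental`, `FullScan`) are hypothetical placeholders for `DistributedScanResult` and the real scan paths; the point is only that the match is an expression, so no `return` is needed at all.

```scala
object FallThroughSketch {
  sealed trait ScanResult
  final case class Incremental(removedPaths: Seq[String]) extends ScanResult
  case object FullScan extends ScanResult

  // Each branch yields the method's result directly, so nothing has to unwind
  // through a closure and there is no implicit fall-through to reason about.
  def scan(oldOnlyEntriesOpt: Option[Seq[String]]): ScanResult =
    oldOnlyEntriesOpt match {
      case Some(oldOnlyEntries) => Incremental(oldOnlyEntries)
      case None                 => FullScan // old manifests unreadable: full scan
    }
}
```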
Minor
- The IcebergSourceReader.sourceVersion() change from if (entries.isEmpty) snapshotId else ... to snapshotId.orElse(...) is intentional and correct: constructor-provided snapshotId (from FROM SNAPSHOT) now takes precedence over the entry's embedded snapshot ID. Worth a brief comment since the old behavior was the opposite priority order.
- The maxIncrementalManifests threshold increase from 50 → 100 and the threshold semantics change (now counting old+new instead of just new) are both sensible, but the old config key (spark.indextables.companion.sync.iceberg.maxIncrementalManifests) now semantically means "total changed manifests", so users with existing tuning at the old default (50) will see behavior change. Fine for now but worth a release note.
Tests
The new StreamingCompanionIcebergEndToEndTest cases (FROM SNAPSHOT seeding + invalidation-only) directly cover the fixed paths. DistributedSourceScannerTest additions look comprehensive. No concerns.
…rn to match
- Remove duplicate invalidation-only early return that was unreachable
(Block 2 after planIndexingGroups already handles this case with
buildResultRow + invalidateCache)
- Move invalidation-only message into buildResultRow so the single
code path produces the correct result message
- Refactor oldOnlyEntriesOpt.foreach { return } to match/case to
avoid NonLocalReturnControl overhead and compiler warning
- Add comment on IcebergSourceReader.sourceVersion() priority order
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Thanks for the thorough review, Scott! Addressed all points in a36ab52:
All 3 test suites pass (StreamingCompanionIcebergEndToEndTest 4/4, DistributedSourceScannerTest 25/25, IcebergMiscFeaturesTest 8/8).
schenksj
left a comment
Code Review — PR #286 (Full Re-review)
Summary
The four main changes are well-motivated and the overall logic is correct. A few issues worth addressing before merge:
Bug: Removed-path normalization reimplements normalizeSourcePath without scheme handling
The new inline normalization in the Iceberg incremental deletion path:
val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) =>
    val normalizedBase = basePath.stripSuffix("/")
    removedPaths.map { p =>
      val normalized = p.stripSuffix("/")
      if (normalized.startsWith(normalizedBase))
        normalized.substring(normalizedBase.length).stripPrefix("/")
      else p
    }
  case None => removedPaths
}
This is a reimplementation of DistributedSourceScanner.normalizeSourcePath(rawPath, basePath), which already strips file:// schemes, applies normalizeLocalPath, and handles leading-slash normalization. The inline version does none of that.
Why this matters for the tests: In local tests, extractTableBasePath now includes file:// in its scheme pattern, so storageRoot = "file:///tmp/..." and removedPaths are "file:///tmp/.../file.parquet" — scheme-consistent, so the raw startsWith happens to work. But if Iceberg ever returns paths without the file:// prefix (e.g., after a catalog round-trip), or for any path normalization edge case that normalizeSourcePath already handles, the inline version silently falls through to else p (absolute path), and the removed path won't match any companion split — the deletion is silently dropped.
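The failure mode can be reproduced in isolation. In this sketch `inlineRelativize` mirrors the inline snippet quoted above, while `schemeAwareRelativize` and `dropFileScheme` are hypothetical, deliberately simplified stand-ins for what a scheme-aware helper would do; they are not the project's `normalizeSourcePath`.

```scala
object RemovedPathSketch {
  // Mirrors the inline normalization quoted in the review: a raw prefix match.
  def inlineRelativize(path: String, base: String): String = {
    val normalizedBase = base.stripSuffix("/")
    val normalized = path.stripSuffix("/")
    if (normalized.startsWith(normalizedBase))
      normalized.substring(normalizedBase.length).stripPrefix("/")
    else path // silent fall-through: the absolute path is returned unchanged
  }

  // Simplified, assumed scheme stripping; a real utility would handle more forms.
  private def dropFileScheme(s: String): String = s.stripPrefix("file://")

  // Strips the scheme from both sides before comparing prefixes.
  def schemeAwareRelativize(path: String, base: String): String =
    inlineRelativize(dropFileScheme(path), dropFileScheme(base))
}
```

When both sides carry `file://` the inline version yields `data/f.parquet`; when only the base carries it, the inline version returns the absolute path, which cannot match a relative companion split key.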
Fix: use the existing utility:
val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) => removedPaths.map(normalizeSourcePath(_, basePath))
  case None => removedPaths
}
Bug: fromSnapshot leaks into all streaming cycles after the first
The new streaming cycle copy:
case Some(v) =>
  copy(lastSyncedVersion = Some(v), streamingPollIntervalMs = None)
This preserves fromSnapshot from the original command. For cycles 2+, snapshotId = fromSnapshot = Some(originalSnap1Id) is passed to the scanner. The scanner correctly ignores snapshotId when fromSnapshotId is set (line 681: if (fromSnapshotId.isDefined) IcebergTableReader.getSnapshotInfo(... /* no id, gets current */)) so this doesn't break today. But it's fragile coupling: the correctness depends on that one-line guard in the scanner, with no indication at the call site that fromSnapshot is being silently ignored.
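A small sketch of why the field survives: case-class `copy` keeps every field that is not named in the call. `SyncCommand` below is a hypothetical three-field stand-in for `SyncToExternalCommand`, not its real definition.

```scala
object StreamingCopySketch {
  // Hypothetical, reduced stand-in for the real command's fields.
  final case class SyncCommand(
      fromSnapshot: Option[Long] = None,
      lastSyncedVersion: Option[Long] = None,
      streamingPollIntervalMs: Option[Long] = None
  )

  // Leaves fromSnapshot as it was on the original command.
  def nextCycleLeaky(cmd: SyncCommand, v: Long): SyncCommand =
    cmd.copy(lastSyncedVersion = Some(v), streamingPollIntervalMs = None)

  // Clears the SQL-only field when switching to lastSyncedVersion-based resume.
  def nextCycleExplicit(cmd: SyncCommand, v: Long): SyncCommand =
    cmd.copy(lastSyncedVersion = Some(v), fromSnapshot = None, streamingPollIntervalMs = None)
}
```

With the explicit form, later cycles no longer depend on the scanner ignoring a stale `snapshotId`.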
Fix: explicitly clear fromSnapshot when transitioning to lastSyncedVersion-based resumption:
case Some(v) =>
  copy(lastSyncedVersion = Some(v), fromSnapshot = None, streamingPollIntervalMs = None)
maxIncrementalManifests threshold effectively doubles for pure appends
Old threshold was 50 and counted only new manifests. New threshold is 100 and counts both old and new. For a pure append (no replaced manifests), oldOnlyManifests.size == 0, so 51 new manifests previously triggered a full scan (51 > 50) but now stay on the driver-side incremental path (0 + 51 = 51 ≤ 100). This effectively doubles the driver-side threshold for append-only workloads.
The comment says "Threshold counts old + new manifests (a compaction replacing N manifests counts 2N)" — that's accurate for the compaction case but understates the behavioral change for the append-only case. Either the Scaladoc should note the effective doubling for appends, or the threshold should be left at 50 for newOnlyManifests.size and have a separate cap on oldOnlyManifests.size.
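The arithmetic can be checked with three small predicates. The function names are hypothetical; the rules encode the old behavior (50, new manifests only), the combined rule (100, old + new), and the per-side alternative suggested above with an assumed limit of 50 on each side. Each returns true when the scan stays on the incremental path.

```scala
object ManifestThresholdSketch {
  // Old rule: only new-only manifests are counted against a limit of 50.
  def oldRule(newOnly: Int): Boolean = newOnly <= 50

  // Combined rule: old-only plus new-only manifests against a limit of 100.
  def combinedRule(oldOnly: Int, newOnly: Int): Boolean = oldOnly + newOnly <= 100

  // Per-side alternative: each side capped independently (limit assumed to be 50).
  def perSideRule(oldOnly: Int, newOnly: Int, limit: Int = 50): Boolean =
    oldOnly <= limit && newOnly <= limit
}
```

For a pure append of 51 manifests, `oldRule` and `perSideRule` both fall back to a full scan while `combinedRule` stays incremental, which is the behavioral change the review points at.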
case e: Exception in old-manifest catch should be NonFatal
} catch {
  case e: Exception =>
    logger.warn(...)
    None
}
Same pattern flagged in PR #287; this should be case NonFatal(e). The codebase already uses NonFatal in BatchWrite and ScanBuilder.
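A minimal sketch of the suggested pattern, assuming a hypothetical `readOldManifests` helper that takes the read as a function. `scala.util.control.NonFatal` is the real standard-library extractor; it does not match `InterruptedException`, `VirtualMachineError`, `LinkageError`, or `ControlThrowable`, whereas `case e: Exception` would swallow an `InterruptedException`.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Returns None when the read fails with a recoverable error, so the caller
  // can fall back to a full scan; fatal or control throwables propagate.
  def readOldManifests(read: () => Seq[String]): Option[Seq[String]] =
    try Some(read())
    catch {
      case NonFatal(_) => None
    }
}
```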
IcebergSourceReader.sourceVersion() behavioral change
Old:
val entries = fileEntries
if (entries.isEmpty) snapshotId
else Some(entries.get(0).getSnapshotId)
New:
snapshotId.orElse {
  val entries = fileEntries
  if (entries.isEmpty) None
  else Some(entries.get(0).getSnapshotId)
}
When snapshotId = Some(id) and entries are non-empty, old returns entries.get(0).getSnapshotId, new returns snapshotId. The comment explains the intent ("FROM SNAPSHOT takes precedence"). This is correct for the streaming use case where snapshotId IS the authoritative version marker. But there's no test covering the case where snapshotId != entries.get(0).getSnapshotId (e.g., a time-travel query to a snapshot that contains files written in a different snapshot). Worth adding a unit test.
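The two priority orders can be compared side by side. This sketch replaces the reader's `fileEntries` with a plain `Seq[Long]` of embedded snapshot IDs; the object and function names are hypothetical, and the logic follows the Old and New snippets quoted above.

```scala
object SourceVersionSketch {
  // Old order: the first entry's embedded ID wins whenever entries exist.
  def oldSourceVersion(snapshotId: Option[Long], entrySnapshotIds: Seq[Long]): Option[Long] =
    if (entrySnapshotIds.isEmpty) snapshotId
    else Some(entrySnapshotIds.head)

  // New order: an explicitly requested snapshot ID wins; entries are the fallback.
  def newSourceVersion(snapshotId: Option[Long], entrySnapshotIds: Seq[Long]): Option[Long] =
    snapshotId.orElse(entrySnapshotIds.headOption)
}
```

The two only disagree when a snapshot ID is provided and the first entry was written in a different snapshot, which is exactly the untested case called out above.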
Missing test: streaming cycle with deletion detected incrementally
The "invalidation-only sync removes stale splits when source files are deleted" test covers the one-shot re-sync deletion path. But the streaming cycle (where deletion is detected in cycle N after append-then-delete) has no direct test. The manifest set-difference algorithm has a new code path (oldOnlyEntries, addedEntries, removedPaths), so a streaming deletion test in DistributedSourceScannerTest would complete coverage.
Everything else looks good
- CloudPathUtils.stripFileScheme consolidation: the URI-based implementation is correct and handles all file: forms including file:/path (the old LocalCopyDownloader had a latent bug there: path.stripPrefix("file://") wouldn't match file:/path). Tests are solid.
- readLastSyncedVersionFromLog auto-resume: clean approach; the orElse composition with fromVersion/fromSnapshot priority guards is correct.
- FROM SNAPSHOT time-travel: the snapshotId = fromSnapshot change is correct. The scanner's guard at line 681 (if (fromSnapshotId.isDefined) → get current snapshot) ensures time-travel only applies to the initial full-scan cycle. The test "FROM SNAPSHOT seeds initial sync, then streams new data" correctly validates cycles 1→2→3.
- Invalidation-only commit path: groups.isEmpty && splitsToInvalidate.nonEmpty guard is correct. sourceVersion in this path is always dr.version = Some(currentSnapId) from the incremental scan result (never -1). transactionLog.invalidateCache() after commit is the right call.
- extractTableBasePath + file://: adding file:// to the scheme pattern means the base path is reconstructed with the scheme intact, which is consistent with the inline normalizedRemovedPaths comparison (both sides have the scheme). Correct, but it further reinforces that the right fix for the normalization issue above is normalizeSourcePath, not manually stripping in two more places.
…t limits, error handling

Production fixes:
- Apply stripFileScheme + normalizeLocalPath to removed-path normalization in Iceberg manifest set-difference, handling file: scheme variations between entries from different manifests
- Clear fromVersion/fromSnapshot on streaming resume transition to prevent stale SQL fields leaking into subsequent streaming cycles
- Split maxIncrementalManifests into independent per-side limits (default 50 each) so pure appends retain the original threshold
- Use NonFatal in old-manifest catch block for consistency
- Wrap new-only manifest read in same NonFatal try/catch as old-only reads, preventing exception propagation to the wrong fallback handler

Tests:
- Add sourceVersion() priority test (FROM SNAPSHOT snap2 reports snap2, not snap1's embedded entry ID)
- Add streaming deletion detection test (append → delete → verify companion removes stale splits via manifest set-difference)
- Speed up streaming E2E tests: poll interval 2000→500ms, waitUntil poll 500→200ms, idle sleep 6000→1500ms

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Addressing round 2 review
Thanks for the thorough re-review @schenksj. All 6 items addressed, plus two additional fixes found during self-review.
Bug fixes
Additional fixes from self-review
New tests
Also
All affected test suites pass (DistributedSourceScannerTest, StreamingCompanionIcebergEndToEndTest, IcebergMiscFeaturesTest, SyncToExternalParsingTest, CloudPathUtilsTest, ExtractTableBasePathTest).
Resolves 9 conflicts brought by main's activity (primarily the spotless formatting pass in PR indextables#297 and new companion operation test files):

Content conflicts (3 files, all semantic):
- DistributedSourceScanner.scala: kept HEAD's full set-difference deletion detection logic (the core feature of this PR) and added main's `dateCols` parameter to the `icebergEntryToCompanionFile` call site for Iceberg DATE partition handling
- SyncTaskExecutor.scala: trivial whitespace (extra space before `=`), took main's formatting
- DistributedSourceScannerTest.scala: kept HEAD's `IcebergSnapshotHelper` trait mixin (needed for this PR's Iceberg tests) with main's multi-line class declaration formatting

Add/add conflicts (6 test files, all formatting-only):
- CompanionAggregatePushdownCorrectnessTest, CompanionBucketAggregationTest, CompanionDropPartitionsTest, CompanionMergeSplitsTest, CompanionPurgeTest, CompanionTruncateTimeTravelTest: both branches independently added these files with identical test sets (verified: 56 total tests, all common across both sides, zero HEAD-only, zero main-only). The only differences are spotless formatting. Took origin/main's spotless-formatted versions via `git checkout --theirs`.

`mvn compile` passes cleanly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
schenksj
left a comment
Review — Iceberg Companion Sync Bug Fixes
The PR correctly fixes four interrelated bugs in the Iceberg companion sync pipeline. The test suite (~3,500 lines across 8 new test files) is thorough. Two issues require resolution before merge.
Request Changes
1. Removed-path normalization bypasses normalizeSourcePath (correctness risk)
The new incremental delete path in DistributedSourceScanner.scala inlines its own normalizeLocalPath/stripFileScheme logic instead of delegating to the existing normalizeSourcePath(rawPath, basePath) method. The inline version's else p fallback returns a raw absolute path that won't match any companion split key — the deletion is silently dropped. Use the existing method:
val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) => removedPaths.map(normalizeSourcePath(_, basePath))
  case None => removedPaths
}
2. Dead-code invalidation block in SyncToExternalCommand.scala may still exist
The Apr 2 review identified an original Block 1 Row(...) construction that skips invalidateCache(). The new Block 2 (added in this PR) is the correct implementation. Verify Block 1 has been removed — if it still exists, stale cache entries will persist after deletion-only syncs. The new buildResultRow code path correctly calls invalidateCache().
Important (can be follow-up)
3. maxIncrementalManifests semantics change is undocumented
The config now caps old-only OR new-only manifests independently (not combined). For pure append workloads, the effective threshold doubles (50 → 100 with the default change). Users who tuned this setting will see a behavior change without any indication in the docs or config reference.
4. sampleFilePath is None for deletion-only syncs
sampleFilePath = addedEntries.headOption.orElse(newOnlyEntries.headOption).map(_.getPath)
In a pure-deletion sync, both are empty, so sampleFilePath = None. Worth verifying the downstream consumer handles None when removedSourcePaths is non-empty: if schema inference relies on it and doesn't handle None, this will surface as a confusing error in production.
Suggestions
5. Test infrastructure is duplicated across 8 new test files
All 8 test classes independently define beforeAll(), flushCaches(), withIcebergTable(), syncIceberg(), and readCompanion(). IcebergSnapshotHelper already exists as a consolidation point — the session setup and helper methods should move into a shared IcebergTestBase to avoid an 8-way sync problem when configs change.
6. Missing overwrite/compaction unit test in DistributedSourceScannerTest
The three new scanner unit tests cover no-op, append, and pure-delete. The interesting edge case — a snapshot with both added and removed entries (overwrite/compaction) — is not covered at the unit level. Worth adding to directly validate the set-difference logic under simultaneous adds+removes.
Positive Observations
- lastSyncedVersion decoupling is architecturally clean and removes two format-specific copy() branches
- Manifest set-difference algorithm (old-only/new-only partition) is correct and skips shared manifests cheaply
- NonFatal usage in manifest read error handling is correct and consistent
- The "FROM SNAPSHOT reports requested snapshot ID even when entries have different embedded IDs" test is particularly well-targeted
- Streaming poll interval reduction (2000ms → 500ms) reduces test wall time without sacrificing determinism
- fromVersion = None, fromSnapshot = None clearing in streaming cycle copy (raised in Apr 5 review) is correctly addressed in this PR
…anionTestBase, add overwrite test

Production code (DistributedSourceScanner.scala):
- Extract `relativizeToStorageRoot(rawPath, basePath)` as the single source of truth for converting absolute file paths to relative companion split keys. Three strategies (prefix match, S3 object-key match, filename-only fallback), all in one place.
- `parquetEntryToCompanionFile` now delegates to `relativizeToStorageRoot` instead of reimplementing 20 lines of inline normalization.
- `icebergEntryToCompanionFile` now delegates to `relativizeToStorageRoot` instead of a simpler 1-branch prefix match. Gains scheme-stripping and S3 object-key fallback for free, and ensures add-path and remove-path normalization are identical by construction.
- Removed `normalizeSourcePath` (renamed to `relativizeToStorageRoot`). The removed-paths call site now uses `removedPaths.map(relativizeToStorageRoot(_, basePath))`.
- Updated `maxIncrementalManifests` comment with 3-case behavioral documentation (pure append, compaction-heavy, mixed workloads).

Test infrastructure (IcebergTestSupport.scala + 6 Companion* files):
- Created `CompanionTestBase` trait providing shared SparkSession lifecycle, `flushCaches()`, `withTempPath()`, and `readCompanion()`.
- Refactored 6 Companion* test files to extend the trait, removing ~330 lines of duplicated helper methods.
- CompanionPurgeTest: no-op `beforeAll`/`afterAll` since it manages SparkSession per-test via `beforeEach`/`afterEach`.
- CompanionTruncateTimeTravelTest: calls `super.beforeAll()` + 2 extra configs.

New test (DistributedSourceScannerTest.scala):
- Added `scanIcebergTable incremental detects both added and removed files after overwrite`, which exercises the set-difference algorithm with simultaneous adds and removes (delete + append across two snapshots). Exact count assertions.
Scott's review findings addressed:
- indextables#1 (must-fix): normalizeSourcePath → relativizeToStorageRoot with 3-branch logic
- indextables#2 (must-fix): dead Block 1 already removed in prior commit
- indextables#3 (important): maxIncrementalManifests semantics documented
- indextables#4 (important): sampleFilePath=None verified safe (orElse fallback at line 761)
- indextables#5 (suggestion): CompanionTestBase extracted, 6 files consolidated
- indextables#6 (suggestion): overwrite/compaction test added

55 tests verified: 26 DistributedSourceScannerTest + 29 Companion* suite tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
751f9ac to fa5f7ca
All 6 review findings addressed (2026-04-11)
Scott, here's what landed in commit
Must-fix findings
#1: Removed-path normalization bypasses
Rather than just patching the inline code, I consolidated all path relativization into a single shared method:
All three callers now delegate to this one method:
The old
#2: Dead-code invalidation Block 1 ✅ Already fixed in prior commit
Verified: only the clean Block 2 remains (line 722:
Important findings
#3:
Added a 3-case behavioral comment at the threshold check:
#4:
Downstream uses
Suggestion findings
#5: Test infrastructure duplicated across 8 files ✅ Fixed
Created
Net: -191 lines of duplicated helpers.
#6: Missing overwrite/compaction unit test ✅ Added
New test:
Verification
55 tests verified locally:
One pre-existing failure (
Happy to discuss any of the changes.
Follow-up: pre-existing failing test → issue #300
The
This brings the PR to all tests green with no known failures.
…ot BUILD

The auto-read of lastSyncedVersion from the companion transaction log on non-streaming BUILDs (introduced to fix indextables#268) silently short-circuits re-sync to no_action whenever the companion has diverged from its last recorded source snapshot via local-only mutations. DROP PARTITIONS is the canonical trigger: the source table is unchanged (same snapshot ID), but the companion is missing the dropped partitions' splits, and the scanner's incremental fast path returns an empty changeset because currentSnap == fsnap.

Root cause: lastSyncedVersion encodes source-side state at last sync. It is not a summary of the companion's current contents. Using it as fromSnapshotId for arbitrary future BUILDs assumes the companion only changes in response to source changes, which DROP PARTITIONS, manual invalidation, and any future local mutation paths violate.

Fix: leave lastSyncedVersion None on one-shot BUILDs, forcing a full scan whose anti-join against the companion's recorded source files detects any local divergence. Streaming cycles are unaffected: they thread lastSyncedVersion through copy() in the streaming loop, and StreamingCompanionManager still calls readLastSyncedVersionFromLog for cold-start resume.

Tests:
- CompanionDropPartitionsTest::"Re-sync after drop on Iceberg companion" was previously failing on this branch (commented out in 0bc3b9e, now restored; it was still passing on main all along because main never had the auto-read). Same for the Delta variant.
- StreamingCompanionIcebergEndToEndTest: 5/5, including "restart resumes from last synced snapshot" and "deletion detected incrementally across streaming cycles".
- DistributedSourceScannerTest: 26/26.
- SyncToExternalParsingTest: 86/86.
- IcebergMiscFeaturesTest: 9/9 (FROM SNAPSHOT time-travel semantics).
Issue indextables#268's perf optimization (skip full-scan when source snapshot unchanged) can be re-landed as a follow-up once local-companion divergence tracking exists — for example, a monotonic companion-state epoch bumped by DROP / invalidation paths that the fast-path checks before short-circuiting. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
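The follow-up idea could be sketched as a guard on the fast path. Everything here is hypothetical, since the commit only proposes the mechanism: `CompanionState`, its field names, and `canShortCircuit` are illustrative and do not exist in the codebase.

```scala
object CompanionEpochSketch {
  // Hypothetical companion-side bookkeeping:
  // epochAtLastSync is recorded at sync time; currentEpoch is bumped by every
  // local mutation path (DROP PARTITIONS, manual invalidation, ...).
  final case class CompanionState(
      lastSyncedSnapshot: Option[Long],
      epochAtLastSync: Long,
      currentEpoch: Long
  )

  // no_action is only safe when the source snapshot is unchanged AND the
  // companion has not been mutated locally since that sync.
  def canShortCircuit(state: CompanionState, currentSourceSnapshot: Long): Boolean =
    state.lastSyncedSnapshot.contains(currentSourceSnapshot) &&
      state.epochAtLastSync == state.currentEpoch
}
```

Under this sketch a DROP PARTITIONS would bump `currentEpoch`, so a re-sync against an unchanged source snapshot would take the full-scan path instead of returning no_action.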
0bc3b9e to 546356c
Scott, you were right that
Root cause. Commit 244c9ef added an auto-read of
The fundamental issue:
Fix. Non-streaming BUILDs now leave
Tests run on 546356c:
Follow-up. #268's optimization (skip the source scan entirely when the snapshot ID matches) can be re-landed safely once there's a companion-state epoch: a counter bumped by every local-mutation path (DROP, invalidate) that the fast-path checks before short-circuiting. I'd rather do that as a separate PR with its own test matrix than stack it on this one. The previous history had a
Summary
- lastSyncedVersion field on SyncToExternalCommand keeps streaming incremental resume independent from fromVersion (Delta SQL) and fromSnapshot (Iceberg SQL). Streaming resume is now format-agnostic; format-specific routing happens at the scanner call site.
- DistributedSourceScanner reads entries from both old-only and new-only manifests, computing file-level adds AND deletes. Previously only detected new files, missing deletions entirely.
- Reads lastSyncedVersion from the companion transaction log to enable incremental fast path and correct no_action detection.

Supersedes #263 and #264. Fixes #267, #268, #270.
Test plan
All tests pass (8 new tests added).
🤖 Generated with Claude Code