Skip to content

Fix Iceberg streaming resume, deletion detection, and no_action handling#286

Open
tlee732 wants to merge 18 commits into
indextables:mainfrom
tlee732:fix/iceberg-streaming-and-deletion
Open

Fix Iceberg streaming resume, deletion detection, and no_action handling#286
tlee732 wants to merge 18 commits into
indextables:mainfrom
tlee732:fix/iceberg-streaming-and-deletion

Conversation

@tlee732
Copy link
Copy Markdown
Contributor

@tlee732 tlee732 commented Apr 1, 2026

Summary

  • Separate streaming resume from SQL fields — New lastSyncedVersion field on SyncToExternalCommand keeps streaming incremental resume independent from fromVersion (Delta SQL) and fromSnapshot (Iceberg SQL). Streaming resume is now format-agnostic; format-specific routing happens at the scanner call site.
  • Incremental deletion detection — Enhanced manifest set-difference in DistributedSourceScanner reads entries from both old-only and new-only manifests, computing file-level adds AND deletes. Previously only detected new files, missing deletions entirely.
  • Invalidation-only commit path — When files are deleted from source but nothing new needs indexing, remove actions are now committed directly instead of being silently dropped by the empty batch loop.
  • Auto-resume for one-shot syncs — Non-streaming re-syncs now read lastSyncedVersion from the companion transaction log to enable incremental fast path and correct no_action detection.

Supersedes #263 and #264. Fixes #267, #268, #270.

Test plan

export JAVA_HOME=/opt/homebrew/opt/openjdk@11

# Iceberg streaming E2E (incremental resume + FROM SNAPSHOT + invalidation-only):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.StreamingCompanionIcebergEndToEndTest'

# Iceberg incremental scanner (no-op, append, delete detection):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sync.DistributedSourceScannerTest'

# FROM SNAPSHOT time-travel:
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sync.IcebergMiscFeaturesTest'

# SQL parsing (FROM SNAPSHOT + WITH STREAMING):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.SyncToExternalParsingTest'

# Delta streaming (unchanged behavior):
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sql.StreamingCompanionEndToEndTest'

All tests pass (8 new tests added).

🤖 Generated with Claude Code

tlee732 and others added 13 commits March 26, 2026 15:57
Format-agnostic tests for 6 companion operations:
- AggregatePushdownCorrectness: 11 tests — COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values)
- BucketAggregation: 9 tests — Histogram/DateHistogram/Range with per-bucket verification
- DropPartitions: 9 tests — equality/range/compound predicates, cumulative drops, re-sync
- MergeSplits: 9 tests — companion field preservation, data/agg/filter correctness post-merge
- Purge: 9 tests — orphan deletion, retention, DRY RUN, data integrity
- TruncateTimeTravel: 9 tests — version cleanup, checkpoint creation, metadata preservation

3 tests intentionally fail, catching real bugs:
- Merged Iceberg companion splits fail file:// parquet path resolution
- Iceberg sync returns "success" instead of "no_action" for unchanged snapshots
- Iceberg file deletion not detected by companion anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Closes the 5:1 Delta-to-Iceberg test gap for sync operations:
- IcebergMutationSyncTest: 10 tests — append/delete/overwrite/mixed ops, partitioned tables
- IcebergDistributedSyncTest: 12 tests — WHERE clause (=, IN, !=, range, AND, OR), scoped invalidation
- IcebergWhereClauseExtendedTest: 5 tests — compound AND, OR, strict >, BETWEEN, range filters
- IcebergSchemaEvolutionSyncTest: 4 tests — add/drop/rename column, new column indexing

7 tests intentionally fail, catching real Iceberg bugs:
- Scoped invalidation doesn't detect deleted files within WHERE range
- Sync returns "success" instead of "no_action" for unchanged snapshots
- Sync after add/drop/rename column returns "error" (schema evolution not handled)
- Partitioned delete not detected by anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The native Rust layer received file:// URIs as parquetTableRoot for local
Iceberg companions but only supports bare filesystem paths or cloud URIs.

- ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths
- SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern
- Fix merge test data layout to use common data directory
- Remove known-bug comments from test files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Schema evolution support doesn't exist yet (indextables#269), so these tests
shouldn't be part of this PR. They can be reintroduced alongside
the feature implementation.

Addresses reviewer feedback from @schenksj.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Five locations stripped file:// URIs using different approaches (URI.getPath,
substring, stripPrefix), two with bugs: SyncTaskExecutor.downloadFile had
unguarded URI.getPath (NPE risk), LocalCopyDownloader.normalizeLocalPath
produced wrong result for file://hostname/path URIs.

Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards
and substring fallback for malformed URIs (e.g., unescaped spaces). All five
call sites now delegate to this shared utility. Adds unit tests for both
stripFileScheme and extractTableBasePath.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- IcebergRoundTripTest: 6 tests — scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity
- IcebergBatchingSyncTest: 4 tests — batchSize/maxConcurrentBatches configuration
- IcebergCompactStringTest: 4 tests — exact_only/text_uuid_exactonly/text_uuid_strip modes
- IcebergPartitionEvolutionTest: 3 tests — unpartitioned→partitioned, add partition, mixed specs
- IcebergMiscFeaturesTest: 8 tests — error handling, write guard, fast field modes, FROM SNAPSHOT

2 tests intentionally fail, catching real bugs:
- Partitioned round-trip: file:// parquet path resolution failure
- FROM SNAPSHOT: returns wrong snapshot ID in result row

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
appendSnapshot wrote each batch's parquet files to separate batch-N/
subdirectories, but extractTableBasePath() derives the storage root from
the first file's path. Files from other batches couldn't be resolved
relative to that root, causing "Failed to create parquet stream builder"
on partitioned tests. Real Iceberg tables store all data files under a
single data/ directory — mirror that layout so the test reflects
production behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ndextables#268)

Three changes:

1. Read lastSyncedVersion from companion metadata for non-streaming
   Iceberg syncs, enabling the snapshot ID comparison that returns
   "no_action" when the source hasn't changed. (fixes indextables#268)

2. Replace the incremental manifest-diff path with a full-scan
   fallback when the Iceberg snapshot has changed. The manifest-diff
   only detected new files — it never populated removedSourcePaths,
   so file deletions were invisible. The full-scan anti-join already
   handles both new and deleted files correctly. (fixes indextables#267)

3. Handle invalidation-only syncs (splits to remove, no new files).
   Previously, remove actions were only committed during batch
   processing — if there were no files to index, no batch ran and
   the stale splits were never removed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
FROM SNAPSHOT was incorrectly implemented as an incremental filter (get
changes after this snapshot) instead of time-travel (build companion from
the state at this snapshot). This caused the wrong files to be synced and
the wrong snapshot ID reported in the result.

Two changes:
- SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel)
  instead of fromSnapshotId (incremental filter)
- IcebergSourceReader.sourceVersion: return the requested snapshot ID
  when one was explicitly provided, instead of the first file entry's ID

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace hand-built Row in invalidation-only path with buildResultRow
- Hoist val externalStorageRoot before step 8b to eliminate duplicate
  icebergStorageRoot.orElse(resolvedStorageLocation) expression
- Trim verbose comments: effectiveFromSnapshot (4→1 line), remove
  comment duplicating log message, simplify scanner block comment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hods

Six Iceberg test files each had their own copy of appendSnapshot with
slight variations. Four still used the old direct-write pattern that
breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper
trait in IcebergTestSupport.scala with staging+move to a common data/
directory, optional partitionPath, and explicit schema parameter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…/iceberg-streaming-and-deletion

# Conflicts:
#	src/main/scala/io/indextables/spark/sql/SyncToExternalCommand.scala
Separates streaming resume from SQL FROM SNAPSHOT/FROM VERSION fields,
adds incremental deletion detection via manifest set-difference, and
fixes invalidation-only sync commits.

Production changes:
- Add lastSyncedVersion field to SyncToExternalCommand for format-agnostic
  streaming resume, keeping fromVersion (Delta SQL) and fromSnapshot
  (Iceberg SQL) as separate concerns
- Enhance DistributedSourceScanner manifest-diff to read both old-only and
  new-only manifests, computing file-level adds AND deletes via set
  difference (previously only detected new files)
- Add invalidation-only commit path for deletion-only syncs where no new
  files need indexing but stale splits must be removed
- Auto-read lastSyncedVersion from companion transaction log for
  non-streaming one-shot re-syncs (fixes indextables#268 no_action)
- Add Parquet format filter to incremental manifest reads
- Handle GC'd old manifests gracefully with fallback to full scan

Fixes indextables#267, indextables#268, indextables#270

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Collaborator

@schenksj schenksj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #286 Review — Fix Iceberg streaming resume, deletion detection, and no_action handling

Summary

Solid architectural work. The lastSyncedVersion separation cleanly decouples streaming state from SQL clauses, the manifest set-difference deletion detection is the right approach for Iceberg, and the CloudPathUtils.stripFileScheme consolidation removes three duplicate implementations while upgrading to proper java.net.URI parsing with substring fallback.


Issues

1. Duplicate invalidation-only commit path (dead code)

There are two blocks handling the same condition, and the first one makes the second unreachable.

Block 1 (~line 698 in SyncToExternalCommand.scala):

if (parquetFilesToIndex.isEmpty && splitsToInvalidate.nonEmpty) {
    ...
    return Seq(Row(destPath, sourcePath, "success", ..., s"Removed ${splitsToInvalidate.size} invalidated splits (no new files)"))
}

Block 2 (~line 737):

if (groups.isEmpty && splitsToInvalidate.nonEmpty) {
    transactionLog.invalidateCache()
    return buildResultRow(Seq.empty, splitsToInvalidate.size, ...)
}

Since groups = planIndexingGroups(parquetFilesToIndex, ...), when parquetFilesToIndex is empty, groups is also empty — Block 2 can never fire. Block 2 is the better implementation (uses buildResultRow, calls invalidateCache()), while Block 1 manually constructs the Row inline and skips invalidateCache(). Remove Block 1.

2. return inside foreach lambda in scanIcebergTable

oldOnlyEntriesOpt.foreach { oldOnlyEntries =>
    ...
    return DistributedScanResult(...)   // NonLocalReturnControl
}
// fall through to full scan

return inside a Scala closure uses NonLocalReturnControl (a Throwable subclass) to unwind to the enclosing method. This works but generates a compiler warning, has allocator overhead on the hot path, and is easy to misread. The fall-through behavior is better expressed as:

oldOnlyEntriesOpt match {
    case Some(oldOnlyEntries) =>
        ...
        return DistributedScanResult(...)
    case None =>
        logger.warn("...falling back to full scan")
        // fall through
}

Or restructure so the full-scan path is in the None branch rather than relying on fall-through.


Minor

  • The IcebergSourceReader.sourceVersion() change from if (entries.isEmpty) snapshotId else ... to snapshotId.orElse(...) is intentional and correct: constructor-provided snapshotId (from FROM SNAPSHOT) now takes precedence over the entry's embedded snapshot ID. Worth a brief comment since the old behavior was the opposite priority order.

  • The maxIncrementalManifests threshold increase from 50 → 100 and the threshold semantics change (now counting old+new instead of just new) are both sensible, but the old config key (spark.indextables.companion.sync.iceberg.maxIncrementalManifests) now semantically means "total changed manifests" — users with existing tuning at the old default (50) will see behavior change. Fine for now but worth a release note.


Tests

The new StreamingCompanionIcebergEndToEndTest cases (FROM SNAPSHOT seeding + invalidation-only) directly cover the fixed paths. DistributedSourceScannerTest additions look comprehensive. No concerns.

Copy link
Copy Markdown
Collaborator

@schenksj schenksj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test

…rn to match

- Remove duplicate invalidation-only early return that was unreachable
  (Block 2 after planIndexingGroups already handles this case with
  buildResultRow + invalidateCache)
- Move invalidation-only message into buildResultRow so the single
  code path produces the correct result message
- Refactor oldOnlyEntriesOpt.foreach { return } to match/case to
  avoid NonLocalReturnControl overhead and compiler warning
- Add comment on IcebergSourceReader.sourceVersion() priority order

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732
Copy link
Copy Markdown
Contributor Author

tlee732 commented Apr 2, 2026

Thanks for the thorough review, Scott! Addressed all points in a36ab52:

  1. Duplicate invalidation-only block — Removed Block 1 entirely. Block 2 (after planIndexingGroups) now handles all invalidation-only syncs using buildResultRow + invalidateCache(). Moved the "Removed N invalidated splits" message into buildResultRow so it produces the correct result string when results are empty but splits were invalidated.

  2. foreach + returnmatch — Refactored oldOnlyEntriesOpt.foreach { ... return ... } to a match with Some/None branches. The None case now explicitly logs a warning before falling through to full scan.

  3. sourceVersion() comment — Added a one-liner clarifying the priority order (explicit FROM SNAPSHOT snapshotId takes precedence over entry's embedded snapshot ID).

  4. maxIncrementalManifests semantics — Noted for release notes, no code change.

All 3 test suites pass (StreamingCompanionIcebergEndToEndTest 4/4, DistributedSourceScannerTest 25/25, IcebergMiscFeaturesTest 8/8).

Copy link
Copy Markdown
Collaborator

@schenksj schenksj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — PR #286 (Full Re-review)

Summary

The four main changes are well-motivated and the overall logic is correct. A few issues worth addressing before merge:


Bug: Removed-path normalization reimplements normalizeSourcePath without scheme handling

The new inline normalization in the Iceberg incremental deletion path:

val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) =>
    val normalizedBase = basePath.stripSuffix("/")
    removedPaths.map { p =>
      val normalized = p.stripSuffix("/")
      if (normalized.startsWith(normalizedBase))
        normalized.substring(normalizedBase.length).stripPrefix("/")
      else p
    }
  case None => removedPaths
}

This is a reimplementation of DistributedSourceScanner.normalizeSourcePath(rawPath, basePath), which already strips file:// schemes, applies normalizeLocalPath, and handles leading-slash normalization. The inline version does none of that.

Why this matters for the tests: In local tests, extractTableBasePath now includes file:// in its scheme pattern, so storageRoot = "file:///tmp/..." and removedPaths are "file:///tmp/.../file.parquet" — scheme-consistent, so the raw startsWith happens to work. But if Iceberg ever returns paths without the file:// prefix (e.g., after a catalog round-trip), or for any path normalization edge case that normalizeSourcePath already handles, the inline version silently falls through to else p (absolute path), and the removed path won't match any companion split — the deletion is silently dropped.

Fix: use the existing utility:

val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) => removedPaths.map(normalizeSourcePath(_, basePath))
  case None           => removedPaths
}

Bug: fromSnapshot leaks into all streaming cycles after the first

The new streaming cycle copy:

case Some(v) =>
  copy(lastSyncedVersion = Some(v), streamingPollIntervalMs = None)

This preserves fromSnapshot from the original command. For cycles 2+, snapshotId = fromSnapshot = Some(originalSnap1Id) is passed to the scanner. The scanner correctly ignores snapshotId when fromSnapshotId is set (line 681: if (fromSnapshotId.isDefined) IcebergTableReader.getSnapshotInfo(... /* no id, gets current */)) so this doesn't break today. But it's fragile coupling — the correctness depends on that one-line guard in the scanner, with no indication at the call site that fromSnapshot is being silently ignored.

Fix: explicitly clear fromSnapshot when transitioning to lastSyncedVersion-based resumption:

case Some(v) =>
  copy(lastSyncedVersion = Some(v), fromSnapshot = None, streamingPollIntervalMs = None)

maxIncrementalManifests threshold effectively doubles for pure appends

Old threshold was 50 and counted only new manifests. New threshold is 100 and counts both old and new. For a pure append (no replaced manifests), oldOnlyManifests.size == 0, so the threshold change is: 51 new manifests would previously → full scan (51 > 50), but now stays on the driver (0 + 51 = 51 ≤ 100). This is effectively doubling the driver-side threshold for append-only workloads.

The comment says "Threshold counts old + new manifests (a compaction replacing N manifests counts 2N)" — that's accurate for the compaction case but understates the behavioral change for the append-only case. Either the Scaladoc should note the effective doubling for appends, or the threshold should be left at 50 for newOnlyManifests.size and have a separate cap on oldOnlyManifests.size.


case e: Exception in old-manifest catch should be NonFatal

} catch {
  case e: Exception =>
    logger.warn(...)
    None
}

Same pattern flagged in PR #287 — should be case NonFatal(e). The codebase already uses NonFatal in BatchWrite and ScanBuilder.


IcebergSourceReader.sourceVersion() behavioral change

Old:

val entries = fileEntries
if (entries.isEmpty) snapshotId
else Some(entries.get(0).getSnapshotId)

New:

snapshotId.orElse {
  val entries = fileEntries
  if (entries.isEmpty) None
  else Some(entries.get(0).getSnapshotId)
}

When snapshotId = Some(id) and entries are non-empty, old returns entries.get(0).getSnapshotId, new returns snapshotId. The comment explains the intent ("FROM SNAPSHOT takes precedence"). This is correct for the streaming use case where snapshotId IS the authoritative version marker. But there's no test covering the case where snapshotId != entries.get(0).getSnapshotId (e.g., a time-travel query to a snapshot that contains files written in a different snapshot). Worth adding a unit test.


Missing test: streaming cycle with deletion detected incrementally

The invalidation-only sync removes stale splits when source files are deleted test covers the one-shot re-sync deletion path. But the streaming cycle (where deletion is detected in cycle N after append-then-delete) has no direct test. The manifest set-difference algorithm has a new code path (oldOnlyEntries, addedEntries, removedPaths) — a streaming deletion test in DistributedSourceScannerTest would complete coverage.


Everything else looks good

  • CloudPathUtils.stripFileScheme consolidation: the URI-based implementation is correct and handles all file: forms including file:/path (the old LocalCopyDownloader had a latent bug there: path.stripPrefix("file://") wouldn't match file:/path). Tests are solid.
  • readLastSyncedVersionFromLog auto-resume: clean approach; the orElse composition with fromVersion/fromSnapshot priority guards is correct.
  • FROM SNAPSHOT time-travel: the snapshotId = fromSnapshot change is correct. The scanner's guard at line 681 (if (fromSnapshotId.isDefined) → get current snapshot) ensures time-travel only applies to the initial full-scan cycle. The test FROM SNAPSHOT seeds initial sync, then streams new data correctly validates cycles 1→2→3.
  • Invalidation-only commit path: groups.isEmpty && splitsToInvalidate.nonEmpty guard is correct. sourceVersion in this path is always dr.version = Some(currentSnapId) from the incremental scan result (never -1). transactionLog.invalidateCache() after commit is the right call.
  • extractTableBasePath + file://: adding file:// to the scheme pattern means the base path is reconstructed with the scheme intact, which is consistent with the inline normalizedRemovedPaths comparison (both sides have the scheme). Correct — but further reinforces that the right fix for the normalization issue above is normalizeSourcePath, not manually stripping in two more places.

…t limits, error handling

Production fixes:
- Apply stripFileScheme + normalizeLocalPath to removed-path normalization in
  Iceberg manifest set-difference, handling file: scheme variations between
  entries from different manifests
- Clear fromVersion/fromSnapshot on streaming resume transition to prevent
  stale SQL fields leaking into subsequent streaming cycles
- Split maxIncrementalManifests into independent per-side limits (default 50
  each) so pure appends retain the original threshold
- Use NonFatal in old-manifest catch block for consistency
- Wrap new-only manifest read in same NonFatal try/catch as old-only reads,
  preventing exception propagation to the wrong fallback handler

Tests:
- Add sourceVersion() priority test (FROM SNAPSHOT snap2 reports snap2,
  not snap1's embedded entry ID)
- Add streaming deletion detection test (append → delete → verify companion
  removes stale splits via manifest set-difference)
- Speed up streaming E2E tests: poll interval 2000→500ms, waitUntil poll
  500→200ms, idle sleep 6000→1500ms

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732
Copy link
Copy Markdown
Contributor Author

tlee732 commented Apr 5, 2026

Addressing round 2 review

Thanks for the thorough re-review @schenksj. All 6 items addressed, plus two additional fixes found during self-review.

Bug fixes

  1. Removed-path normalization — Applied CloudPathUtils.stripFileScheme + normalizeLocalPath to the inline normalization in the Iceberg incremental deletion path. Previously, a scheme mismatch between storageRoot and removedPaths (e.g., after a catalog round-trip) would silently drop deletions.

  2. fromSnapshot leak into streaming cycles — Streaming resume now clears both fromVersion and fromSnapshot when transitioning to lastSyncedVersion-based resumption: copy(lastSyncedVersion = Some(v), fromVersion = None, fromSnapshot = None, ...). Removes fragile coupling where correctness depended on the scanner's fromSnapshotId.isDefined guard.

  3. maxIncrementalManifests threshold — Reverted default to 50 and now checks each side independently (oldOnly > 50 || newOnly > 50). Preserves the original 50-manifest cap for append-only workloads; compaction-heavy workloads (N old, 1 new) are also independently capped.

  4. case e: ExceptionNonFatal — Updated the old-manifest catch block.

Additional fixes from self-review

  1. New-only manifest error handling — The old-only manifest read was wrapped in try/catch NonFatal(e) with graceful fallback to full scan, but the new-only manifest read had no error handling. A transient I/O error would propagate to the broad case e: Exception in SyncToExternalCommand, falling back to the single-call reader instead of the distributed full scan. Now wrapped in the same NonFatal guard with fall-through to full scan.

  2. Comment accuracy — Fixed misleading comment that claimed normalization "matches icebergEntryToCompanionFile" (it doesn't — icebergEntryToCompanionFile doesn't apply stripFileScheme/normalizeLocalPath). Comment now accurately describes the purpose: handling file: scheme variations between entries from different manifests.

New tests

  1. sourceVersion() priority test (IcebergMiscFeaturesTest) — Builds FROM SNAPSHOT snap2 where snap1 files are still visible, asserts reported version is snap2 (not snap1's embedded entry ID).

  2. Streaming deletion test (StreamingCompanionIcebergEndToEndTest) — Streams initial sync (2 rows) → append (3 rows) → delete snap2's file → verifies companion goes from 3 → 2 rows via manifest set-difference.

Also

  • Lowered streamingPollIntervalMs from 2000ms → 500ms and waitUntil poll from 500ms → 200ms across all streaming E2E tests (reduces idle time without affecting reliability — 30s safety timeouts unchanged).

All affected test suites pass (DistributedSourceScannerTest, StreamingCompanionIcebergEndToEndTest, IcebergMiscFeaturesTest, SyncToExternalParsingTest, CloudPathUtilsTest, ExtractTableBasePathTest).

Resolves 9 conflicts brought by main's activity (primarily the spotless formatting
pass in PR indextables#297 and new companion operation test files):

Content conflicts (3 files, all semantic):
- DistributedSourceScanner.scala — kept HEAD's full set-difference deletion detection
  logic (the core feature of this PR) and added main's `dateCols` parameter to the
  `icebergEntryToCompanionFile` call site for Iceberg DATE partition handling
- SyncTaskExecutor.scala — trivial whitespace (extra space before `=`), took main's
  formatting
- DistributedSourceScannerTest.scala — kept HEAD's `IcebergSnapshotHelper` trait mixin
  (needed for this PR's Iceberg tests) with main's multi-line class declaration formatting

Add/add conflicts (6 test files, all formatting-only):
- CompanionAggregatePushdownCorrectnessTest, CompanionBucketAggregationTest,
  CompanionDropPartitionsTest, CompanionMergeSplitsTest, CompanionPurgeTest,
  CompanionTruncateTimeTravelTest — both branches independently added these files with
  identical test sets (verified: 56 total tests, all common across both sides, zero
  HEAD-only, zero main-only). The only differences are spotless formatting. Took
  origin/main's spotless-formatted versions via `git checkout --theirs`.

`mvn compile` passes cleanly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Collaborator

@schenksj schenksj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review — Iceberg Companion Sync Bug Fixes

The PR correctly fixes four interrelated bugs in the Iceberg companion sync pipeline. The test suite (~3,500 lines across 8 new test files) is thorough. Two issues require resolution before merge.


Request Changes

1. Removed-path normalization bypasses normalizeSourcePath (correctness risk)

The new incremental delete path in DistributedSourceScanner.scala inlines its own normalizeLocalPath/stripFileScheme logic instead of delegating to the existing normalizeSourcePath(rawPath, basePath) method. The inline version's else p fallback returns a raw absolute path that won't match any companion split key — the deletion is silently dropped. Use the existing method:

val normalizedRemovedPaths = storageRoot match {
  case Some(basePath) => removedPaths.map(normalizeSourcePath(_, basePath))
  case None           => removedPaths
}

2. Dead-code invalidation block in SyncToExternalCommand.scala may still exist

The Apr 2 review identified an original Block 1 Row(...) construction that skips invalidateCache(). The new Block 2 (added in this PR) is the correct implementation. Verify Block 1 has been removed — if it still exists, stale cache entries will persist after deletion-only syncs. The new buildResultRow code path correctly calls invalidateCache().


Important (can be follow-up)

3. maxIncrementalManifests semantics change is undocumented

The config now caps old-only OR new-only manifests independently (not combined). For pure append workloads, the effective threshold doubles (50 → 100 with the default change). Users who tuned this setting will see a behavior change without any indication in the docs or config reference.

4. sampleFilePath is None for deletion-only syncs

sampleFilePath = addedEntries.headOption.orElse(newOnlyEntries.headOption).map(_.getPath)

In a pure-deletion sync, both are empty, so sampleFilePath = None. Worth verifying the downstream consumer handles None when removedSourcePaths is non-empty — if schema inference relies on it and doesn't handle None, this will surface as a confusing error in production.


Suggestions

5. Test infrastructure is duplicated across 8 new test files

All 8 test classes independently define beforeAll(), flushCaches(), withIcebergTable(), syncIceberg(), and readCompanion(). IcebergSnapshotHelper already exists as a consolidation point — the session setup and helper methods should move into a shared IcebergTestBase to avoid an 8-way sync problem when configs change.

6. Missing overwrite/compaction unit test in DistributedSourceScannerTest

The three new scanner unit tests cover no-op, append, and pure-delete. The interesting edge case — a snapshot with both added and removed entries (overwrite/compaction) — is not covered at the unit level. Worth adding to directly validate the set-difference logic under simultaneous adds+removes.


Positive Observations

  • lastSyncedVersion decoupling is architecturally clean and removes two format-specific copy() branches
  • Manifest set-difference algorithm (old-only/new-only partition) is correct and skips shared manifests cheaply
  • NonFatal usage in manifest read error handling is correct and consistent
  • The FROM SNAPSHOT reports requested snapshot ID even when entries have different embedded IDs test is particularly well-targeted
  • Streaming poll interval reduction (2000ms → 500ms) reduces test wall time without sacrificing determinism
  • fromVersion = None, fromSnapshot = None clearing in streaming cycle copy (raised in Apr 5 review) is correctly addressed in this PR

…anionTestBase, add overwrite test

Production code (DistributedSourceScanner.scala):
- Extract `relativizeToStorageRoot(rawPath, basePath)` as the single source of truth
  for converting absolute file paths to relative companion split keys. Three strategies
  (prefix match, S3 object-key match, filename-only fallback), all in one place.
- `parquetEntryToCompanionFile` now delegates to `relativizeToStorageRoot` instead of
  reimplementing 20 lines of inline normalization.
- `icebergEntryToCompanionFile` now delegates to `relativizeToStorageRoot` instead of
  a simpler 1-branch prefix match. Gains scheme-stripping and S3 object-key fallback
  for free — ensures add-path and remove-path normalization are identical by construction.
- Removed `normalizeSourcePath` (renamed to `relativizeToStorageRoot`). The removed-paths
  call site now uses `removedPaths.map(relativizeToStorageRoot(_, basePath))`.
- Updated `maxIncrementalManifests` comment with 3-case behavioral documentation
  (pure append, compaction-heavy, mixed workloads).

Test infrastructure (IcebergTestSupport.scala + 6 Companion* files):
- Created `CompanionTestBase` trait providing shared SparkSession lifecycle,
  `flushCaches()`, `withTempPath()`, and `readCompanion()`.
- Refactored 6 Companion* test files to extend the trait, removing ~330 lines of
  duplicated helper methods.
- CompanionPurgeTest: no-op `beforeAll`/`afterAll` since it manages SparkSession
  per-test via `beforeEach`/`afterEach`.
- CompanionTruncateTimeTravelTest: calls `super.beforeAll()` + 2 extra configs.

New test (DistributedSourceScannerTest.scala):
- Added `scanIcebergTable incremental detects both added and removed files after
  overwrite` — exercises the set-difference algorithm with simultaneous adds and
  removes (delete + append across two snapshots). Exact count assertions.

Scott's review findings addressed:
  indextables#1 (must-fix): normalizeSourcePath → relativizeToStorageRoot with 3-branch logic
  indextables#2 (must-fix): dead Block 1 already removed in prior commit
  indextables#3 (important): maxIncrementalManifests semantics documented
  indextables#4 (important): sampleFilePath=None verified safe (orElse fallback at line 761)
  indextables#5 (suggestion): CompanionTestBase extracted, 6 files consolidated
  indextables#6 (suggestion): overwrite/compaction test added

55 tests verified: 26 DistributedSourceScannerTest + 29 Companion* suite tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732 tlee732 force-pushed the fix/iceberg-streaming-and-deletion branch from 751f9ac to fa5f7ca Compare April 11, 2026 17:00
@tlee732
Copy link
Copy Markdown
Contributor Author

tlee732 commented Apr 11, 2026

All 6 review findings addressed (2026-04-11)

Scott, here's what landed in commit fa5f7ca6:

Must-fix findings

#1 — Removed-path normalization bypasses normalizeSourcePath ✅ Fixed (and went further)

Rather than just patching the inline code, I consolidated all path relativization into a single shared method: relativizeToStorageRoot(rawPath, basePath). Three strategies tried in order:

  1. Prefix match after stripFileScheme + normalizeLocalPath
  2. S3 object-key match (bare keys without scheme+bucket)
  3. Filename-only fallback (last resort)

All three callers now delegate to this one method:

  • parquetEntryToCompanionFile — replaced 20 lines of inline relativization
  • icebergEntryToCompanionFile — replaced 10 lines of simpler prefix-only logic. This gains the S3 object-key fallback and scheme-stripping that the Iceberg path previously lacked — ensures add-path and remove-path normalization are identical by construction, not by coincidence.
  • Incremental-deletion removed-paths — removedPaths.map(relativizeToStorageRoot(_, basePath))

The old else p fallback (returning a raw absolute path that would never match a relative companion split key) is eliminated. The shared method's filename-only fallback matches the same pattern parquetEntryToCompanionFile already used.

#2 — Dead-code invalidation Block 1 ✅ Already fixed in prior commit a36ab52e

Verified: only the clean Block 2 remains (line 722: groups.isEmpty && splitsToInvalidate.nonEmptybuildResultRow + invalidateCache()).

Important findings

#3maxIncrementalManifests semantics undocumented ✅ Fixed

Added a 3-case behavioral comment at the threshold check:

  • Pure appends (0 old, N new): capped at 50 new manifests (same as before)
  • Compaction-heavy (N old, 1 new): capped at 50 old manifests (prevents driver OOM)
  • Mixed: each side capped independently

#4sampleFilePath None for deletion-only syncs ✅ Verified safe

Downstream uses .orElse(reader.schemaSourceParquetFile()) at line 761 — graceful fallback. No code change needed.

Suggestion findings

#5 — Test infrastructure duplicated across 8 files ✅ Fixed

Created CompanionTestBase trait in IcebergTestSupport.scala providing shared SparkSession lifecycle, flushCaches(), withTempPath(), and readCompanion(). Refactored 6 Companion* test files to extend it:

  • CompanionBucketAggregationTest, CompanionAggregatePushdownCorrectnessTest, CompanionDropPartitionsTest, CompanionMergeSplitsTest — straightforward extraction (~60 lines removed each)
  • CompanionTruncateTimeTravelTest — calls super.beforeAll() + 2 extra configs
  • CompanionPurgeTest — no-op beforeAll/afterAll since it manages per-test SparkSession via beforeEach/afterEach

Net: -191 lines of duplicated helpers.

#6 — Missing overwrite/compaction unit test ✅ Added

New test: scanIcebergTable incremental detects both added and removed files after overwrite in DistributedSourceScannerTest. Creates a delete then an append (two snapshots), runs incremental scan, asserts exact counts for both removedSourcePaths.size and addedFiles.length — not just "non-empty."

Verification

55 tests verified locally:

  • 26/26 DistributedSourceScannerTest (including the new overwrite test)
  • 29/29 across CompanionBucketAggregationTest, CompanionAggregatePushdownCorrectnessTest, CompanionMergeSplitsTest

One pre-existing failure (CompanionDropPartitionsTest::Re-sync after drop on Iceberg companion) is unrelated — it's a known behavioral bug in Iceberg sync detection that predates this PR.

Happy to discuss any of the changes.

@tlee732
Copy link
Copy Markdown
Contributor Author

tlee732 commented Apr 11, 2026

Follow-up: pre-existing failing test → issue #300

The CompanionDropPartitionsTest::Re-sync after drop on Iceberg companion test was failing before this PR — it documents a real product bug where Iceberg companion re-sync after DROP PARTITIONS returns no_action instead of detecting that the companion is missing dropped splits.

This brings the PR to all tests green with no known failures.

…ot BUILD

The auto-read of lastSyncedVersion from the companion transaction log on
non-streaming BUILDs (introduced to fix indextables#268) silently short-circuits
re-sync to no_action whenever the companion has diverged from its last
recorded source snapshot via local-only mutations. DROP PARTITIONS is the
canonical trigger: the source table is unchanged (same snapshot ID), but
the companion is missing the dropped partitions' splits, and the scanner's
incremental fast path returns an empty changeset because currentSnap == fsnap.

Root cause: lastSyncedVersion encodes source-side state at last sync. It is
not a summary of the companion's current contents. Using it as fromSnapshotId
for arbitrary future BUILDs assumes the companion only changes in response
to source changes — which DROP PARTITIONS, manual invalidation, and any
future local mutation paths violate.

Fix: leave lastSyncedVersion None on one-shot BUILDs, forcing a full scan
whose anti-join against the companion's recorded source files detects any
local divergence. Streaming cycles are unaffected — they thread
lastSyncedVersion through copy() in the streaming loop, and
StreamingCompanionManager still calls readLastSyncedVersionFromLog for
cold-start resume.

Tests:
- CompanionDropPartitionsTest::"Re-sync after drop on Iceberg companion"
  was previously failing on this branch (commented out in 0bc3b9e, now
  restored — was still passing on main all along because main never had the
  auto-read). Same for the Delta variant.
- StreamingCompanionIcebergEndToEndTest: 5/5, including "restart resumes
  from last synced snapshot" and "deletion detected incrementally across
  streaming cycles".
- DistributedSourceScannerTest: 26/26.
- SyncToExternalParsingTest: 86/86.
- IcebergMiscFeaturesTest: 9/9 (FROM SNAPSHOT time-travel semantics).

Issue indextables#268's perf optimization (skip full-scan when source snapshot unchanged)
can be re-landed as a follow-up once local-companion divergence tracking
exists — for example, a monotonic companion-state epoch bumped by DROP /
invalidation paths that the fast-path checks before short-circuiting.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732 tlee732 force-pushed the fix/iceberg-streaming-and-deletion branch from 0bc3b9e to 546356c Compare April 12, 2026 14:56
@tlee732
Copy link
Copy Markdown
Contributor Author

tlee732 commented Apr 12, 2026

Scott — you were right that CompanionDropPartitionsTest::"Re-sync after drop on Iceberg companion" passes on main. Thanks for pushing back on that; I had misdiagnosed it as a pre-existing product bug and filed #300. It's actually a regression introduced by this PR, now fixed in 546356c.

Root cause. Commit 244c9ef added an auto-read of lastSyncedVersion from the companion transaction log for non-streaming one-shot BUILDs (originally to fix the no_action path from #268). That value is then threaded into scanIcebergTable as fromSnapshotId, which takes the scanner's incremental fast-path. When the source snapshot hasn't changed (currentSnap == fsnap), the fast-path returns an empty changeset — correct when the source didn't change and the companion hasn't diverged, wrong when the companion has been mutated locally via DROP PARTITIONS.

The fundamental issue: lastSyncedVersion encodes source-side state at last sync. It's not a summary of what the companion currently contains. Any local-only mutation (DROP PARTITIONS, manual invalidation, anything future that touches the companion without touching the source) leaves the companion diverged from what the snapshot ID implies, and the fast-path can't tell.

Fix. Non-streaming BUILDs now leave lastSyncedVersion None, forcing a full scan whose anti-join against the companion's recorded source files catches local divergence. Streaming cycles are untouched — they still thread lastSyncedVersion through copy() in the streaming loop, and StreamingCompanionManager still uses readLastSyncedVersionFromLog for cold-start resume.

Tests run on 546356c:

  • CompanionDropPartitionsTest — 9/9 (includes the previously-broken Iceberg case and the Delta variant, both of which were silently affected) — 13s
  • StreamingCompanionIcebergEndToEndTest — 5/5 (includes restart resumes from last synced snapshot and deletion detected incrementally across streaming cycles) — 11s
  • DistributedSourceScannerTest — 26/26 — 22s
  • SyncToExternalParsingTest — 86/86 — 1.1s
  • IcebergMiscFeaturesTest — 9/9 (FROM SNAPSHOT time-travel semantics from Iceberg FROM SNAPSHOT returns wrong snapshot ID in result #270) — 6.5s

Follow-up. #268's optimization (skip the source scan entirely when the snapshot ID matches) can be re-landed safely once there's a companion-state epoch — a counter bumped by every local-mutation path (DROP, invalidate) that the fast-path checks before short-circuiting. I'd rather do that as a separate PR with its own test matrix than stack it on this one.

The previous history had a Comment out failing test commit plus a Revert of it; those are squashed out by the force-push so the final diff is just the 10-line production fix and an untouched test file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Iceberg scoped invalidation does not detect deleted files

2 participants