Add tests for Iceberg data type fidelity, batching, compact strings, partition evolution, error handling, and FROM SNAPSHOT#264

Closed
tlee732 wants to merge 7 commits into indextables:main from tlee732:test/iceberg-features

Conversation

@tlee732
Contributor

@tlee732 tlee732 commented Mar 26, 2026

Summary

Tests Iceberg-specific features that were previously untested or only tested for Delta/Parquet (25 tests total), plus fixes for two bugs discovered by the tests.

Bug fixes

Fix FROM SNAPSHOT time-travel semantics (#270): FROM SNAPSHOT <id> was incorrectly implemented as an incremental filter (get changes after this snapshot) instead of time-travel (build companion from the state at this snapshot). Changed SyncToExternalCommand to pass fromSnapshot as snapshotId (time-travel) and fixed IcebergSourceReader.sourceVersion() to return the requested snapshot ID.

Fix IcebergRoundTripTest data directory layout — Parquet files were written to separate batch-N/ subdirectories, but extractTableBasePath() derives the storage root from the first file's path. Moved to a common data/ directory mirroring real Iceberg table layout.

What's tested

Data type round-trip fidelity (6 tests) — Writes data to Iceberg, builds companion, reads back, and compares every field of every row. Covers scalar types (Int, Long, Double, String, Boolean), Date, Timestamp, Array[String], Struct, Map[String,String], and partitioned mixed-type tables.

Batching control (4 tests) — Verifies batchSize and maxConcurrentBatches configs affect how many commits are made during companion build. Tests exact state directory counts with batchSize=1 vs default, and batching behavior during incremental sync.

Compact string indexing modes (4 tests) — Verifies INDEXING MODES clause works with Iceberg: exact_only (equality filter only), text_uuid_exactonly (text search + UUID preservation), text_uuid_strip (text search, UUIDs stripped), and multiple modes combined.

Partition evolution (3 tests) — Verifies companion handles Iceberg partition spec changes: table evolving from unpartitioned to partitioned, adding a second partition column, and syncing files from mixed partition specs in a single pass.

Error handling (2 tests) — Verifies companion returns status="error" for non-existent Iceberg table and unreachable catalog (not an exception).

Write guard (1 test) — Verifies direct write to an Iceberg companion index is rejected with a clear error message.

Fast field modes (3 tests) — Verifies DISABLED and PARQUET_ONLY modes store correct metadata and produce readable companions (HYBRID was already tested).

FROM SNAPSHOT (2 tests) — Verifies building from a specific Iceberg snapshot syncs only that snapshot's data, and that a non-existent snapshot returns error.

Test plan

export JAVA_HOME=/opt/homebrew/opt/openjdk@11
mvn test-compile scalatest:test -DwildcardSuites='io.indextables.spark.sync.IcebergRoundTripTest,io.indextables.spark.sync.IcebergBatchingSyncTest,io.indextables.spark.sync.IcebergCompactStringTest,io.indextables.spark.sync.IcebergPartitionEvolutionTest,io.indextables.spark.sync.IcebergMiscFeaturesTest'

All 25 tests pass.

🤖 Generated with Claude Code

Format-agnostic tests for 6 companion operations:
- AggregatePushdownCorrectness: 11 tests — COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values)
- BucketAggregation: 9 tests — Histogram/DateHistogram/Range with per-bucket verification
- DropPartitions: 9 tests — equality/range/compound predicates, cumulative drops, re-sync
- MergeSplits: 9 tests — companion field preservation, data/agg/filter correctness post-merge
- Purge: 9 tests — orphan deletion, retention, DRY RUN, data integrity
- TruncateTimeTravel: 9 tests — version cleanup, checkpoint creation, metadata preservation

3 tests intentionally fail, catching real bugs:
- Merged Iceberg companion splits fail file:// parquet path resolution
- Iceberg sync returns "success" instead of "no_action" for unchanged snapshots
- Iceberg file deletion not detected by companion anti-join

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732 tlee732 changed the title from "Add 25 Iceberg feature parity tests (F5, F7-F10, F13-F15, F17)" to "Add tests for Iceberg data type fidelity, batching, compact strings, partition evolution, error handling, and FROM SNAPSHOT" Mar 26, 2026
@schenksj
Collaborator

please see previous comments about not merging failing tests without fixes

@schenksj schenksj marked this pull request as draft March 27, 2026 01:06
tlee732 and others added 4 commits March 29, 2026 11:56
The native Rust layer received file:// URIs as parquetTableRoot for local
Iceberg companions but only supports bare filesystem paths or cloud URIs.

- ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths
- SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern
- Fix merge test data layout to use common data directory
- Remove known-bug comments from test files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Five locations stripped file:// URIs using different approaches (URI.getPath,
substring, stripPrefix), two with bugs: SyncTaskExecutor.downloadFile had
unguarded URI.getPath (NPE risk), LocalCopyDownloader.normalizeLocalPath
produced wrong result for file://hostname/path URIs.

Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards
and substring fallback for malformed URIs (e.g., unescaped spaces). All five
call sites now delegate to this shared utility. Adds unit tests for both
stripFileScheme and extractTableBasePath.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- IcebergRoundTripTest: 6 tests — scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity
- IcebergBatchingSyncTest: 4 tests — batchSize/maxConcurrentBatches configuration
- IcebergCompactStringTest: 4 tests — exact_only/text_uuid_exactonly/text_uuid_strip modes
- IcebergPartitionEvolutionTest: 3 tests — unpartitioned→partitioned, add partition, mixed specs
- IcebergMiscFeaturesTest: 8 tests — error handling, write guard, fast field modes, FROM SNAPSHOT

2 tests intentionally fail, catching real bugs:
- Partitioned round-trip: file:// parquet path resolution failure
- FROM SNAPSHOT: returns wrong snapshot ID in result row

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
appendSnapshot wrote each batch's parquet files to separate batch-N/
subdirectories, but extractTableBasePath() derives the storage root from
the first file's path. Files from other batches couldn't be resolved
relative to that root, causing "Failed to create parquet stream builder"
on partitioned tests. Real Iceberg tables store all data files under a
single data/ directory — mirror that layout so the test reflects
production behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732 tlee732 force-pushed the test/iceberg-features branch from 61b7d2a to 9b7afd9 Compare March 29, 2026 21:56
tlee732 and others added 2 commits March 29, 2026 15:09
FROM SNAPSHOT was incorrectly implemented as an incremental filter (get
changes after this snapshot) instead of time-travel (build companion from
the state at this snapshot). This caused the wrong files to be synced and
the wrong snapshot ID reported in the result.

Two changes:
- SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel)
  instead of fromSnapshotId (incremental filter)
- IcebergSourceReader.sourceVersion: return the requested snapshot ID
  when one was explicitly provided, instead of the first file entry's ID

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hods

Six Iceberg test files each had their own copy of appendSnapshot with
slight variations. Four still used the old direct-write pattern that
breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper
trait in IcebergTestSupport.scala with staging+move to a common data/
directory, optional partitionPath, and explicit schema parameter.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@tlee732
Contributor Author

tlee732 commented Mar 29, 2026

Changes since initial review

Bug fixes (production code)

Fix file:// URI path resolution for Iceberg companions: rebased onto PR #262, which consolidates file:// URI stripping into CloudPathUtils.stripFileScheme(). This fixes the file:///path → /path conversion that the native Rust layer needs for local Iceberg companions. (Inherited from #262, not new code in this PR.)

Fix FROM SNAPSHOT time-travel semantics (#270): FROM SNAPSHOT <id> was incorrectly implemented as an incremental filter (get changes added after this snapshot) rather than time-travel (build companion from the state at this snapshot). Two changes:

  • SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel) instead of fromSnapshotId (incremental filter)
  • IcebergSourceReader.sourceVersion(): return the requested snapshot ID when explicitly provided, instead of the first file entry's ID

Fix IcebergRoundTripTest parquet directory layout: appendSnapshot wrote each batch to separate parquet-data/batch-N/ subdirectories, but extractTableBasePath() derives the storage root from the first file's path. Files from other batches couldn't be resolved relative to that root. Changed to a common data/ directory mirroring real Iceberg table layout.
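To make the layout problem concrete, here is a minimal sketch. It assumes, purely for illustration, that the storage root is the parent directory of the first file; the real extractTableBasePath() may derive it differently, and LayoutSketch and its methods are hypothetical names, not code from this PR.

```scala
import java.nio.file.{Path, Paths}

object LayoutSketch {
  // Hypothetical stand-in for extractTableBasePath():
  // assume the root is the parent directory of the first file.
  def baseFromFirstFile(files: Seq[String]): Path =
    Paths.get(files.head).getParent

  // A file can be addressed relative to the root only if it lives under it.
  def resolvable(root: Path, file: String): Boolean =
    Paths.get(file).startsWith(root)

  // Files that cannot be expressed relative to the derived root.
  def unresolved(files: Seq[String]): Seq[String] = {
    val root = baseFromFirstFile(files)
    files.filterNot(resolvable(root, _))
  }
}
```

Under that assumption, a file list spread over batch-0/ and batch-1/ leaves the batch-1 file unresolved, while a list where every file sits under one data/ directory leaves nothing unresolved.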

Refactoring (test code)

Extract shared IcebergSnapshotHelper trait — Six Iceberg test files each had their own copy of appendSnapshot with slight variations. Four still used the old direct-write pattern that breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper trait in IcebergTestSupport.scala with staging+move to a common data/ directory, optional partitionPath, and explicit schema parameter. Net reduction: -214 lines across 8 files.

Merge order

This PR must be merged after #262 (test/companion-format-agnostic-ops). This branch is rebased on #262 and depends on:

Merging out of order will cause compilation failures.

@tlee732 tlee732 marked this pull request as ready for review March 30, 2026 05:47
Collaborator

@schenksj schenksj left a comment


Code Review

Overall: LGTM with minor nits. Two solid bug fixes backed by comprehensive tests. The file: URI consolidation is clean housekeeping.


Production changes

CloudPathUtils.stripFileScheme() (new method)

Implementation is correct. URI parsing handles all valid file: forms including percent-encoded characters, with a safe fallback for malformed URIs.

One small gap: the stripFileSchemeBySubstring fallback only handles file:/// and file:/, so file://hostname/path (UNC-style) would fall through to else path and be returned unchanged. This is an obscure edge case on Linux/Mac and unlikely to matter in practice, but worth noting.

LocalCopyDownloader — behavior improvement

The old normalizeLocalPath used stripPrefix("file://"), which meant file:///path/to/file would return /path/to/file correctly, but file://hostname/path would return hostname/path (wrong). The new code is strictly more correct.
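The difference can be reproduced with plain JDK and Scala standard-library calls. The object and method names below are illustrative only and are not the project's code.

```scala
import java.net.URI

object FileUriSketch {
  // Old approach as described above: drop the literal "file://" prefix.
  def viaStripPrefix(uri: String): String = uri.stripPrefix("file://")

  // URI-based approach: the authority (host) is parsed separately from the path.
  def viaUriPath(uri: String): String = new URI(uri).getPath
}
```

For file:///path/to/file both variants yield /path/to/file. For file://hostname/path the prefix strip yields hostname/path, whereas URI.getPath yields /path.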

SyncTaskExecutor — local copy path

Old code: new java.net.URI(sourcePath).getPath — throws URISyntaxException on paths with unescaped spaces (e.g., Iceberg tables in directories with spaces). New code handles this gracefully via the fallback. Good defensive improvement.
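A rough sketch of the guarded pattern, based only on the description in this thread (URI.getPath with null/exception guards, then a substring fallback). The actual CloudPathUtils.stripFileScheme may differ in detail; everything below is an assumption for illustration.

```scala
import java.net.URI
import scala.util.Try

object StripFileSchemeSketch {
  // Fallback for strings java.net.URI cannot parse: peel off known prefixes by hand.
  // Mirrors the gap noted in the review: file://hostname/... is returned unchanged.
  private def bySubstring(path: String): String =
    if (path.startsWith("file:///")) path.substring("file://".length)
    else if (path.startsWith("file:/") && !path.startsWith("file://"))
      path.substring("file:".length)
    else path

  def stripFileScheme(path: String): String =
    if (!path.startsWith("file:")) path        // bare paths and cloud URIs pass through
    else
      Try(new URI(path).getPath).toOption      // URISyntaxException becomes None
        .flatMap(Option(_))                    // a null path becomes None
        .filter(_.nonEmpty)
        .getOrElse(bySubstring(path))
}
```

With this shape, file:///data/my table/x.parquet (unescaped space, which URI rejects) still comes back as /data/my table/x.parquet through the fallback instead of throwing.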

SyncTaskExecutor.extractTableBasePath — scheme regex

Adding file:// to the scheme pattern is correct for local testing: file:///path/to/table/part-00000.parquet → file:///path/to/table (scheme preserved in output). Verified by the new ExtractTableBasePathTest.

SyncToExternalCommand — FROM SNAPSHOT semantics fix

Clear and correct. The previous code was passing the user-specified snapshot ID as fromSnapshotId (incremental filter: "give me changes after this snapshot") when it should be snapshotId (time-travel: "give me the state at this snapshot"). The old comment even acknowledged it was doing time-travel wrong.

IcebergSourceReader.sourceVersion()

Previous logic:

if (entries.isEmpty) snapshotId
else Some(entries.get(0).getSnapshotId)   // ignores explicit snapshotId!

When a caller specified snapshotId = Some(42L) and there were entries, the old code returned the first entry's snapshot ID instead of 42L. The new snapshotId.orElse { ... } is correct: explicit snapshot ID wins, fall back to entry derivation only when none was requested.
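For a side-by-side view, the two variants can be reduced to pure functions over a list of entry snapshot IDs. This is a simplification: the entry list type is replaced by Seq[Long], and the body of the orElse fallback is assumed to be the same first-entry derivation the old code used, since the thread does not show it.

```scala
object SourceVersionSketch {
  // Old logic as quoted above: any entry overrides the explicit snapshot ID.
  def oldSourceVersion(snapshotId: Option[Long], entrySnapshotIds: Seq[Long]): Option[Long] =
    if (entrySnapshotIds.isEmpty) snapshotId
    else Some(entrySnapshotIds.head)

  // New logic: the explicit ID wins; derive from entries only when none was requested.
  // (Fallback body is an assumption; the original is elided in the review.)
  def newSourceVersion(snapshotId: Option[Long], entrySnapshotIds: Seq[Long]): Option[Long] =
    snapshotId.orElse(entrySnapshotIds.headOption)
}
```

With snapshotId = Some(42L) and a first entry from snapshot 7, the old variant reports 7 and the new one reports 42. With no explicit ID, both fall back to the first entry.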


Tests

Test coverage is thorough — data-type fidelity, batching, compact strings, partition evolution, error paths, write guard, fast field modes, FROM SNAPSHOT, plus Delta/Iceberg parity tests for all the SQL commands (MERGE SPLITS, PURGE, DROP PARTITIONS, TRUNCATE TIME TRAVEL).

Nit — incorrect comment in CompanionAggregatePushdownCorrectnessTest:

// department  | rows | SUM(score) | MIN(score) | MAX(score) | AVG(score)
// engineering |   8  |   780.0    |   50.0     |   150.0    |   97.5
// marketing   |   6  |   510.0    |   60.0     |   120.0    |   85.0   ← MAX should be 105.0
// sales       |   6  |   510.0    |   55.0     |   130.0    |   85.0   ← MAX should be 120.0

The actual data has marketing MAX = 105.0 (lisa) and sales MAX = 120.0 (pat), which matches DeptExpected correctly — it's only the header comment that's wrong. Won't cause test failures, but will confuse anyone trying to manually verify the values.


No blocking issues. The two bug fixes are clearly correct and well-tested.

Collaborator

@schenksj schenksj left a comment


Blocking issue: streaming/incremental sync is broken by the FROM SNAPSHOT fix

The FROM SNAPSHOT semantics fix is correct for the SQL command, but it introduces a regression in streaming/incremental sync because fromSnapshot is doing double duty.

The broken call chain

SyncToExternalCommand.scala line 132 (not touched by this PR) — streaming manager reuses fromSnapshot as the incremental resume pointer:

case Some(v) if sourceFormat == "iceberg" =>
  copy(fromSnapshot = Some(v), streamingPollIntervalMs = None)

Before this PR, that fromSnapshot value was routed to fromSnapshotId in scanIcebergTable, which triggers the manifest-diff fast path in DistributedSourceScanner (the fromSnapshotId match { case Some(fsnap) => ... } block):

// PRE-PR (incremental works):
scanIcebergTable(..., snapshotId = None, fromSnapshotId = fromSnapshot)

After this PR:

// POST-PR (incremental broken):
scanIcebergTable(..., snapshotId = fromSnapshot, fromSnapshotId = None)

fromSnapshotId is now always None, so the manifest-diff path is dead code for all streaming cycles.

The consequence is worse than an expensive anti-join

  1. scanIcebergTable(snapshotId=Some(lastSnapshot), fromSnapshotId=None) time-travels to the old snapshot — returns the same file list as the previous cycle
  2. IcebergSourceReader.sourceVersion() returns snapshotId = Some(lastSnapshot) — unchanged
  3. lastSyncedVersion never advances
  4. New data written to the Iceberg table after the first sync is never picked up
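The stall can be illustrated with a toy model. The table is reduced to a map from snapshot ID to the file set visible at that snapshot; StreamingResumeSketch and its functions are hypothetical and do not correspond to scanIcebergTable's real signature.

```scala
object StreamingResumeSketch {
  // Toy table: snapshot ID -> full file list visible at that snapshot.
  type Table = Map[Long, Set[String]]

  final case class ScanResult(files: Set[String], version: Long)

  // Time-travel: the table state at `snapshotId`; the reported version is that same ID.
  def timeTravel(table: Table, snapshotId: Long): ScanResult =
    ScanResult(table(snapshotId), snapshotId)

  // Manifest diff: files added after `fromSnapshotId`, reported at the latest snapshot.
  def incremental(table: Table, fromSnapshotId: Long): ScanResult = {
    val latest = table.keys.max
    ScanResult(table(latest) -- table(fromSnapshotId), latest)
  }
}
```

If snapshot 1 holds file a and snapshot 2 holds a and b, resuming from 1 via timeTravel keeps returning a at version 1 on every cycle, while incremental returns b at version 2, so the resume pointer advances.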

Fix

fromSnapshot needs to be split into two separate parameters so SQL FROM SNAPSHOT and streaming resume don't share a field:

case class SyncToExternalCommand(
  ...
  fromSnapshot: Option[Long] = None,             // SQL FROM SNAPSHOT <id> → time-travel
  incrementalFromSnapshot: Option[Long] = None,  // streaming resume → fromSnapshotId (manifest diff)
  ...
)

Streaming manager (line 132) changes to:

case Some(v) if sourceFormat == "iceberg" =>
  copy(incrementalFromSnapshot = Some(v), streamingPollIntervalMs = None)

And the distributed scan call site uses the correct field for each path:

scanIcebergTable(...,
  snapshotId      = fromSnapshot,           // time-travel from SQL FROM SNAPSHOT
  fromSnapshotId  = incrementalFromSnapshot // manifest-diff for streaming
)

Similarly IcebergSourceReader should receive fromSnapshot (time-travel) and not incrementalFromSnapshot for its snapshotId.

Why tests don't catch this

The FROM SNAPSHOT tests in IcebergMiscFeaturesTest verify single-cycle builds from a specific snapshot. There's no test that runs two streaming cycles with new data appended between them and asserts that the second cycle picks up the new rows.

Collaborator

@schenksj schenksj left a comment


This PR also breaks incremental sync. Please see comments.

@tlee732
Contributor Author

tlee732 commented Apr 1, 2026

Superseded by #286, which combines this PR with #263 and addresses all reviewer feedback (streaming resume separation, enhanced manifest-diff with deletion detection, invalidation-only commit path).
