Add tests for Iceberg data type fidelity, batching, compact strings, partition evolution, error handling, and FROM SNAPSHOT#264
Conversation
Format-agnostic tests for 6 companion operations: - AggregatePushdownCorrectness: 11 tests — COUNT/SUM/AVG/MIN/MAX, GROUP BY (exact values) - BucketAggregation: 9 tests — Histogram/DateHistogram/Range with per-bucket verification - DropPartitions: 9 tests — equality/range/compound predicates, cumulative drops, re-sync - MergeSplits: 9 tests — companion field preservation, data/agg/filter correctness post-merge - Purge: 9 tests — orphan deletion, retention, DRY RUN, data integrity - TruncateTimeTravel: 9 tests — version cleanup, checkpoint creation, metadata preservation 3 tests intentionally fail, catching real bugs: - Merged Iceberg companion splits fail file:// parquet path resolution - Iceberg sync returns "success" instead of "no_action" for unchanged snapshots - Iceberg file deletion not detected by companion anti-join Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
please see previous comments about not merging failing tests without fixes |
The native Rust layer received file:// URIs as parquetTableRoot for local Iceberg companions but only supports bare filesystem paths or cloud URIs. - ConfigUtils.normalizeStorageUrl(): strip file:// to bare paths - SyncTaskExecutor.extractTableBasePath(): add file:// to scheme pattern - Fix merge test data layout to use common data directory - Remove known-bug comments from test files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Five locations stripped file:// URIs using different approaches (URI.getPath, substring, stripPrefix), two with bugs: SyncTaskExecutor.downloadFile had unguarded URI.getPath (NPE risk), LocalCopyDownloader.normalizeLocalPath produced wrong result for file://hostname/path URIs. Adds CloudPathUtils.stripFileScheme with URI.getPath + null/exception guards and substring fallback for malformed URIs (e.g., unescaped spaces). All five call sites now delegate to this shared utility. Adds unit tests for both stripFileScheme and extractTableBasePath. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- IcebergRoundTripTest: 6 tests — scalar/Date/Timestamp/Array/Struct/Map/partitioned data fidelity - IcebergBatchingSyncTest: 4 tests — batchSize/maxConcurrentBatches configuration - IcebergCompactStringTest: 4 tests — exact_only/text_uuid_exactonly/text_uuid_strip modes - IcebergPartitionEvolutionTest: 3 tests — unpartitioned→partitioned, add partition, mixed specs - IcebergMiscFeaturesTest: 8 tests — error handling, write guard, fast field modes, FROM SNAPSHOT 2 tests intentionally fail, catching real bugs: - Partitioned round-trip: file:// parquet path resolution failure - FROM SNAPSHOT: returns wrong snapshot ID in result row Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
appendSnapshot wrote each batch's parquet files to separate batch-N/ subdirectories, but extractTableBasePath() derives the storage root from the first file's path. Files from other batches couldn't be resolved relative to that root, causing "Failed to create parquet stream builder" on partitioned tests. Real Iceberg tables store all data files under a single data/ directory — mirror that layout so the test reflects production behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
61b7d2a to
9b7afd9
Compare
FROM SNAPSHOT was incorrectly implemented as an incremental filter (get changes after this snapshot) instead of time-travel (build companion from the state at this snapshot). This caused the wrong files to be synced and the wrong snapshot ID reported in the result. Two changes: - SyncToExternalCommand: pass fromSnapshot as snapshotId (time-travel) instead of fromSnapshotId (incremental filter) - IcebergSourceReader.sourceVersion: return the requested snapshot ID when one was explicitly provided, instead of the first file entry's ID Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hods Six Iceberg test files each had their own copy of appendSnapshot with slight variations. Four still used the old direct-write pattern that breaks on multi-batch tests. Consolidated into IcebergSnapshotHelper trait in IcebergTestSupport.scala with staging+move to a common data/ directory, optional partitionPath, and explicit schema parameter. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Changes since initial reviewBug fixes (production code)Fix Fix FROM SNAPSHOT time-travel semantics (#270) —
Fix IcebergRoundTripTest parquet directory layout — Refactoring (test code)Extract shared Merge order
|
schenksj
left a comment
There was a problem hiding this comment.
Code Review
Overall: LGTM with minor nits. Two solid bug fixes backed by comprehensive tests. The file: URI consolidation is clean housekeeping.
Production changes
CloudPathUtils.stripFileScheme() (new method)
Implementation is correct. URI parsing handles all valid file: forms including percent-encoded characters, with a safe fallback for malformed URIs.
One small gap: the stripFileSchemeBySubstring fallback only handles file:/// and file:/ — file://hostname/path (UNC-style) would fall through to else path and be returned unchanged. This is an obscure edge case on Linux/Mac and unlikely to matter in practice, but worth noting.
LocalCopyDownloader — behavior improvement
The old normalizeLocalPath used stripPrefix("file://"), which meant file:///path/to/file would return /path/to/file correctly, but file://hostname/path would return hostname/path (wrong). The new code is strictly more correct.
SyncTaskExecutor — local copy path
Old code: new java.net.URI(sourcePath).getPath — throws URISyntaxException on paths with unescaped spaces (e.g., Iceberg tables in directories with spaces). New code handles this gracefully via the fallback. Good defensive improvement.
SyncTaskExecutor.extractTableBasePath — scheme regex
Adding file:// to the scheme pattern is correct for local testing: file:///path/to/table/part-00000.parquet → file:///path/to/table (scheme preserved in output). Verified by the new ExtractTableBasePathTest.
SyncToExternalCommand — FROM SNAPSHOT semantics fix
Clear and correct. The previous code was passing the user-specified snapshot ID as fromSnapshotId (incremental filter: "give me changes after this snapshot") when it should be snapshotId (time-travel: "give me the state at this snapshot"). The old comment even acknowledged it was doing time-travel wrong.
IcebergSourceReader.sourceVersion()
Previous logic:
if (entries.isEmpty) snapshotId
else Some(entries.get(0).getSnapshotId) // ignores explicit snapshotId!When a caller specified snapshotId = Some(42L) and there were entries, the old code returned the first entry's snapshot ID instead of 42L. The new snapshotId.orElse { ... } is correct: explicit snapshot ID wins, fall back to entry derivation only when none was requested.
Tests
Test coverage is thorough — data-type fidelity, batching, compact strings, partition evolution, error paths, write guard, fast field modes, FROM SNAPSHOT, plus Delta/Iceberg parity tests for all the SQL commands (MERGE SPLITS, PURGE, DROP PARTITIONS, TRUNCATE TIME TRAVEL).
Nit — incorrect comment in CompanionAggregatePushdownCorrectnessTest:
// department | rows | SUM(score) | MIN(score) | MAX(score) | AVG(score)
// engineering | 8 | 780.0 | 50.0 | 150.0 | 97.5
// marketing | 6 | 510.0 | 60.0 | 120.0 | 85.0 ← MAX should be 105.0
// sales | 6 | 510.0 | 55.0 | 130.0 | 85.0 ← MAX should be 120.0The actual data has marketing MAX = 105.0 (lisa) and sales MAX = 120.0 (pat), which matches DeptExpected correctly — it's only the header comment that's wrong. Won't cause test failures, but will confuse anyone trying to manually verify the values.
No blocking issues. The two bug fixes are clearly correct and well-tested.
schenksj
left a comment
There was a problem hiding this comment.
Blocking issue: streaming/incremental sync is broken by the FROM SNAPSHOT fix
The FROM SNAPSHOT semantics fix is correct for the SQL command, but it introduces a regression in streaming/incremental sync because fromSnapshot is doing double duty.
The broken call chain
SyncToExternalCommand.scala line 132 (not touched by this PR) — streaming manager reuses fromSnapshot as the incremental resume pointer:
case Some(v) if sourceFormat == "iceberg" =>
copy(fromSnapshot = Some(v), streamingPollIntervalMs = None)Before this PR, that fromSnapshot value was routed to fromSnapshotId in scanIcebergTable, which triggers the manifest-diff fast path in DistributedSourceScanner (the fromSnapshotId match { case Some(fsnap) => ... } block):
// PRE-PR (incremental works):
scanIcebergTable(..., snapshotId = None, fromSnapshotId = fromSnapshot)After this PR:
// POST-PR (incremental broken):
scanIcebergTable(..., snapshotId = fromSnapshot, fromSnapshotId = None)fromSnapshotId is now always None, so the manifest-diff path is dead code for all streaming cycles.
The consequence is worse than an expensive anti-join
scanIcebergTable(snapshotId=Some(lastSnapshot), fromSnapshotId=None)time-travels to the old snapshot — returns the same file list as the previous cycleIcebergSourceReader.sourceVersion()returnssnapshotId = Some(lastSnapshot)— unchangedlastSyncedVersionnever advances- New data written to the Iceberg table after the first sync is never picked up
Fix
fromSnapshot needs to be split into two separate parameters so SQL FROM SNAPSHOT and streaming resume don't share a field:
case class SyncToExternalCommand(
...
fromSnapshot: Option[Long] = None, // SQL FROM SNAPSHOT <id> → time-travel
incrementalFromSnapshot: Option[Long] = None, // streaming resume → fromSnapshotId (manifest diff)
...
)Streaming manager (line 132) changes to:
case Some(v) if sourceFormat == "iceberg" =>
copy(incrementalFromSnapshot = Some(v), streamingPollIntervalMs = None)And the distributed scan call site uses the correct field for each path:
scanIcebergTable(...,
snapshotId = fromSnapshot, // time-travel from SQL FROM SNAPSHOT
fromSnapshotId = incrementalFromSnapshot // manifest-diff for streaming
)Similarly IcebergSourceReader should receive fromSnapshot (time-travel) and not incrementalFromSnapshot for its snapshotId.
Why tests don't catch this
The FROM SNAPSHOT tests in IcebergMiscFeaturesTest verify single-cycle builds from a specific snapshot. There's no test that runs two streaming cycles with new data appended between them and asserts that the second cycle picks up the new rows.
schenksj
left a comment
There was a problem hiding this comment.
This PR also breaks incremental sync. Please see comments.
Summary
Tests Iceberg-specific features that were previously untested or only tested for Delta/Parquet (25 tests total), plus fixes for two bugs discovered by the tests.
Bug fixes
Fix FROM SNAPSHOT time-travel semantics (#270) —
FROM SNAPSHOT <id>was incorrectly implemented as an incremental filter (get changes after this snapshot) instead of time-travel (build companion from the state at this snapshot). ChangedSyncToExternalCommandto passfromSnapshotassnapshotId(time-travel) and fixedIcebergSourceReader.sourceVersion()to return the requested snapshot ID.Fix IcebergRoundTripTest data directory layout — Parquet files were written to separate
batch-N/subdirectories, butextractTableBasePath()derives the storage root from the first file's path. Moved to a commondata/directory mirroring real Iceberg table layout.What's tested
Data type round-trip fidelity (6 tests) — Writes data to Iceberg, builds companion, reads back, and compares every field of every row. Covers scalar types (Int, Long, Double, String, Boolean), Date, Timestamp, Array[String], Struct, Map[String,String], and partitioned mixed-type tables.
Batching control (4 tests) — Verifies
batchSizeandmaxConcurrentBatchesconfigs affect how many commits are made during companion build. Tests exact state directory counts with batchSize=1 vs default, and batching behavior during incremental sync.Compact string indexing modes (4 tests) — Verifies INDEXING MODES clause works with Iceberg:
exact_only(equality filter only),text_uuid_exactonly(text search + UUID preservation),text_uuid_strip(text search, UUIDs stripped), and multiple modes combined.Partition evolution (3 tests) — Verifies companion handles Iceberg partition spec changes: table evolving from unpartitioned to partitioned, adding a second partition column, and syncing files from mixed partition specs in a single pass.
Error handling (2 tests) — Verifies companion returns
status="error"for non-existent Iceberg table and unreachable catalog (not an exception).Write guard (1 test) — Verifies direct write to an Iceberg companion index is rejected with a clear error message.
Fast field modes (3 tests) — Verifies DISABLED and PARQUET_ONLY modes store correct metadata and produce readable companions (HYBRID was already tested).
FROM SNAPSHOT (2 tests) — Verifies building from a specific Iceberg snapshot syncs only that snapshot's data, and that a non-existent snapshot returns error.
Test plan
All 25 tests pass.
🤖 Generated with Claude Code