API, CORE, Flink, Spark: Deprecate Snapshot Change Methods#15241
API, CORE, Flink, Spark: Deprecate Snapshot Change Methods#15241RussellSpitzer wants to merge 8 commits intoapache:mainfrom
Conversation
We currently offer several methods for getting files changed in a snapshot but they rely on the assumption that you can read the partition_spec from the manifest metadata. In advance of the move to Parquet Manifest, we'll be no longer able to rely on this part of the manifest read code. In this PR we deprecate those existing methods and create a new utility class which can do the same thing as the old Snapshot methods. The new utility class does not assume that the manifest read code can actually read the partition_spec info and instead takes it as an arguement. In production code there are only a small number of actual uses 1. CherryPickOperation 2. MicroBatches Within our other modules we also had a few usages Flink 1. TableChange Spark 2. MicroBatchStream Unfortuantely there are also a huge number of test usages of these methods, the majority of this commit is cleaning those up.
| @@ -112,7 +112,11 @@ public interface Snapshot extends Serializable { | |||
| * | |||
| * @param io a {@link FileIO} instance used for reading files from storage | |||
| * @return all data files added to the table in this snapshot. | |||
There was a problem hiding this comment.
The Main Deprecations are Here
There was a problem hiding this comment.
+1 for deprecating these methods. We've had issues with these in the past, like when we had to add FileIO to the signature. A utility class is the better option.
There was a problem hiding this comment.
Should we also deprecate the equivalent manifest methods?
| // Pick modifications from the snapshot | ||
| for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { | ||
| SnapshotFileChanges changes = | ||
| SnapshotFileChanges.builder(cherrypickSnapshot, io, specsById).build(); |
There was a problem hiding this comment.
We technically don't have to use the cached utility here, but we use it in the other usage within this file so I thought it was a bit clearer this way.
There was a problem hiding this comment.
This looks reasonable to me. Cached or not is the internal impl of SnapshotFileChanges
| failMissingDeletePaths(); | ||
|
|
||
| // copy adds from the picked snapshot | ||
| // copy adds and deletes from the picked snapshot |
There was a problem hiding this comment.
| // copy adds and deletes from the picked snapshot | |
| // copy adds from the picked snapshot |
| @@ -0,0 +1,276 @@ | |||
| /* | |||
There was a problem hiding this comment.
This is the actual Utility we are switching to
There was a problem hiding this comment.
With V4 metadata change, the change detection can be more complicated with manfest DVs. I like this direction of moving the change detection out of the Snapshot class, which can just focus on core data structures.
This could be a good foundation for the change detection in the V4 adaptive tree.
In V4, if we are going to colocate DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.
There was a problem hiding this comment.
I think this is a good way to think about this class for reviews.
| * @param specsById a map of partition spec IDs to partition specs | ||
| * @return a new Builder | ||
| */ | ||
| public static Builder builder( |
There was a problem hiding this comment.
I think we may want to in the future extend this to take multiple snapshots so ti may make sense to break the api into a "specs,io" and "snapshot" seperately, but not now
There was a problem hiding this comment.
Why not pass everything in builder methods? Or do you want to keep those focused on just the snapshot configuration?
| } | ||
|
|
||
| private void cacheDataFileChanges() { | ||
| List<ManifestFile> changedManifests = |
There was a problem hiding this comment.
I changed the logic from the BaseSnapshot implementation to optionally use an ExecutorService. I want to save actually using that for a followup PR but I think it was probably a mistake that we had this single threaded before.
| } | ||
|
|
||
| private void cacheDeleteFileChanges() { | ||
| List<ManifestFile> changedManifests = |
There was a problem hiding this comment.
Similar to above this differs from the BaseSnapshot impl by using an optional executor service
| } | ||
|
|
||
| public static class Builder { | ||
| private final Snapshot snapshot; |
There was a problem hiding this comment.
I am attempting to hid some of the constructor details from the Changes Class with this builder, but that may be premature.
There was a problem hiding this comment.
I think this is a good idea, but I think you should choose what is required and what is used to configure the builder.
I might suggest passing a table in since this seems like a useful interface for callers that don't want to get lots of things from a table just to pass it in here.
SnapshotChanges changes = SnapshotChanges.builderFor(table)
.snapshot(id) // maybe this can support refs, too?
.executeWith(threadPool)
.build();
SnapshotChanges changes = SnapshotChanges.builderFor(table)
.startingSnapshot(start)
.endingSnapshot(end)
.executeWith(threadPool)
.build();|
|
||
| return metadata.snapshot(ref.snapshotId()); | ||
| } | ||
|
|
There was a problem hiding this comment.
A set of static methods which allow for getting just one element of the Changes class without actually constructing it. We can drop these as well but I think they make the test code refactor a bit smaller.
| @@ -31,6 +31,7 @@ | |||
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | |||
There was a problem hiding this comment.
Another production code change here
| assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct()); | ||
| assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct()); | ||
| } | ||
|
|
There was a problem hiding this comment.
Actual Tests for the Utility
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
| // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, | ||
| // iterate through addedFiles iterator to find addedFilesCount. | ||
| return addedFilesCount == -1 | ||
| ? Iterables.size(snapshot.addedDataFiles(table.io())) |
There was a problem hiding this comment.
This is the only Spark Usage, pretty straight forward fix
|
If it's easier for reviewers, I can also split this into "deprecate and utility" and then do all the production and test updates in a follow up PR. I just bundled this all up so that we would be able to avoid having a build that has active deprecation warnings. |
| @@ -0,0 +1,276 @@ | |||
| /* | |||
There was a problem hiding this comment.
With V4 metadata change, the change detection can be more complicated with manfest DVs. I like this direction of moving the change detection out of the Snapshot class, which can just focus on core data structures.
This could be a good foundation for the change detection in the V4 adaptive tree.
In V4, if we are going to colocate DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.
| // Pick modifications from the snapshot | ||
| for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { | ||
| SnapshotFileChanges changes = | ||
| SnapshotFileChanges.builder(cherrypickSnapshot, io, specsById).build(); |
There was a problem hiding this comment.
This looks reasonable to me. Cached or not is the internal impl of SnapshotFileChanges
| * query multiple file change types for the same snapshot. By default, manifests are read | ||
| * sequentially. Use {@link Builder#executeWith(ExecutorService)} to enable parallel reading. | ||
| */ | ||
| public class SnapshotFileChanges { |
There was a problem hiding this comment.
can we keep this class as package private to start with? SnapshotUtil is in a diff package which prevents this.
Not sure if ChangelogUtil is a better place. Or maybe add a new ChangeDetectionUtil?
core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
Outdated
Show resolved
Hide resolved
core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
Outdated
Show resolved
Hide resolved
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableChange.java
Show resolved
Hide resolved
| Iterables.filter( | ||
| Iterables.filter( | ||
| snapshot.allManifests(io), | ||
| manifest -> manifest.content() == ManifestContent.DATA), | ||
| manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()))); |
There was a problem hiding this comment.
nit: why two filters? Could we combine them?
There was a problem hiding this comment.
Just being lazy and mimicking the BaseSnapshot code, Let me tighten that up
| } | ||
| return addedDataFiles; |
There was a problem hiding this comment.
We need a prompt for the AIs to add newlines 🤖
There was a problem hiding this comment.
We can do a "agents.md" i know a bunch of other projects have done this
There was a problem hiding this comment.
Although if we really want this to not be an issue we need to encode it in our style rules
| } | ||
|
|
||
| /** Returns all data files removed from the table in this snapshot. */ | ||
| public Iterable<DataFile> removedDataFiles() { |
There was a problem hiding this comment.
Do we want Iterable or Iterator?
Iterator could be a bit more flexible, if we end up in a situation where the data don't fit into memory, but we might end up duplicating data.
Minimally we should return immutable results
There was a problem hiding this comment.
Currently i'm just mimicing the old signature, but your right it make be nice to change to interator although our current implementation is just iterable. I'm wondering for "iterator" invocations if we should change the implementation of the helper methods I added to SnapshotUtil since this class is basically for caching.
There was a problem hiding this comment.
Iterable is the right class.
Iterable provides something that can be iterated over multiple times, each time producing a new Iterator. That gives us the most flexibility, instead of forcing the caller to go back to this API to process removed data files another time -- the result of this method can be passed around and used without exposing this.
Iterable also does not need to load anything into memory. A List is Iterable, but so is ManifestReader that loads chunks of data at a time.
Java's enhanced for syntax is another reason to use Iterable:
// this works
for (DataFile removed : changes.removedDataFiles()) { ... }Otherwise you have to wrap to produce an Iterable:
// this has to create an Iterable using a lambda
for (DataFile removed : (Iterable<DataFile>) () -> changes.removedDataFiles()) { ... }
core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
Outdated
Show resolved
Hide resolved
| * | ||
| * @param io a {@link FileIO} instance used for reading files from storage | ||
| * @return all data files added to the table in this snapshot. | ||
| * @deprecated will be removed in 2.0.0; use org.apache.iceberg.SnapshotChanges#builder(Snapshot, |
There was a problem hiding this comment.
It's probably worth an import for SnapshotChanges to avoid the fully-qualified names in javadoc.
| Iterables.filter( | ||
| Iterables.filter( | ||
| snapshot.allManifests(io), | ||
| manifest -> manifest.content() == ManifestContent.DATA), |
There was a problem hiding this comment.
snapshot.dataManifests(io)?
| .stopOnFailure() | ||
| .throwFailureWhenFinished() | ||
| .executeWith(executorService) | ||
| .run(manifest -> fileChangesByManifest.add(readDataFileChanges(manifest))); |
There was a problem hiding this comment.
You may want to use ParallelIterable instead of directly building on Tasks. The benefit of using ParallelIterable is that it manages the queue internally, doubles the number of running tasks (2x the size of the thread pool), handles closing for CloseableIterable, and allows you to consume the results while threads are running. The main benefit would be higher parallelism, but this would also be a little simpler because the original implementation that used ManifestGroup already supports passing a threadpool if you call planWith.
Was the intent to avoid complication by replacing ManifestGroup?
There was a problem hiding this comment.
Oh, I see. The delete code used readDeleteManifest directly, so this is probably just being consistent.
I think I'd still recommend using ParallelIterable. Here's what I came up with:
private Iterable<Pair<ManifestEntry.Status, DeleteFile>> readDeleteFiles(ManifestFile manifest) {
Iterable<ManifestEntry<DeleteFile>> entries = ManifestFiles.readDeleteManifest(manifest, fileIO, null).entries();
Iterable<Pair<ManifestEntry.Status, DeleteFile>> copied = Iterables.transform(entries, entry ->
switch (entry.status()) {
case ADDED -> Pair.of(ManifestEntry.Status.ADDED, entry.file().copy());
case DELETED -> Pair.of(ManifestEntry.Status.DELETED, entry.file().copyWithoutStats());
default -> null;
});
return Iterables.filter(copied, java.util.Objects::nonNull);
}
private void cacheDeleteFileChanges() {
ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();
Iterable<ManifestFile> changedManifests =
Iterables.filter(
deleteManifests(fileIO), manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
Iterable<Iterable<Pair<ManifestEntry.Status, DeleteFile>>> changedDeletes = Iterables.transform(changedManifests, this::readDeleteFiles);
try (CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> pairs = new ParallelIterable<>(changedDeletes, ThreadPools.getWorkerPool())) {
for (Pair<ManifestEntry.Status, DeleteFile> delete : pairs) {
switch (delete.first()) {
case ADDED -> adds.add(delete.second());
case DELETED -> deletes.add(delete.second());
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest reader", e);
}
this.addedDeleteFiles = adds.build();
this.removedDeleteFiles = deletes.build();
}This uses Pair because the files need to be copied before returning them to ParallelIterable, but that would lose the added/deleted status. This is the kind of thing that inspired inverting the manifest_entry / delete_file relationship in v4 so that delete_file contains tracking_info.
Context
This is part of the write metadata with columnar formats change. When we start writing parquet manifests, calls to ManifestReader() without passing through the partitionSpecByID will error out. The reason for the error is that we have no way of reading the metadata in the new file APIs and we have decided we aren't going add it in the future.
Snapshot.addedFiles and it's friends are some of the main users of this ManifestRead(path, IO) path (that aren't test code) so we need to remove those methods and switch our usage to a version which passes through partitionSpecByID. Otherwise switching to parquet manifests will cause issues throughout the codebase.
See #13769
This PR
We currently offer several methods for getting files changed in a snapshot but they rely on the assumption that you can read the partition_spec from the manifest metadata. In advance of the move to Parquet Manifest, we'll be no longer able to rely on this part of the manifest read code.
In this PR we deprecate those existing methods and create a new utility class which can do the same thing as the old Snapshot methods. The new utility class does not assume that the manifest read code can actually read the partition_spec info and instead takes it as an arguement.
Production Code Changes
Core
Flink
Spark
Test Changes
Unfortunately there are also a huge number of test usages of these methods, the majority of this commit is cleaning those up.
As a disclaimer, I did use Cursor and Claude code when writing this PR, It did the majority of the test refactoring although I have checked them all as well.