Skip to content

API, CORE, Flink, Spark: Deprecate Snapshot Change Methods#15241

Open
RussellSpitzer wants to merge 8 commits intoapache:mainfrom
RussellSpitzer:DeprecateSnapshotIOMethods
Open

API, CORE, Flink, Spark: Deprecate Snapshot Change Methods#15241
RussellSpitzer wants to merge 8 commits intoapache:mainfrom
RussellSpitzer:DeprecateSnapshotIOMethods

Conversation

@RussellSpitzer
Copy link
Member

@RussellSpitzer RussellSpitzer commented Feb 5, 2026

Context

This is part of the write metadata with columnar formats change. When we start writing parquet manifests, calls to ManifestReader() without passing through the partitionSpecByID will error out. The reason for the error is that we have no way of reading the metadata in the new file APIs and we have decided we aren't going add it in the future.

Snapshot.addedFiles and it's friends are some of the main users of this ManifestRead(path, IO) path (that aren't test code) so we need to remove those methods and switch our usage to a version which passes through partitionSpecByID. Otherwise switching to parquet manifests will cause issues throughout the codebase.

See #13769

This PR

We currently offer several methods for getting files changed in a snapshot but they rely on the assumption that you can read the partition_spec from the manifest metadata. In advance of the move to Parquet Manifest, we'll be no longer able to rely on this part of the manifest read code.

In this PR we deprecate those existing methods and create a new utility class which can do the same thing as the old Snapshot methods. The new utility class does not assume that the manifest read code can actually read the partition_spec info and instead takes it as an arguement.

Production Code Changes

Core
  1. CherryPickOperation
  2. MicroBatches
Flink
  1. TableChange
Spark
  1. MicroBatchStream

Test Changes

Unfortunately there are also a huge number of test usages of these methods, the majority of this commit is cleaning those up.

As a disclaimer, I did use Cursor and Claude code when writing this PR, It did the majority of the test refactoring although I have checked them all as well.

We currently offer several methods for getting files changed in
a snapshot but they rely on the assumption that you can read the
partition_spec from the manifest metadata. In advance of the
move to Parquet Manifest, we'll be no longer able to rely on
this part of the manifest read code.

In this PR we deprecate those existing methods and create a
new utility class which can do the same thing as the old
Snapshot methods. The new utility class does not assume
that the manifest read code can actually read the partition_spec
info and instead takes it as an arguement.

In production code there are only a small number of actual
uses

1. CherryPickOperation
2. MicroBatches

Within our other modules we also had a few usages

Flink
1. TableChange

Spark
2. MicroBatchStream

Unfortuantely there are also a huge number of test usages of these
methods, the majority of this commit is cleaning those up.
@@ -112,7 +112,11 @@ public interface Snapshot extends Serializable {
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all data files added to the table in this snapshot.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Main Deprecations are Here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for deprecating these methods. We've had issues with these in the past, like when we had to add FileIO to the signature. A utility class is the better option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also deprecate the equivalent manifest methods?

// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
SnapshotFileChanges changes =
SnapshotFileChanges.builder(cherrypickSnapshot, io, specsById).build();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We technically don't have to use the cached utility here, but we use it in the other usage within this file so I thought it was a bit clearer this way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks reasonable to me. Cached or not is the internal impl of SnapshotFileChanges

failMissingDeletePaths();

// copy adds from the picked snapshot
// copy adds and deletes from the picked snapshot
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// copy adds and deletes from the picked snapshot
// copy adds from the picked snapshot

@@ -0,0 +1,276 @@
/*
Copy link
Member Author

@RussellSpitzer RussellSpitzer Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual Utility we are switching to

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With V4 metadata change, the change detection can be more complicated with manfest DVs. I like this direction of moving the change detection out of the Snapshot class, which can just focus on core data structures.

This could be a good foundation for the change detection in the V4 adaptive tree.

In V4, if we are going to colocate DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good way to think about this class for reviews.

* @param specsById a map of partition spec IDs to partition specs
* @return a new Builder
*/
public static Builder builder(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may want to in the future extend this to take multiple snapshots so ti may make sense to break the api into a "specs,io" and "snapshot" seperately, but not now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass everything in builder methods? Or do you want to keep those focused on just the snapshot configuration?

}

private void cacheDataFileChanges() {
List<ManifestFile> changedManifests =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the logic from the BaseSnapshot implementation to optionally use an ExecutorService. I want to save actually using that for a followup PR but I think it was probably a mistake that we had this single threaded before.

}

private void cacheDeleteFileChanges() {
List<ManifestFile> changedManifests =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to above this differs from the BaseSnapshot impl by using an optional executor service

}

public static class Builder {
private final Snapshot snapshot;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am attempting to hid some of the constructor details from the Changes Class with this builder, but that may be premature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good idea, but I think you should choose what is required and what is used to configure the builder.

I might suggest passing a table in since this seems like a useful interface for callers that don't want to get lots of things from a table just to pass it in here.

  SnapshotChanges changes = SnapshotChanges.builderFor(table)
      .snapshot(id) // maybe this can support refs, too?
      .executeWith(threadPool)
      .build();

  SnapshotChanges changes = SnapshotChanges.builderFor(table)
      .startingSnapshot(start)
      .endingSnapshot(end)
      .executeWith(threadPool)
      .build();


return metadata.snapshot(ref.snapshotId());
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A set of static methods which allow for getting just one element of the Changes class without actually constructing it. We can drop these as well but I think they make the test code refactor a bit smaller.

@@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another production code change here

assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct());
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual Tests for the Utility

// If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
// iterate through addedFiles iterator to find addedFilesCount.
return addedFilesCount == -1
? Iterables.size(snapshot.addedDataFiles(table.io()))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only Spark Usage, pretty straight forward fix

@github-actions github-actions bot removed the INFRA label Feb 5, 2026
@RussellSpitzer
Copy link
Member Author

If it's easier for reviewers, I can also split this into "deprecate and utility" and then do all the production and test updates in a follow up PR. I just bundled this all up so that we would be able to avoid having a build that has active deprecation warnings.

@RussellSpitzer RussellSpitzer requested review from nastra, pvary and stevenzwu and removed request for pvary and stevenzwu February 5, 2026 19:42
@@ -0,0 +1,276 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With V4 metadata change, the change detection can be more complicated with manfest DVs. I like this direction of moving the change detection out of the Snapshot class, which can just focus on core data structures.

This could be a good foundation for the change detection in the V4 adaptive tree.

In V4, if we are going to colocate DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.

// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
SnapshotFileChanges changes =
SnapshotFileChanges.builder(cherrypickSnapshot, io, specsById).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks reasonable to me. Cached or not is the internal impl of SnapshotFileChanges

* query multiple file change types for the same snapshot. By default, manifests are read
* sequentially. Use {@link Builder#executeWith(ExecutorService)} to enable parallel reading.
*/
public class SnapshotFileChanges {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep this class as package private to start with? SnapshotUtil is in a diff package which prevents this.

Not sure if ChangelogUtil is a better place. Or maybe add a new ChangeDetectionUtil?

Comment on lines +116 to +120
Iterables.filter(
Iterables.filter(
snapshot.allManifests(io),
manifest -> manifest.content() == ManifestContent.DATA),
manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why two filters? Could we combine them?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just being lazy and mimicking the BaseSnapshot code, Let me tighten that up

Comment on lines +85 to +86
}
return addedDataFiles;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a prompt for the AIs to add newlines 🤖

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do a "agents.md" i know a bunch of other projects have done this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although if we really want this to not be an issue we need to encode it in our style rules

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding the config.

}

/** Returns all data files removed from the table in this snapshot. */
public Iterable<DataFile> removedDataFiles() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want Iterable or Iterator?
Iterator could be a bit more flexible, if we end up in a situation where the data don't fit into memory, but we might end up duplicating data.

Minimally we should return immutable results

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently i'm just mimicing the old signature, but your right it make be nice to change to interator although our current implementation is just iterable. I'm wondering for "iterator" invocations if we should change the implementation of the helper methods I added to SnapshotUtil since this class is basically for caching.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable is the right class.

Iterable provides something that can be iterated over multiple times, each time producing a new Iterator. That gives us the most flexibility, instead of forcing the caller to go back to this API to process removed data files another time -- the result of this method can be passed around and used without exposing this.

Iterable also does not need to load anything into memory. A List is Iterable, but so is ManifestReader that loads chunks of data at a time.

Java's enhanced for syntax is another reason to use Iterable:

  // this works
  for (DataFile removed : changes.removedDataFiles()) { ... }

Otherwise you have to wrap to produce an Iterable:

  // this has to create an Iterable using a lambda
  for (DataFile removed : (Iterable<DataFile>) () -> changes.removedDataFiles()) { ... }

Copy link
Contributor

@stevenzwu stevenzwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all data files added to the table in this snapshot.
* @deprecated will be removed in 2.0.0; use org.apache.iceberg.SnapshotChanges#builder(Snapshot,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably worth an import for SnapshotChanges to avoid the fully-qualified names in javadoc.

Iterables.filter(
Iterables.filter(
snapshot.allManifests(io),
manifest -> manifest.content() == ManifestContent.DATA),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snapshot.dataManifests(io)?

.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executorService)
.run(manifest -> fileChangesByManifest.add(readDataFileChanges(manifest)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to use ParallelIterable instead of directly building on Tasks. The benefit of using ParallelIterable is that it manages the queue internally, doubles the number of running tasks (2x the size of the thread pool), handles closing for CloseableIterable, and allows you to consume the results while threads are running. The main benefit would be higher parallelism, but this would also be a little simpler because the original implementation that used ManifestGroup already supports passing a threadpool if you call planWith.

Was the intent to avoid complication by replacing ManifestGroup?

Copy link
Contributor

@rdblue rdblue Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. The delete code used readDeleteManifest directly, so this is probably just being consistent.

I think I'd still recommend using ParallelIterable. Here's what I came up with:

  private Iterable<Pair<ManifestEntry.Status, DeleteFile>> readDeleteFiles(ManifestFile manifest) {
    Iterable<ManifestEntry<DeleteFile>> entries = ManifestFiles.readDeleteManifest(manifest, fileIO, null).entries();
    Iterable<Pair<ManifestEntry.Status, DeleteFile>> copied = Iterables.transform(entries, entry ->
        switch (entry.status()) {
          case ADDED -> Pair.of(ManifestEntry.Status.ADDED, entry.file().copy());
          case DELETED -> Pair.of(ManifestEntry.Status.DELETED, entry.file().copyWithoutStats());
          default -> null;
        });
    return Iterables.filter(copied, java.util.Objects::nonNull);
  }

  private void cacheDeleteFileChanges() {
    ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
    ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();

    Iterable<ManifestFile> changedManifests =
        Iterables.filter(
            deleteManifests(fileIO), manifest -> Objects.equal(manifest.snapshotId(), snapshotId));

    Iterable<Iterable<Pair<ManifestEntry.Status, DeleteFile>>> changedDeletes = Iterables.transform(changedManifests, this::readDeleteFiles);

    try (CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> pairs = new ParallelIterable<>(changedDeletes, ThreadPools.getWorkerPool())) {
      for (Pair<ManifestEntry.Status, DeleteFile> delete : pairs) {
        switch (delete.first()) {
          case ADDED -> adds.add(delete.second());
          case DELETED -> deletes.add(delete.second());
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close manifest reader", e);
    }

    this.addedDeleteFiles = adds.build();
    this.removedDeleteFiles = deletes.build();
  }

This uses Pair because the files need to be copied before returning them to ParallelIterable, but that would lose the added/deleted status. This is the kind of thing that inspired inverting the manifest_entry / delete_file relationship in v4 so that delete_file contains tracking_info.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants