Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ public interface Snapshot extends Serializable {
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all data files added to the table in this snapshot.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Main Deprecations are Here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for deprecating these methods. We've had issues with these in the past, like when we had to add FileIO to the signature. A utility class is the better option.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also deprecate the equivalent manifest methods?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to follow up in another PR for that, it is going to require changing another slew of tests

* @deprecated will be removed in 2.0.0; use SnapshotChanges#builderFor(Table) instead
*/
@Deprecated
Iterable<DataFile> addedDataFiles(FileIO io);

/**
Expand All @@ -124,7 +126,9 @@ public interface Snapshot extends Serializable {
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all data files removed from the table in this snapshot.
* @deprecated will be removed in 2.0.0; use SnapshotChanges#builderFor(Table) instead
*/
@Deprecated
Iterable<DataFile> removedDataFiles(FileIO io);

/**
Expand All @@ -135,7 +139,9 @@ public interface Snapshot extends Serializable {
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all delete files added to the table in this snapshot
* @deprecated will be removed in 2.0.0; use SnapshotChanges#builderFor(Table) instead
*/
@Deprecated
default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement addedDeleteFiles");
Expand All @@ -149,7 +155,9 @@ default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return all delete files removed from the table in this snapshot
* @deprecated will be removed in 2.0.0; use SnapshotChanges#builderFor(Table) instead
*/
@Deprecated
default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement removedDeleteFiles");
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/java/org/apache/iceberg/CherryPickOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@
class CherryPickOperation extends MergingSnapshotProducer<CherryPickOperation> {

private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private Snapshot cherrypickSnapshot = null;
private boolean requireFastForward = false;
private PartitionSet replacedPartitions = null;

CherryPickOperation(String tableName, TableOperations ops) {
super(tableName, ops);
this.io = ops.io();
this.specsById = ops.current().specsById();
}

@Override
Expand All @@ -71,6 +69,9 @@ public CherryPickOperation cherrypick(long snapshotId) {
ValidationException.check(
cherrypickSnapshot != null, "Cannot cherry-pick unknown snapshot ID: %s", snapshotId);

SnapshotChanges changes =
SnapshotChanges.builderFor(cherrypickSnapshot, ops().io(), current.specsById()).build();

if (cherrypickSnapshot.operation().equals(DataOperations.APPEND)) {
// this property is set on target snapshot that will get published
String wapId = WapUtil.validateWapPublish(current, snapshotId);
Expand All @@ -82,7 +83,7 @@ public CherryPickOperation cherrypick(long snapshotId) {
set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(snapshotId));

// Pick modifications from the snapshot
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
for (DataFile addedFile : changes.addedDataFiles()) {
add(addedFile);
}

Expand Down Expand Up @@ -113,14 +114,14 @@ public CherryPickOperation cherrypick(long snapshotId) {
failMissingDeletePaths();

// copy adds from the picked snapshot
this.replacedPartitions = PartitionSet.create(specsById);
for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) {
this.replacedPartitions = PartitionSet.create(current.specsById());
for (DataFile addedFile : changes.addedDataFiles()) {
add(addedFile);
replacedPartitions.add(addedFile.specId(), addedFile.partition());
}

// copy deletes from the picked snapshot
for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io)) {
for (DataFile deletedFile : changes.removedDataFiles()) {
delete(deletedFile);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/MicroBatches.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) {
public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
return generate(
startFileIndex,
Iterables.size(snapshot.addedDataFiles(io)),
Iterables.size(
SnapshotChanges.builderFor(snapshot, io, specsById).build().addedDataFiles()),
targetSizeInBytes,
scanAllFiles);
}
Expand Down
271 changes: 271 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotChanges.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
Copy link
Copy Markdown
Member Author

@RussellSpitzer RussellSpitzer Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual Utility we are switching to

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With V4 metadata change, the change detection can be more complicated with manfest DVs. I like this direction of moving the change detection out of the Snapshot class, which can just focus on core data structures.

This could be a good foundation for the change detection in the V4 adaptive tree.

In V4, if we are going to colocate DV (deleted old and added new) and data file, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good way to think about this class for reviews.

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ParallelIterable;

/**
* Helper class for retrieving file changes in a snapshot with caching.
*
* <p>This class caches the results of file change detection operations, making it efficient to
* query multiple file change types for the same snapshot. By default, manifests are read
* single-threaded. Use {@link Builder#executeWith(ExecutorService)} to enable parallel manifest
* reading.
*/
public class SnapshotChanges {
private final Snapshot snapshot;
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private final ExecutorService executorService;

private List<DataFile> addedDataFiles = null;
private List<DataFile> removedDataFiles = null;
private List<DeleteFile> addedDeleteFiles = null;
private List<DeleteFile> removedDeleteFiles = null;

private SnapshotChanges(
Snapshot snapshot,
FileIO io,
Map<Integer, PartitionSpec> specsById,
ExecutorService executorService) {
Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null");
Preconditions.checkArgument(io != null, "FileIO cannot be null");
Preconditions.checkArgument(specsById != null, "Partition specs cannot be null");
this.snapshot = snapshot;
this.io = io;
this.specsById = specsById;
this.executorService = executorService;
}

/**
* Create a builder for SnapshotChanges using the table's current snapshot.
*
* @param table the table to detect file changes for
* @return a new Builder
*/
public static Builder builderFor(Table table) {
return new Builder(table.currentSnapshot(), table.io(), table.specs());
}

static Builder builderFor(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
return new Builder(snapshot, io, specsById);
}

private <T> CloseableIterable<T> iterate(Iterable<CloseableIterable<T>> tasks) {
if (executorService != null) {
return new ParallelIterable<>(tasks, executorService);
} else {
return CloseableIterable.concat(tasks);
}
}

/** Returns all data files added to the table in this snapshot */
public Iterable<DataFile> addedDataFiles() {
if (addedDataFiles == null) {
cacheDataFileChanges();
}

return addedDataFiles;
Comment on lines +94 to +96
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a prompt for the AIs to add newlines 🤖

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do a "agents.md" i know a bunch of other projects have done this

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although if we really want this to not be an issue we need to encode it in our style rules

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding the config.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are apparently thousands of violations in the codebase. I'll raise another PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing braces followed by more code (not another '}'):

  Already following the rule:   67594  (94.9%)
  Violating the rule:            3619  (5.1%)
  Total:                        71213

Lots of duplicates because of the modules but a lot of cases

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

/** Returns all data files removed from the table in this snapshot. */
public Iterable<DataFile> removedDataFiles() {
Comment thread
pvary marked this conversation as resolved.
if (removedDataFiles == null) {
cacheDataFileChanges();
}

return removedDataFiles;
}

/** Returns all delete files added to the table in this snapshot. */
public Iterable<DeleteFile> addedDeleteFiles() {
if (addedDeleteFiles == null) {
cacheDeleteFileChanges();
}

return addedDeleteFiles;
}

/** Returns all delete files removed from the table in this snapshot. */
public Iterable<DeleteFile> removedDeleteFiles() {
if (removedDeleteFiles == null) {
cacheDeleteFileChanges();
}

return removedDeleteFiles;
}

private void cacheDataFileChanges() {
ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();

Iterable<ManifestFile> relevantDataManifests =
Iterables.filter(
snapshot.dataManifests(io),
manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));

Iterable<CloseableIterable<Pair<ManifestEntry.Status, DataFile>>> manifestReadTasks =
Iterables.transform(relevantDataManifests, this::readDataManifest);

try (CloseableIterable<Pair<ManifestEntry.Status, DataFile>> changedDataFiles =
iterate(manifestReadTasks)) {
for (Pair<ManifestEntry.Status, DataFile> pair : changedDataFiles) {
switch (pair.first()) {
case ADDED:
adds.add(pair.second());
break;
case DELETED:
deletes.add(pair.second());
break;
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest reader", e);
}

this.addedDataFiles = adds.build();
this.removedDataFiles = deletes.build();
}

private CloseableIterable<Pair<ManifestEntry.Status, DataFile>> readDataManifest(
ManifestFile manifest) {
CloseableIterable<ManifestEntry<DataFile>> entries =
ManifestFiles.read(manifest, io, specsById).entries();

CloseableIterable<ManifestEntry<DataFile>> relevant =
CloseableIterable.filter(entries, e -> e.status() != ManifestEntry.Status.EXISTING);

return CloseableIterable.transform(
relevant,
entry -> {
if (entry.status() == ManifestEntry.Status.ADDED) {
return Pair.of(ManifestEntry.Status.ADDED, entry.file().copy());
} else {
return Pair.of(ManifestEntry.Status.DELETED, entry.file().copyWithoutStats());
}
});
}

private void cacheDeleteFileChanges() {
ImmutableList.Builder<DeleteFile> adds = ImmutableList.builder();
ImmutableList.Builder<DeleteFile> deletes = ImmutableList.builder();

Iterable<ManifestFile> relevantDeleteManifests =
Iterables.filter(
snapshot.deleteManifests(io),
manifest -> Objects.equals(manifest.snapshotId(), snapshot.snapshotId()));

Iterable<CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>>> manifestReadTasks =
Iterables.transform(relevantDeleteManifests, this::readDeleteManifest);

try (CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> changedDeleteFiles =
iterate(manifestReadTasks)) {
for (Pair<ManifestEntry.Status, DeleteFile> pair : changedDeleteFiles) {
switch (pair.first()) {
case ADDED:
adds.add(pair.second());
break;
case DELETED:
deletes.add(pair.second());
break;
}
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close manifest reader", e);
}

this.addedDeleteFiles = adds.build();
this.removedDeleteFiles = deletes.build();
}

private CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> readDeleteManifest(
ManifestFile manifest) {
CloseableIterable<ManifestEntry<DeleteFile>> entries =
ManifestFiles.readDeleteManifest(manifest, io, specsById).entries();

CloseableIterable<ManifestEntry<DeleteFile>> relevant =
CloseableIterable.filter(entries, e -> e.status() != ManifestEntry.Status.EXISTING);

return CloseableIterable.transform(
relevant,
entry -> {
if (entry.status() == ManifestEntry.Status.ADDED) {
return Pair.of(ManifestEntry.Status.ADDED, entry.file().copy());
} else {
return Pair.of(ManifestEntry.Status.DELETED, entry.file().copyWithoutStats());
}
});
}

public static class Builder {
private Snapshot snapshot;
private final FileIO io;
private final Map<Integer, PartitionSpec> specsById;
private ExecutorService executorService = null;

private Builder(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) {
this.snapshot = snapshot;
this.io = io;
this.specsById = specsById;
}

/**
* Set the snapshot to detect file changes for, overriding the default.
*
* @param snapshotOverride the snapshot to use
* @return this builder for method chaining
*/
public Builder snapshot(Snapshot snapshotOverride) {
this.snapshot = snapshotOverride;
return this;
}

/**
* Configure an executor service to use for parallel manifest reading.
*
* @param executor the executor service to use for parallel execution
* @return this builder for method chaining
*/
public Builder executeWith(ExecutorService executor) {
this.executorService = executor;
return this;
}

/**
* Build the SnapshotChanges instance.
*
* @return a new SnapshotChanges instance
*/
public SnapshotChanges build() {
return new SnapshotChanges(snapshot, io, specsById, executorService);
}
}
}
3 changes: 2 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,8 @@ public void testDeleteFilesNoValidation() {
.containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0")
.containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "0");
assertThat(delete2.allManifests(FILE_IO)).isEmpty();
assertThat(delete2.removedDataFiles(FILE_IO)).isEmpty();
assertThat(SnapshotChanges.builderFor(table).snapshot(delete2).build().removedDataFiles())
.isEmpty();
}

@Test
Expand Down
Loading