-
Notifications
You must be signed in to change notification settings - Fork 3.2k
API, CORE, Flink, Spark: Deprecate Snapshot Change Methods #15241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
86d4f2e
a6bd8c3
53d0ea6
a4ab20c
848a19b
cc5ac9b
c8c185e
d33da87
52511b1
aa062ac
217ed0f
f5520ac
a366f75
ba0b422
5ba92ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,271 @@ | ||
| /* | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the actual Utility we are switching to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With the V4 metadata change, change detection can become more complicated with manifest DVs. I like this direction of moving the change detection out of the `Snapshot` interface. This could be a good foundation for the change detection in the V4 adaptive tree. In V4, if we are going to colocate DVs (deleted old and added new) with data files, it might make sense to expose a combined result. Otherwise, the associations get split first and then need to be joined again.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a good way to think about this class for reviews. |
||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.ExecutorService; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.FileIO; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
| import org.apache.iceberg.util.Pair; | ||
| import org.apache.iceberg.util.ParallelIterable; | ||
|
|
||
| /** | ||
| * Helper class for retrieving file changes in a snapshot with caching. | ||
| * | ||
| * <p>This class caches the results of file change detection operations, making it efficient to | ||
| * query multiple file change types for the same snapshot. By default, manifests are read | ||
| * single-threaded. Use {@link Builder#executeWith(ExecutorService)} to enable parallel manifest | ||
| * reading. | ||
| */ | ||
| public class SnapshotChanges { | ||
| // Inputs fixed at construction time; the constructor rejects null for the first three. | ||
| private final Snapshot snapshot; | ||
| private final FileIO io; | ||
| private final Map<Integer, PartitionSpec> specsById; | ||
| // Optional: when null, iterate() concatenates manifest reads instead of parallelizing them. | ||
| private final ExecutorService executorService; | ||
|
|
||
| // Cached results. Each pair stays null until the matching cache*Changes() method has run. | ||
| private List<DataFile> addedDataFiles; | ||
| private List<DataFile> removedDataFiles; | ||
| private List<DeleteFile> addedDeleteFiles; | ||
| private List<DeleteFile> removedDeleteFiles; | ||
|
|
||
| /** | ||
| * Creates an instance; use {@link Builder} rather than calling this directly. | ||
| * | ||
| * @param snapshot the snapshot whose changes are reported, must not be null | ||
| * @param io the FileIO used to read manifests, must not be null | ||
| * @param specsById the partition specs passed to the manifest readers, must not be null | ||
| * @param executorService executor for parallel manifest reads, or null to read sequentially | ||
| */ | ||
| private SnapshotChanges( | ||
| Snapshot snapshot, | ||
| FileIO io, | ||
| Map<Integer, PartitionSpec> specsById, | ||
| ExecutorService executorService) { | ||
| // Each required argument is validated immediately before it is stored. | ||
| Preconditions.checkArgument(snapshot != null, "Snapshot cannot be null"); | ||
| this.snapshot = snapshot; | ||
|
|
||
| Preconditions.checkArgument(io != null, "FileIO cannot be null"); | ||
| this.io = io; | ||
|
|
||
| Preconditions.checkArgument(specsById != null, "Partition specs cannot be null"); | ||
| this.specsById = specsById; | ||
|
|
||
| // Deliberately unchecked: a null executor selects the sequential path in iterate(). | ||
| this.executorService = executorService; | ||
| } | ||
|
|
||
| /** | ||
| * Create a builder for SnapshotChanges using the table's current snapshot. | ||
| * | ||
| * @param table the table to detect file changes for | ||
| * @return a new Builder | ||
| */ | ||
| public static Builder builderFor(Table table) { | ||
| return new Builder(table.currentSnapshot(), table.io(), table.specs()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a builder from an explicit snapshot, FileIO and partition spec map, without a table. | ||
| * | ||
| * @param snapshot the snapshot to detect file changes for | ||
| * @param io the FileIO used to read manifests | ||
| * @param specsById the partition specs passed to the manifest readers | ||
| * @return a new Builder | ||
| */ | ||
| static Builder builderFor(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) { | ||
| Builder builder = new Builder(snapshot, io, specsById); | ||
| return builder; | ||
| } | ||
|
|
||
| /** | ||
| * Combines per-manifest read tasks into a single iterable. | ||
| * | ||
| * <p>Without an executor the tasks are concatenated; with one they are wrapped in a {@link | ||
| * ParallelIterable} backed by that executor. | ||
| */ | ||
| private <T> CloseableIterable<T> iterate(Iterable<CloseableIterable<T>> tasks) { | ||
| if (executorService == null) { | ||
| return CloseableIterable.concat(tasks); | ||
| } | ||
|
|
||
| return new ParallelIterable<>(tasks, executorService); | ||
| } | ||
|
|
||
| /** Returns all data files added to the table in this snapshot. */ | ||
| public Iterable<DataFile> addedDataFiles() { | ||
| if (addedDataFiles == null) { | ||
| cacheDataFileChanges(); | ||
| } | ||
|
|
||
| return addedDataFiles; | ||
|
Comment on lines
+94
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a prompt for the AIs to add newlines 🤖
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can add an "agents.md"; I know a bunch of other projects have done this.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although if we really want this to not be an issue we need to encode it in our style rules
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 for adding the config.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are apparently thousands of violations in the codebase. I'll raise another PR.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lots of duplicates because of the modules but a lot of cases
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://github.com/apache/iceberg/pull/15422/changes#r2843183454 <- To see the damage |
||
| } | ||
|
|
||
| /** Returns all data files removed from the table in this snapshot. */ | ||
| public Iterable<DataFile> removedDataFiles() { | ||
|
pvary marked this conversation as resolved.
|
||
| if (removedDataFiles == null) { | ||
| cacheDataFileChanges(); | ||
| } | ||
|
|
||
| return removedDataFiles; | ||
| } | ||
|
|
||
| /** | ||
| * Returns all delete files added to the table in this snapshot. | ||
| * | ||
| * <p>The first call to this method or to {@link #removedDeleteFiles()} reads the delete manifests | ||
| * and caches both results. | ||
| */ | ||
| public Iterable<DeleteFile> addedDeleteFiles() { | ||
| if (addedDeleteFiles != null) { | ||
| return addedDeleteFiles; | ||
| } | ||
|
|
||
| cacheDeleteFileChanges(); | ||
| return addedDeleteFiles; | ||
| } | ||
|
|
||
| /** | ||
| * Returns all delete files removed from the table in this snapshot. | ||
| * | ||
| * <p>The first call to this method or to {@link #addedDeleteFiles()} reads the delete manifests | ||
| * and caches both results. | ||
| */ | ||
| public Iterable<DeleteFile> removedDeleteFiles() { | ||
| if (removedDeleteFiles != null) { | ||
| return removedDeleteFiles; | ||
| } | ||
|
|
||
| cacheDeleteFileChanges(); | ||
| return removedDeleteFiles; | ||
| } | ||
|
|
||
| /** | ||
| * Reads the data manifests whose snapshot id matches this snapshot and fills both data file | ||
| * caches in a single pass. | ||
| * | ||
| * @throws UncheckedIOException if closing the combined manifest iterable fails | ||
| */ | ||
| private void cacheDataFileChanges() { | ||
| // Only manifests stamped with this snapshot's id are read. | ||
| Iterable<ManifestFile> ownManifests = | ||
| Iterables.filter( | ||
| snapshot.dataManifests(io), | ||
| candidate -> Objects.equals(candidate.snapshotId(), snapshot.snapshotId())); | ||
|
|
||
| Iterable<CloseableIterable<Pair<ManifestEntry.Status, DataFile>>> readTasks = | ||
| Iterables.transform(ownManifests, this::readDataManifest); | ||
|
|
||
| ImmutableList.Builder<DataFile> added = ImmutableList.builder(); | ||
| ImmutableList.Builder<DataFile> removed = ImmutableList.builder(); | ||
|
|
||
| try (CloseableIterable<Pair<ManifestEntry.Status, DataFile>> changes = iterate(readTasks)) { | ||
| for (Pair<ManifestEntry.Status, DataFile> change : changes) { | ||
| ManifestEntry.Status status = change.first(); | ||
| if (status == ManifestEntry.Status.ADDED) { | ||
| added.add(change.second()); | ||
| } else if (status == ManifestEntry.Status.DELETED) { | ||
| removed.add(change.second()); | ||
| } | ||
| } | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close manifest reader", e); | ||
| } | ||
|
|
||
| this.addedDataFiles = added.build(); | ||
| this.removedDataFiles = removed.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Lazily reads one data manifest and yields a (status, file) pair for every entry that is not | ||
| * {@code EXISTING}. | ||
| * | ||
| * <p>{@code ADDED} entries carry {@code copy()} of the file; every other surviving entry is | ||
| * reported as {@code DELETED} and carries {@code copyWithoutStats()}. | ||
| */ | ||
| private CloseableIterable<Pair<ManifestEntry.Status, DataFile>> readDataManifest( | ||
| ManifestFile manifest) { | ||
| CloseableIterable<ManifestEntry<DataFile>> changedEntries = | ||
| CloseableIterable.filter( | ||
| ManifestFiles.read(manifest, io, specsById).entries(), | ||
| candidate -> candidate.status() != ManifestEntry.Status.EXISTING); | ||
|
|
||
| return CloseableIterable.transform( | ||
| changedEntries, | ||
| changed -> { | ||
| if (changed.status() != ManifestEntry.Status.ADDED) { | ||
| return Pair.of(ManifestEntry.Status.DELETED, changed.file().copyWithoutStats()); | ||
| } | ||
|
|
||
| return Pair.of(ManifestEntry.Status.ADDED, changed.file().copy()); | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Reads the delete manifests whose snapshot id matches this snapshot and fills both delete file | ||
| * caches in a single pass. | ||
| * | ||
| * @throws UncheckedIOException if closing the combined manifest iterable fails | ||
| */ | ||
| private void cacheDeleteFileChanges() { | ||
| // Only manifests stamped with this snapshot's id are read. | ||
| Iterable<ManifestFile> ownManifests = | ||
| Iterables.filter( | ||
| snapshot.deleteManifests(io), | ||
| candidate -> Objects.equals(candidate.snapshotId(), snapshot.snapshotId())); | ||
|
|
||
| Iterable<CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>>> readTasks = | ||
| Iterables.transform(ownManifests, this::readDeleteManifest); | ||
|
|
||
| ImmutableList.Builder<DeleteFile> added = ImmutableList.builder(); | ||
| ImmutableList.Builder<DeleteFile> removed = ImmutableList.builder(); | ||
|
|
||
| try (CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> changes = iterate(readTasks)) { | ||
| for (Pair<ManifestEntry.Status, DeleteFile> change : changes) { | ||
| ManifestEntry.Status status = change.first(); | ||
| if (status == ManifestEntry.Status.ADDED) { | ||
| added.add(change.second()); | ||
| } else if (status == ManifestEntry.Status.DELETED) { | ||
| removed.add(change.second()); | ||
| } | ||
| } | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close manifest reader", e); | ||
| } | ||
|
|
||
| this.addedDeleteFiles = added.build(); | ||
| this.removedDeleteFiles = removed.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Lazily reads one delete manifest and yields a (status, file) pair for every entry that is not | ||
| * {@code EXISTING}. | ||
| * | ||
| * <p>{@code ADDED} entries carry {@code copy()} of the file; every other surviving entry is | ||
| * reported as {@code DELETED} and carries {@code copyWithoutStats()}. | ||
| */ | ||
| private CloseableIterable<Pair<ManifestEntry.Status, DeleteFile>> readDeleteManifest( | ||
| ManifestFile manifest) { | ||
| CloseableIterable<ManifestEntry<DeleteFile>> changedEntries = | ||
| CloseableIterable.filter( | ||
| ManifestFiles.readDeleteManifest(manifest, io, specsById).entries(), | ||
| candidate -> candidate.status() != ManifestEntry.Status.EXISTING); | ||
|
|
||
| return CloseableIterable.transform( | ||
| changedEntries, | ||
| changed -> { | ||
| if (changed.status() != ManifestEntry.Status.ADDED) { | ||
| return Pair.of(ManifestEntry.Status.DELETED, changed.file().copyWithoutStats()); | ||
| } | ||
|
|
||
| return Pair.of(ManifestEntry.Status.ADDED, changed.file().copy()); | ||
| }); | ||
| } | ||
|
|
||
| /** Fluent builder for {@link SnapshotChanges}; obtain one through {@code builderFor}. */ | ||
| public static class Builder { | ||
| private final FileIO io; | ||
| private final Map<Integer, PartitionSpec> specsById; | ||
| // Mutable so that snapshot(Snapshot) can replace the initial value. | ||
| private Snapshot snapshot; | ||
| // Stays null unless executeWith(ExecutorService) is called. | ||
| private ExecutorService executorService; | ||
|
|
||
| private Builder(Snapshot snapshot, FileIO io, Map<Integer, PartitionSpec> specsById) { | ||
| this.io = io; | ||
| this.specsById = specsById; | ||
| this.snapshot = snapshot; | ||
| } | ||
|
|
||
| /** | ||
| * Replaces the snapshot this builder was created with. | ||
| * | ||
| * @param snapshotOverride the snapshot to detect file changes for | ||
| * @return this builder for method chaining | ||
| */ | ||
| public Builder snapshot(Snapshot snapshotOverride) { | ||
| this.snapshot = snapshotOverride; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Supplies the executor service used to read manifests in parallel. | ||
| * | ||
| * <p>If this is never called, or is called with null, manifests are read without an executor. | ||
| * | ||
| * @param executor the executor service to use for parallel execution | ||
| * @return this builder for method chaining | ||
| */ | ||
| public Builder executeWith(ExecutorService executor) { | ||
| this.executorService = executor; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Builds the SnapshotChanges instance from the values collected so far. | ||
| * | ||
| * @return a new SnapshotChanges instance | ||
| * @throws IllegalArgumentException if the snapshot, FileIO or partition specs are null | ||
| */ | ||
| public SnapshotChanges build() { | ||
| return new SnapshotChanges(snapshot, io, specsById, executorService); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Main Deprecations are Here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for deprecating these methods. We've had issues with these in the past, like when we had to add
`FileIO` to the signature. A utility class is the better option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also deprecate the equivalent manifest methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to follow up in another PR for that, it is going to require changing another slew of tests