Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,10 @@ private void keepActiveManifests(List<ManifestFile> currentManifests) {
}

private void reset() {
cleanUncommitted(newManifests, ImmutableSet.of());
deleteUncommitted(newManifests, ImmutableSet.of(), true /* clear new manifests */);
entryCount.set(0);
keptManifests.clear();
rewrittenManifests.clear();
newManifests.clear();
writers.clear();
}

Expand Down Expand Up @@ -345,19 +344,10 @@ private WriterWrapper getWriter(Object key, int partitionSpecId) {

@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
cleanUncommitted(newManifests, committed);
deleteUncommitted(newManifests, committed, false);
// clean up only rewrittenAddedManifests as they are always owned by the table
// don't clean up addedManifests as they are added to the manifest list and are not compacted
cleanUncommitted(rewrittenAddedManifests, committed);
}

private void cleanUncommitted(
Iterable<ManifestFile> manifests, Set<ManifestFile> committedManifests) {
for (ManifestFile manifest : manifests) {
if (!committedManifests.contains(manifest)) {
deleteFile(manifest.path());
}
}
deleteUncommitted(rewrittenAddedManifests, committed, false);
}

long getManifestTargetSizeBytes() {
Expand Down
17 changes: 2 additions & 15 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,25 +181,12 @@ public Object updateEvent() {
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
if (newManifests != null) {
boolean hasDeletes = false;
for (ManifestFile manifest : newManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
hasDeletes = true;
}
}
if (hasDeletes) {
this.newManifests.clear();
}
deleteUncommitted(newManifests, committed, true /* clear manifests */);
}

// clean up only rewrittenAppendManifests as they are always owned by the table
// don't clean up appendManifests as they are added to the manifest list and are not compacted
for (ManifestFile manifest : rewrittenAppendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
}
deleteUncommitted(rewrittenAppendManifests, committed, false);
}

/**
Expand Down
65 changes: 13 additions & 52 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,6 @@ protected void validateNewDeleteFile(DeleteFile file) {
ContentFileUtil.dvDesc(file));
break;
case 3:
Preconditions.checkArgument(
file.content() == FileContent.EQUALITY_DELETES || ContentFileUtil.isDV(file),
"Must use DVs for position deletes in V%s: %s",
formatVersion(),
file.location());
break;
case 4:
Preconditions.checkArgument(
file.content() == FileContent.EQUALITY_DELETES || ContentFileUtil.isDV(file),
Expand Down Expand Up @@ -991,62 +985,29 @@ public Object updateEvent() {
return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void cleanUncommittedAppends(Set<ManifestFile> committed) {
if (!cachedNewDataManifests.isEmpty()) {
boolean hasDeletes = false;
for (ManifestFile manifest : cachedNewDataManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
hasDeletes = true;
}
}

if (hasDeletes) {
this.cachedNewDataManifests.clear();
}
}

boolean hasDeleteDeletes = false;
for (ManifestFile cachedNewDeleteManifest : cachedNewDeleteManifests) {
if (!committed.contains(cachedNewDeleteManifest)) {
deleteFile(cachedNewDeleteManifest.path());
hasDeleteDeletes = true;
}
}

if (hasDeleteDeletes) {
this.cachedNewDeleteManifests.clear();
}
@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
mergeManager.cleanUncommitted(committed);
filterManager.cleanUncommitted(committed);
deleteMergeManager.cleanUncommitted(committed);
deleteFilterManager.cleanUncommitted(committed);
cleanUncommittedAppends(committed);
}

private void cleanUncommittedAppends(Set<ManifestFile> committed) {
deleteUncommitted(cachedNewDataManifests, committed, true /* clear manifests */);
deleteUncommitted(cachedNewDeleteManifests, committed, true /* clear manifests */);
// rewritten manifests are always owned by the table
for (ManifestFile manifest : rewrittenAppendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
}
deleteUncommitted(rewrittenAppendManifests, committed, false);

// manifests that are not rewritten are only owned by the table if the commit succeeded
if (!committed.isEmpty()) {
// the commit succeeded if at least one manifest was committed
// the table now owns appendManifests; clean up any that are not used
for (ManifestFile manifest : appendManifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
}
}
deleteUncommitted(appendManifests, committed, false);
}
}

@Override
protected void cleanUncommitted(Set<ManifestFile> committed) {
mergeManager.cleanUncommitted(committed);
filterManager.cleanUncommitted(committed);
deleteMergeManager.cleanUncommitted(committed);
deleteFilterManager.cleanUncommitted(committed);
cleanUncommittedAppends(committed);
}

private Iterable<ManifestFile> prepareNewDataManifests() {
Iterable<ManifestFile> newManifests;
if (!newDataFilesBySpec.isEmpty()) {
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,22 @@ protected List<ManifestFile> writeDataManifests(
return writeManifests(files, group -> writeDataFileGroup(group, dataSeq, spec));
}

// Deletes uncommitted manifests; clears list if clearManifests and any deleted.
protected void deleteUncommitted(
Collection<ManifestFile> manifests, Set<ManifestFile> committed, boolean clearManifests) {
boolean anyDeleted = false;
for (ManifestFile manifest : manifests) {
if (!committed.contains(manifest)) {
deleteFile(manifest.path());
anyDeleted = true;
}
}

if (clearManifests && anyDeleted) {
manifests.clear();
}
}

private List<ManifestFile> writeDataFileGroup(
Collection<DataFile> files, Long dataSeq, PartitionSpec spec) {
RollingManifestWriter<DataFile> writer = newRollingManifestWriter(spec);
Expand Down