Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.optimizing.maintainer;

import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.formats.iceberg.maintainer.IcebergTableMaintainer;
import org.apache.amoro.server.scheduler.inline.ExecutorTestBase;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

/**
* Reproduces a data-file loss observed in production during snapshot expiration.
*
* <p>When a table has a single ref (the common case after a tag is dropped), {@code
* RemoveSnapshots} auto-selects {@link org.apache.iceberg.IncrementalFileCleanup}. That strategy
* walks the current snapshot's ancestor chain via {@code SnapshotUtil.ancestorIds}; if a parent
* snapshot is missing from metadata (e.g. expired by an earlier cycle) the walk terminates
* silently. Snapshots below that break are then treated as "not an ancestor", so the ADDED entries
* in their (superseded but still referenced) manifests are reverted and the data files are
* physically deleted - even though those files are still carried over as EXISTING entries in the
* current snapshot's manifest. The result is a current snapshot that references a missing file.
*
* <p>This test builds exactly that state and asserts the invariant that expiration must never
* delete a data file referenced by the current snapshot. It fails on the buggy incremental path and
* passes once {@link IcebergTableMaintainer} detects the off-main snapshot and forces the reachable
* cleanup strategy, which never walks the ancestor chain.
*/
@RunWith(Parameterized.class)
public class TestExpireSnapshotsKeepReferencedFiles extends ExecutorTestBase {

  /**
   * Supplies the single parameter row for this test: an Iceberg-format catalog helper paired with
   * a {@code BasicTableTestHelper(false, false)}.
   *
   * @return a one-element array whose only row is {@code {catalogTestHelper, tableTestHelper}}
   */
  @Parameterized.Parameters(name = "{0}, {1}")
  public static Object[] parameters() {
    CatalogTestHelper icebergCatalog = new BasicCatalogTestHelper(TableFormat.ICEBERG);
    TableTestHelper tableHelper = new BasicTableTestHelper(false, false);
    Object[] onlyCase = {icebergCatalog, tableHelper};
    return new Object[][] {onlyCase};
  }

  /**
   * Hands both helpers to the base class.
   *
   * @param catalogTestHelper catalog fixture for this run
   * @param tableTestHelper table fixture for this run
   */
  public TestExpireSnapshotsKeepReferencedFiles(
      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
    super(catalogTestHelper, tableTestHelper);
  }

  /**
   * Builds a table whose head snapshot has lost its parent, runs the maintainer's snapshot
   * expiration, and checks that every data file still referenced by the head snapshot survives.
   */
  @Test
  public void testExpireKeepsFilesReferencedByCurrentSnapshot() {
    UnkeyedTable unkeyed = getMixedTable().asUnkeyedTable();
    // Manifests are merged by hand further down, so automatic merging is switched off to keep the
    // manifest layout predictable. A keep duration of "0" is set as well.
    unkeyed
        .updateProperties()
        .set("commit.manifest-merge.enabled", "false")
        .set(TableProperties.SNAPSHOT_KEEP_DURATION, "0")
        .commit();

    // Snapshots S0 and S1: one append each, one data file each (F0 and F1), each in its own
    // manifest.
    DataFile firstFile = appendOneRecord(unkeyed, 1, 1L);
    DataFile secondFile = appendOneRecord(unkeyed, 2, 2L);

    // Snapshot S2: cluster every entry under the same key so F0 and F1 end up as EXISTING entries
    // of one merged manifest. The original manifests stay referenced by S0/S1 only.
    unkeyed.rewriteManifests().clusterBy(dataFile -> 0).commit();
    long rewriteSnapshotId = unkeyed.currentSnapshot().snapshotId();

    // Snapshot S3 (head): adds F2 while still carrying F0 and F1 through the merged manifest.
    DataFile headFile = appendOneRecord(unkeyed, 3, 3L);

    Assert.assertTrue(existsOnStorage(unkeyed, firstFile));
    Assert.assertTrue(existsOnStorage(unkeyed, secondFile));

    // Remove S2 by id. The head's parent is now absent from metadata, so any later walk of the
    // head's ancestors stops before reaching S0/S1. Expiring by id is expected to leave the
    // baseline files alone, which the two assertions right after confirm.
    unkeyed.expireSnapshots().expireSnapshotId(rewriteSnapshotId).cleanExpiredFiles(true).commit();
    Assert.assertTrue(existsOnStorage(unkeyed, firstFile));
    Assert.assertTrue(existsOnStorage(unkeyed, secondFile));

    // Precondition for the scenario: the table has exactly one ref.
    Assert.assertEquals(1, unkeyed.refs().size());

    IcebergTableMaintainer maintainer =
        new IcebergTableMaintainer(unkeyed, unkeyed.id(), TestTableMaintainerContext.of(unkeyed));
    maintainer.expireSnapshots(System.currentTimeMillis(), 1);

    // Sanity check: only the head snapshot is left and its own file is present.
    Assert.assertEquals(1, Iterables.size(unkeyed.snapshots()));
    Assert.assertTrue(existsOnStorage(unkeyed, headFile));

    // The invariant under test: files the head snapshot still references must not be removed.
    Assert.assertTrue(
        "F0 is referenced by the current snapshot and must not be deleted by expiration",
        existsOnStorage(unkeyed, firstFile));
    Assert.assertTrue(
        "F1 is referenced by the current snapshot and must not be deleted by expiration",
        existsOnStorage(unkeyed, secondFile));
  }

  /**
   * Checks through the table's file IO whether the given data file is still present.
   *
   * @param table table whose {@code io()} is queried
   * @param dataFile data file whose path is looked up
   * @return the result of {@code table.io().exists(...)} for the file's path
   */
  private static boolean existsOnStorage(UnkeyedTable table, DataFile dataFile) {
    return table.io().exists(dataFile.path().toString());
  }

  /**
   * Writes one generated record to the table's base store and commits the resulting file in a new
   * append.
   *
   * @param table table to write to and commit against
   * @param id record id, also used to derive the record's name ({@code "name" + id})
   * @param txId transaction id passed to the base-store writer
   * @return the single data file produced by the write
   */
  private DataFile appendOneRecord(UnkeyedTable table, int id, long txId) {
    Record row = tableTestHelper().generateTestRecord(id, "name" + id, 0, "2022-01-01T00:00:00");
    List<DataFile> written =
        tableTestHelper().writeBaseStore(table, txId, Lists.newArrayList(row), false);
    // The fixture relies on exactly one file per append.
    Assert.assertEquals(1, written.size());
    AppendFiles append = table.newAppend();
    for (DataFile dataFile : written) {
      append.appendFile(dataFile);
    }
    append.commit();
    return written.get(0);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileCleanupBridge;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
Expand All @@ -70,6 +72,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -201,14 +204,24 @@ private void expireSnapshots(long olderThan, int minCount, Set<String> exclude)
minCount,
exclude);
RollingFileCleaner expiredFileCleaner = new RollingFileCleaner(fileIO(), exclude);
table
.expireSnapshots()
.retainLast(Math.max(minCount, 1))
.expireOlderThan(olderThan)
.deleteWith(expiredFileCleaner::addFile)
.cleanExpiredFiles(
true) /* enable clean only for collecting the expired files, will delete them later */
.commit();
ExpireSnapshots expireSnapshots =
table
.expireSnapshots()
.retainLast(Math.max(minCount, 1))
.expireOlderThan(olderThan)
.deleteWith(expiredFileCleaner::addFile)
.cleanExpiredFiles(
true) /* enable clean only for collecting the expired files, will delete them later */;
// iceberg auto-selects IncrementalFileCleanup for single-ref tables. That strategy walks the
// current snapshot's ancestor chain and can terminate silently at a missing parent, then revert
// the ADDED entries of superseded-but-still-referenced manifests below the break - physically
// deleting data files the current snapshot still references (observed as partition data-file
// loss in production). The walk only truncates when a snapshot sits outside the current main
// ancestry, so force the safe ReachableFileCleanup then.
if (hasSnapshotsOutsideMainAncestry()) {
ReachableFileCleanupBridge.forceReachable(expireSnapshots);
}
expireSnapshots.commit();

int collectedFiles = expiredFileCleaner.fileCount();
expiredFileCleaner.clear();
Expand All @@ -227,6 +240,28 @@ private void expireSnapshots(long olderThan, int minCount, Set<String> exclude)
}
}

/**
 * Reports whether the table lists at least one snapshot whose id is absent from the current
 * snapshot's ancestor chain.
 *
 * <p>The chain is obtained from {@link SnapshotUtil#ancestorIds} with {@code table::snapshot} as
 * the parent lookup. This is the same walk the incremental cleanup relies on, so a snapshot
 * missing from it marks the states in which that cleanup could remove data files the current
 * snapshot still references.
 *
 * @return {@code true} if some snapshot is not part of the current snapshot's ancestry; {@code
 *     false} if the table has no current snapshot or every snapshot is an ancestor
 */
private boolean hasSnapshotsOutsideMainAncestry() {
  Snapshot head = table.currentSnapshot();
  boolean foundOutsider = false;
  // With no current snapshot there is no ancestry to compare against.
  if (head != null) {
    Set<Long> ancestry = Sets.newHashSet(SnapshotUtil.ancestorIds(head, table::snapshot));
    for (Snapshot candidate : table.snapshots()) {
      if (!ancestry.contains(candidate.snapshotId())) {
        // One snapshot outside the chain is enough; stop scanning.
        foundOutsider = true;
        break;
      }
    }
  }
  return foundOutsider;
}

@Override
public void expireData() {
DataExpirationConfig expirationConfig = context.getTableConfiguration().getExpiringDataConfig();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg;

/**
* Bridge into iceberg-core to force the reachable file-cleanup strategy during snapshot expiration.
*
* <p>{@link RemoveSnapshots#withIncrementalCleanup(boolean)} is package-private, so this class
* lives in {@code org.apache.iceberg} to call it without reflection. Binding at compile time means
* a future change to that method's signature breaks the build instead of silently falling back.
*
* <p>By default iceberg auto-selects {@link IncrementalFileCleanup} whenever the table has a single
* ref. That strategy walks the current snapshot's ancestor chain and terminates silently if a
* parent snapshot is missing from metadata; snapshots below the break are then treated as
* non-ancestors and the ADDED entries in their superseded-but-still-referenced manifests are
* reverted, physically deleting data files the current snapshot still references. Forcing {@link
* ReachableFileCleanup} avoids the ancestor walk entirely and is safe for any ref count.
*/
public final class ReachableFileCleanupBridge {

  /** Static utility holder; not meant to be instantiated. */
  private ReachableFileCleanupBridge() {}

  /**
   * Disables incremental cleanup on {@code expireSnapshots} by calling {@code
   * withIncrementalCleanup(false)} when the operation is a {@link RemoveSnapshots}.
   *
   * <p>Any other {@link ExpireSnapshots} implementation is handed back untouched, so for those
   * the call has no effect.
   *
   * <p>NOTE(review): that no-op is silent, so a wrapped or alternative implementation would keep
   * whatever cleanup strategy it selects by itself - confirm this is acceptable for all callers.
   *
   * @param expireSnapshots the expire operation to configure
   * @return the same instance that was passed in, to allow fluent chaining
   */
  public static ExpireSnapshots forceReachable(ExpireSnapshots expireSnapshots) {
    boolean supportsOverride = expireSnapshots instanceof RemoveSnapshots;
    if (!supportsOverride) {
      return expireSnapshots;
    }
    RemoveSnapshots removeSnapshots = (RemoveSnapshots) expireSnapshots;
    removeSnapshots.withIncrementalCleanup(false);
    return expireSnapshots;
  }
}
Loading