Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
Expand All @@ -85,6 +86,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
Expand Down Expand Up @@ -1021,8 +1023,18 @@ private void deleteApplicableMetadataFiles(long snapshotId) throws IOException {

@Override
public void notifyCreation(String tagName) {
throw new UnsupportedOperationException(
"IcebergCommitCallback notifyCreation requires a snapshot ID");
// The base TagCallback API does not carry a snapshot id, but Iceberg refs
// require one. The tag is persisted by TagManager before this callback
// fires, so resolve the snapshot the tag points to and delegate to the
// snapshot aware overload.
Optional<Tag> tag = table.tagManager().get(tagName);
if (!tag.isPresent()) {
LOG.info(
"Tag {} not found in Paimon TagManager when creating Iceberg ref. Unable to create tag.",
tagName);
return;
}
notifyCreation(tagName, tag.get().id());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,25 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.TagManager;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Answers;

import java.lang.reflect.Field;
import java.util.Optional;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/** Tests for {@link IcebergCommitCallback}. */
Expand All @@ -53,6 +62,46 @@ void setUp() {
when(mockCoreOptions.toConfiguration()).thenReturn(mockConfig);
}

@Test
void testNotifyCreationWithoutSnapshotIdSkipsWhenTagMissing() throws Exception {
FileStoreTable table = mock(FileStoreTable.class);
TagManager tagManager = mock(TagManager.class);
when(table.tagManager()).thenReturn(tagManager);
when(tagManager.get("missing")).thenReturn(Optional.empty());

IcebergCommitCallback callback =
mock(IcebergCommitCallback.class, Answers.CALLS_REAL_METHODS);
setField(callback, "table", table);

assertThatCode(() -> callback.notifyCreation("missing")).doesNotThrowAnyException();
verify(tagManager).get("missing");
}

@Test
void testNotifyCreationWithoutSnapshotIdDelegatesUsingTagManagerSnapshotId() throws Exception {
FileStoreTable table = mock(FileStoreTable.class);
TagManager tagManager = mock(TagManager.class);
Tag tag = mock(Tag.class);
when(tag.id()).thenReturn(42L);
when(table.tagManager()).thenReturn(tagManager);
when(tagManager.get("v1")).thenReturn(Optional.of(tag));

IcebergCommitCallback callback =
mock(IcebergCommitCallback.class, Answers.CALLS_REAL_METHODS);
setField(callback, "table", table);
doNothing().when(callback).notifyCreation("v1", 42L);

callback.notifyCreation("v1");

verify(callback).notifyCreation("v1", 42L);
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = IcebergCommitCallback.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}

@ParameterizedTest(name = "StorageType: {0}")
@MethodSource("provideMetadataPathsWithStorageType")
void testCatalogTableMetadataPathWithStorageType(
Expand Down