Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
for (JobContext jobContext : jobContexts) {
JobConf conf = jobContext.getJobConf();

table = Optional.ofNullable(table).orElseGet(() -> Catalogs.loadTable(conf, catalogProperties));
table = Optional.ofNullable(table).orElseGet(() -> IcebergTableUtil.getTable(conf, catalogProperties));
Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we need a fresh copy, to check conflicts, not a cached one

Copy link
Copy Markdown
Member Author

@ayushtkn ayushtkn Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it not break at time of commit? if our commit is conflicting? Means this is again reloading the table & reading the metadata which we already read once.

Between this and before we commit there could be another commit which could create conflict. I don't think we are under lock

In that case I believe some test would fail....

Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's see, i think it was intentional. there was no conflict found during commit. but if that's not needed, we could save some calls

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @difin didn't you have some issue with that in a past?

Copy link
Copy Markdown
Member

@deniskuzZ deniskuzZ Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with that change, if it works

    Table snapshotTable = Optional.ofNullable(IcebergAcidUtil.getTransaction(table))
        .map(Transaction::table)
        .orElse(table);

is not longer needed and could be simplified

branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
snapshotId = getSnapshotId(outputTable.table, branchName);

Expand Down Expand Up @@ -518,15 +518,8 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
}

private Long getSnapshotId(Table table, String branchName) {
// Use the transaction table (if available) so that intra-txn changes
// (e.g. a prior INSERT) don't cause false conflict validation failures.
Table snapshotTable = Optional.ofNullable(IcebergAcidUtil.getTransaction(table))
.map(Transaction::table)
.orElse(table);

return Optional.ofNullable(IcebergTableUtil.getTableSnapshot(snapshotTable, branchName))
.map(Snapshot::snapshotId)
.orElse(null);
Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, branchName);
return Optional.ofNullable(snapshot).map(Snapshot::snapshotId).orElse(null);
}

/**
Expand Down
Loading