Merged

22 commits
88dfc14
Add branch support to rewrite_data_files procedure
97harsh Jan 5, 2026
91ccc44
Add toBranch() implementation and tests for rewrite_data_files procedure
97harsh Jan 5, 2026
0e696f7
gradlew -Dall
97harsh Jan 5, 2026
8ce0921
Fix tests for rewrite_data_files with branch support
97harsh Jan 5, 2026
975e6e7
Fix constructor chain in RewriteDataFilesCommitManager to prevent NPE
97harsh Jan 5, 2026
efb1d57
Fix rewrite_data_files to use branch snapshot instead of main
97harsh Jan 5, 2026
5ab9d80
Test: Verify branch snapshot lineage in rewrite_data_files test
97harsh Jan 5, 2026
d4e0a6e
Fix spotless formatting in test and action files
97harsh Jan 5, 2026
2f3ee60
Add precondition to check branch exists in rewrite_data_files
97harsh Jan 5, 2026
40c5a4a
Refactor RewriteDataFilesProcedure to use concrete types in helper me…
97harsh Jan 6, 2026
640603c
Revert branch support from Spark 3.4, 3.5, 4.0
97harsh Jan 6, 2026
b4fcf5f
Remove debug comment from branch compaction test
97harsh Jan 6, 2026
560ff4e
Trigger CI
97harsh Jan 6, 2026
efd06dd
Trigger CI
97harsh Jan 6, 2026
649c4a7
Address PR review comments for branch compaction tests
97harsh Jan 7, 2026
72b40b3
Remove unnecessary cast in RewriteDataFilesProcedure
97harsh Jan 7, 2026
23ec30c
Make toBranch(null) a no-op for default branch behavior
97harsh Jan 8, 2026
21ba419
Align toBranch() with SnapshotProducer: default to MAIN_BRANCH and re…
97harsh Jan 9, 2026
2e232c7
Trigger Build
97harsh Jan 10, 2026
870cc50
Fix null branch handling in RewriteDataFilesProcedure
97harsh Jan 10, 2026
f220b40
Update spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/proced…
97harsh Jan 13, 2026
f7acee3
Fix effectively final variable for lambda capture
97harsh Jan 13, 2026
@@ -40,6 +40,7 @@ public class RewriteDataFilesCommitManager {
private final long startingSnapshotId;
private final boolean useStartingSequenceNumber;
private final Map<String, String> snapshotProperties;
private final String branch;

// constructor used for testing
public RewriteDataFilesCommitManager(Table table) {
@@ -60,10 +61,20 @@ public RewriteDataFilesCommitManager(
long startingSnapshotId,
boolean useStartingSequenceNumber,
Map<String, String> snapshotProperties) {
this(table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, null);
Contributor

minor: I wonder if we could just set this to MAIN branch as default, so we don't have to do isNull checks.

Contributor Author

I think it makes sense to use null to delegate to SnapshotProducer's default behavior rather than explicitly setting "main". This avoids imposing an opinion: if the default branch behavior changes in the future, this code will follow automatically. The null-check pattern is also consistent with how branch handling works elsewhere in the codebase (see the SnapshotUtil methods that treat null as "use default").

Contributor Author

Then again, it would make sense to remove a lot of isNull checks by making toBranch(null) a no-op. That should make the code cleaner.

Contributor Author

@pvary / @singhpk234 any suggestions?

Contributor

This is something I have also considered during the review, but I don't have a strong opinion, so I left it as it is.

@singhpk234?

Contributor

Leaning +1 on letting null mean "use default branch" and making toBranch(null) a no-op.

Contributor Author

Thank you for your review, done:

  • toBranch(null) results in a no-op
  • removed one condition check for branch != null

}

public RewriteDataFilesCommitManager(
Table table,
long startingSnapshotId,
boolean useStartingSequenceNumber,
Map<String, String> snapshotProperties,
String branch) {
this.table = table;
this.startingSnapshotId = startingSnapshotId;
this.useStartingSequenceNumber = useStartingSequenceNumber;
this.snapshotProperties = snapshotProperties;
this.branch = branch;
}

/**
Expand Down Expand Up @@ -94,6 +105,10 @@ public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {

snapshotProperties.forEach(rewrite::set);

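// a null branch intentionally falls through to SnapshotProducer's default (main) branch; see the discussion above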
if (branch != null) {
rewrite.toBranch(branch);
}

rewrite.commit();
}

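The thread above settled on different defaults at different layers: the commit manager keeps null and guards the toBranch() call, while the Spark action (further down) defaults to SnapshotRef.MAIN_BRANCH and rejects null. A minimal sketch contrasting the two options, with illustrative class and method names only (this is not the merged code):

import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class BranchTargetSketch {
  // Option A (commit manager above): keep null and guard at the call site,
  // so SnapshotProducer's own default-branch behavior applies.
  private String branchA = null;

  void commitOptionA(RewriteFiles rewrite) {
    if (branchA != null) {
      rewrite.toBranch(branchA);
    }
    rewrite.commit();
  }

  // Option B (Spark action below): default to MAIN_BRANCH and reject null,
  // so no guard is needed anywhere downstream.
  private String branchB = SnapshotRef.MAIN_BRANCH;

  BranchTargetSketch toBranch(String targetBranch) {
    Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
    this.branchB = targetBranch;
    return this;
  }
}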
@@ -54,6 +54,7 @@
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -1105,4 +1106,209 @@ private List<Object[]> currentData() {
private List<Object[]> currentData(String table) {
return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
}

@TestTemplate
public void testRewriteDataFilesOnBranch() {
createTable();
insertData(10);

String branchName = "testBranch";
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

List<Object[]> expectedRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());

// Get snapshot IDs before rewrite
Table table = validationCatalog.loadTable(tableIdent);
long mainSnapshotId = table.currentSnapshot().snapshotId();
long branchSnapshotId = table.refs().get(branchName).snapshotId();

// Call rewrite_data_files on the branch
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
catalogName, tableName, branchName);

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

// Verify branch data is preserved after compaction
List<Object[]> actualRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

// Verify branch snapshot changed
table.refresh();
assertThat(table.refs().get(branchName).snapshotId())
.as("Branch snapshot should be updated when files are rewritten")
.isNotEqualTo(branchSnapshotId);

// Verify main snapshot unchanged
assertThat(table.currentSnapshot().snapshotId())
.as("Main snapshot should remain unchanged")
.isEqualTo(mainSnapshotId);
}
Contributor

Can we add a compare between the actualRecords and expectedRecords?

Contributor Author

Thank you for your review, done!


@TestTemplate
public void testRewriteDataFilesToNullBranchFails() {
createTable();
insertData(10);

Table table = validationCatalog.loadTable(tableIdent);

assertThatThrownBy(() -> SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
.as("Invalid branch")
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid branch name: null");
}

@TestTemplate
public void testRewriteDataFilesOnBranchWithFilter() {
createPartitionTable();
insertData(10);

String branchName = "filteredBranch";
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

List<Object[]> expectedRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());

// Get snapshot IDs before rewrite
Table table = validationCatalog.loadTable(tableIdent);
long mainSnapshotId = table.currentSnapshot().snapshotId();
long branchSnapshotId = table.refs().get(branchName).snapshotId();

// Call rewrite_data_files on the branch with filter (select only partition c2 = 'bar')
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', where => 'c2 = \"bar\"')",
catalogName, tableName, branchName);

assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data file",
row(5, 1),
Arrays.copyOf(output.get(0), 2));

// Verify branch data is preserved after compaction
List<Object[]> actualRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

// Verify branch snapshot changed after rewrite
table.refresh();
assertThat(table.refs().get(branchName).snapshotId())
.as("Branch snapshot should be updated when files are rewritten")
.isNotEqualTo(branchSnapshotId);

// Verify main snapshot unchanged
assertThat(table.currentSnapshot().snapshotId())
.as("Main snapshot should remain unchanged")
.isEqualTo(mainSnapshotId);
}

@TestTemplate
public void testBranchCompactionDoesNotAffectMain() {
createTable();
// create 10 files under non-partitioned table
insertData(10);

Table table = validationCatalog.loadTable(tableIdent);

// Create branch from current main state
String branchName = "compactionBranch";
sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

// Add more data to MAIN to make it diverge from branch
insertData(tableName, 10);

// Refresh to get new main snapshot after divergence
table.refresh();
long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
List<Object[]> expectedMainRecords = currentData();

// Get branch data before compaction
List<Object[]> expectedBranchRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());

long branchSnapshotBeforeCompaction = table.refs().get(branchName).snapshotId();

// Verify that branch and main have diverged
assertThat(branchSnapshotBeforeCompaction)
.as("Branch and main should have different snapshots")
.isNotEqualTo(mainSnapshotAfterDivergence);

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
catalogName, tableName, branchName);

assertEquals(
"Action should rewrite 10 data files and add 1 data file",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

table.refresh();

// Verify main snapshot unchanged
assertThat(table.currentSnapshot().snapshotId())
.as("Main snapshot ID must remain unchanged after branch compaction")
.isEqualTo(mainSnapshotAfterDivergence);

// Verify main data unchanged
List<Object[]> actualMainRecords = currentData();
assertEquals(
"Main data after compaction should not change", expectedMainRecords, actualMainRecords);

// Verify branch data unchanged
List<Object[]> actualBranchRecords =
rowsToJava(
spark
.sql(
String.format(
"SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
.collectAsList());
assertEquals(
"Branch data after compaction should not change",
expectedBranchRecords,
actualBranchRecords);

// Verify branch snapshot changed
long branchSnapshotAfterCompaction = table.refs().get(branchName).snapshotId();
assertThat(branchSnapshotAfterCompaction)
.as("Branch snapshot must be updated after compaction")
.isNotEqualTo(branchSnapshotBeforeCompaction);

// Verify the new branch snapshot is a child of the previous branch snapshot
assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
.as("New branch snapshot must be a child of the previous branch snapshot")
.isEqualTo(branchSnapshotBeforeCompaction);
}
}
@@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = SnapshotRef.MAIN_BRANCH;
private BinPackRewriteFilePlanner planner = null;
private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;

@@ -157,13 +159,24 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
return this;
}

public RewriteDataFilesSparkAction toBranch(String targetBranch) {
Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
Member

Can we have a branch with name "null"? The error message looks like targetBranch.equals("null") to me.

Contributor

@manuzhang: Sorry, I haven't seen your comment. Care to raise a PR?

Member

I commented after the merge. @97harsh Can you create a follow-up PR?
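A possible follow-up (hypothetical, not part of this PR) could simply reword the message so it cannot be read as rejecting a branch literally named "null":

// Hypothetical rewording of the precondition above; behavior unchanged.
Preconditions.checkArgument(
    targetBranch != null, "Invalid branch name: must not be null");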

this.branch = targetBranch;
return this;
Contributor

nit: newline - if we keep this code

Contributor

Why do we do this?

It would make sense if we defined the branch differently, like:

private String branch = SnapshotRef.MAIN_BRANCH;

If it is just null, then this code is basically equal to

this.branch = targetBranch;
return this;

just a little more confusing, because you can never reset the value back to null 😄

Contributor Author

Thank you, this makes sense; the only logical path out of this I see is to default it to MAIN_BRANCH.

Updated to align with core Iceberg API behavior. SnapshotProducer.targetBranch() defaults to MAIN_BRANCH and rejects null with IllegalArgumentException. Since RewriteDataFilesSparkAction eventually calls through to SnapshotProducer via rewrite.toBranch(), it makes sense to have consistent behavior at the action level.

Changes:

  • branch now defaults to SnapshotRef.MAIN_BRANCH instead of null
  • toBranch(null) now throws IllegalArgumentException (matching SnapshotProducer)
  • Removed null-check guards in execute() that are no longer needed
  • Added test testRewriteDataFilesToNullBranchFails

cc: @huaxingao

Contributor

Thanks for the update. The new change makes sense to me.

}

@Override
public RewriteDataFiles.Result execute() {
if (table.currentSnapshot() == null) {
return EMPTY_RESULT;
}

- long startingSnapshotId = table.currentSnapshot().snapshotId();
+ Preconditions.checkArgument(
+ table.snapshot(branch) != null,
+ "Cannot rewrite data files for branch %s: branch does not exist",
+ branch);
+
+ long startingSnapshotId = table.snapshot(branch).snapshotId();

init(startingSnapshotId);

@@ -230,7 +243,7 @@ private ExecutorService rewriteService() {
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
return new RewriteDataFilesCommitManager(
- table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+ table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), branch);
}

private Builder doExecute(
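For reference, a hedged usage sketch of the API this PR adds. It assumes an active SparkSession named spark and a loaded Iceberg Table named table; the catalog, table, and branch names are illustrative only:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

// Compact data files on a branch without touching main.
RewriteDataFiles.Result result =
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .toBranch("testBranch")
        .execute();

// Equivalent stored-procedure form, as exercised by the tests above:
spark.sql(
    "CALL my_catalog.system.rewrite_data_files("
        + "table => 'db.tbl', branch => 'testBranch')");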