Conversation

@97harsh 97harsh commented Jan 5, 2026

Spark: Add branch support to rewrite_data_files procedure

This change enables the rewrite_data_files stored procedure to rewrite
data files on specific branches instead of only on the main branch.

Implementation:

  • Core: Extended RewriteDataFilesCommitManager to accept and use branch parameter
  • Action: Added toBranch() method to RewriteDataFilesSparkAction (v3.4, v3.5, v4.0, v4.1)
  • Procedure: Added optional branch parameter to RewriteDataFilesProcedure (all versions)
  • Tests: Added branch-specific test coverage for all Spark versions

Users can specify branches in two ways:

  1. Table identifier: CALL system.rewrite_data_files('db.table.branch_myBranch')
  2. Explicit parameter: CALL system.rewrite_data_files(table => 'db.table', branch => 'myBranch')

The implementation follows the existing pattern used by SparkWrite and other
branch-aware operations. The commit manager already had branch support built
in; this change wires it through the action and procedure layers.

Fixes #14813

@97harsh 97harsh marked this pull request as draft January 5, 2026 09:41
This commit fixes the compilation error by implementing the missing toBranch()
method in RewriteDataFilesSparkAction for Spark versions 3.4, 3.5, and 4.0.

Changes:
- Added toBranch(String targetBranch) method to RewriteDataFilesSparkAction
- Updated commitManager() to pass branch parameter to RewriteDataFilesCommitManager
- Added comprehensive branch tests to TestRewriteDataFilesProcedure (all versions)

The implementation follows the same pattern as v4.1 and matches how SparkWrite
handles branches. Integration tests passing: iceberg-delta-lake:check
- Add missing Table import in Spark 3.4 test file
- Fix branch names to use camelCase (testBranch, filteredBranch) to avoid SQL parsing errors
- Ensure files are actually rewritten by inserting multiple small files
- Add min-input-files option to force file compaction
- Remove incorrect snapshot ID ordering assertions
- Add explicit assertions to verify files are rewritten and snapshots change
@97harsh 97harsh marked this pull request as ready for review January 5, 2026 12:06
The previous implementation had the 2-arg and 3-arg constructors delegate
with an extra null argument, which made them call the wrong constructor
overload. This resulted in snapshotProperties being null, leading to a
NullPointerException when commitFileGroups() tried to iterate over the
properties with forEach().

The issue broke Flink maintenance API tests (TestRewriteDataFiles and
TestFlinkTableSinkCompaction) because the Flink DataFileRewriteCommitter
uses the 2-arg constructor. Files were not being rewritten as expected.

Changes:
- Line 51: Remove null parameter to call 3-arg constructor
- Line 56: Remove null parameter to call 4-arg constructor with Map

This ensures the constructor chain properly passes through existing
constructors without introducing null values, and branch parameter is
correctly passed only through the appropriate constructors.
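The overload pitfall described above can be sketched as follows. This is a hypothetical, simplified model (class and method names here are invented; the real class is RewriteDataFilesCommitManager), showing why an extra null in a delegating `this(...)` call leaves the properties map null:

```java
import java.util.HashMap;
import java.util.Map;

// A short constructor that delegates with an extra null argument binds a
// longer overload and skips the one that supplies default properties, so
// snapshotProperties stays null and a later forEach() throws NPE.
class CommitManagerSketch {
  private final Map<String, String> snapshotProperties;
  private final String branch;

  // FIXED: delegate without the extra null, so the default map is applied.
  // The buggy form was: this(owner, null, null) -> null properties map.
  CommitManagerSketch(String owner) {
    this(owner, new HashMap<>());
  }

  CommitManagerSketch(String owner, Map<String, String> properties) {
    this(owner, properties, null); // only this overload may pass a null branch
  }

  CommitManagerSketch(String owner, Map<String, String> properties, String branch) {
    this.snapshotProperties = properties;
    this.branch = branch;
  }

  int commitFileGroups() {
    int[] committed = {0};
    snapshotProperties.forEach((k, v) -> committed[0]++); // NPE if the map is null
    return committed[0];
  }
}
```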
@97harsh 97harsh marked this pull request as draft January 5, 2026 13:53
When rewrite_data_files was called with a branch parameter, the planner
incorrectly used the main branch's snapshot to scan for files to compact,
while the commit targeted the specified branch. This caused validation
failures when branches diverged.

The fix ensures RewriteDataFilesSparkAction.execute() uses the branch's
snapshot ID when a branch is specified, allowing the planner to correctly
identify and compact files from the branch.

This change applies to all Spark versions (3.4, 3.5, 4.0, 4.1) and fixes
all rewrite strategies (binpack, sort, z-order) since they all rely on
the snapshot ID passed from RewriteDataFilesSparkAction.
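The planning fix can be sketched in a self-contained form, using stubbed types rather than the real Iceberg API (a plain map stands in for the table's branch refs):

```java
import java.util.HashMap;
import java.util.Map;

// Stubbed sketch: the planner must scan from the specified branch's snapshot,
// not the table's current (main) snapshot, or validation fails once the
// branches diverge.
class BranchPlanning {
  private final Map<String, Long> refs = new HashMap<>(); // branch -> snapshot id

  BranchPlanning(long mainSnapshotId) {
    refs.put("main", mainSnapshotId);
  }

  void setBranchRef(String branch, long snapshotId) {
    refs.put(branch, snapshotId);
  }

  // Before the fix this effectively always returned refs.get("main");
  // now it resolves the requested branch.
  long startingSnapshotId(String branch) {
    Long id = refs.get(branch);
    if (id == null) {
      throw new IllegalArgumentException("Cannot find branch: " + branch);
    }
    return id;
  }
}
```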
Enhanced testBranchCompactionDoesNotAffectMain to verify that the new
snapshot created by rewrite_data_files is a child of the previous branch
snapshot. This ensures the compaction is committed to the branch's history
chain, not to main.

The assertion checks that:
  table.snapshot(branchSnapshotAfterCompaction).parentId() == branchSnapshotBeforeCompaction

This provides stronger validation that the rewrite operation correctly
targets and modifies the specified branch.
@97harsh 97harsh marked this pull request as ready for review January 5, 2026 15:20
Apply spotless formatting to multi-line sql() and assertThat() calls
across Spark 3.4, 3.5, and 4.0 modules.
Adds a precondition check in RewriteDataFilesSparkAction.execute() to
verify that the specified branch exists before attempting to access its
snapshot. This provides a clear error message instead of a cryptic
NullPointerException when a non-existent branch is specified.
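A minimal sketch of this fail-fast check, assuming a refs map in place of the real table metadata (the actual code likely uses Guava's Preconditions.checkArgument):

```java
import java.util.Map;

// Failing fast with a clear IllegalArgumentException replaces the cryptic
// NullPointerException from dereferencing a missing branch snapshot.
class BranchPrecondition {
  static long branchSnapshotId(Map<String, Long> refs, String branch) {
    Long id = refs.get(branch);
    if (id == null) {
      throw new IllegalArgumentException("Cannot find branch in table: " + branch);
    }
    return id;
  }
}
```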
@97harsh 97harsh requested a review from pvary January 6, 2026 03:47
pvary commented Jan 6, 2026

Left some comments.
Could you please remove the Spark 3.4, 3.5, 4.0 changes for now, so the PR is easier to review, and apply the changes requested by the reviewers?

In a next PR we will do the backport which should be easy and clean.

Thanks!

…thods

Change checkAndApplyFilter and checkAndApplyStrategy to accept and return
RewriteDataFilesSparkAction instead of the RewriteDataFiles interface,
eliminating unnecessary casts at call sites.
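The refactor above can be sketched with simplified, hypothetical names; the key point is the covariant return type, which lets call sites chain Spark-specific methods like toBranch() without casting:

```java
// Interface with a self-returning fluent method, and a concrete action that
// narrows the return type so Spark-specific chaining needs no cast.
interface RewriteDataFiles {
  RewriteDataFiles filter(String expression);
}

class RewriteAction implements RewriteDataFiles {
  String filter = "true";
  String branch = "main";

  @Override
  public RewriteAction filter(String expression) { // covariant return
    this.filter = expression;
    return this;
  }

  RewriteAction toBranch(String targetBranch) {
    this.branch = targetBranch;
    return this;
  }

  // Accepting and returning the concrete type (not the interface) removes
  // the casts that were previously needed at call sites.
  static RewriteAction checkAndApplyFilter(RewriteAction action, String where) {
    return where == null ? action : action.filter(where);
  }
}
```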
Remove rewrite_data_files branch support from older Spark versions,
keeping the feature only in Spark 4.1.
@singhpk234 singhpk234 left a comment


LGTM as well, just a minor / optional suggestion

    long startingSnapshotId,
    boolean useStartingSequenceNumber,
    Map<String, String> snapshotProperties) {
  this(table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, null);
Contributor

minor: I wonder if we could just set this to the MAIN branch as the default, so we don't have to do isNull checks

Contributor Author

I think it makes sense to use null to delegate to SnapshotProducer's default behavior rather than explicitly setting "main".
This avoids imposing an opinion: if the default branch behavior changes in the future, this will automatically follow. The null-check pattern is also consistent with how branch handling works elsewhere in the codebase (see the SnapshotUtil methods that treat null as "use default").

Contributor Author

Then again, it makes sense to remove a lot of isNull checks by making toBranch(null) a no-op. This should make the code cleaner.

Contributor Author

@pvary / @singhpk234 any suggestions?

Contributor

This is something I also considered during the review, but I don't have a strong opinion, so I left it as is.

@singhpk234?

Contributor

Leaning +1 on letting null mean "use default branch" and making toBranch(null) a no-op.

Contributor Author

Thank you for your review, done

  • toBranch(null) results in no-op
  • removed one condition check for branch!=null

@97harsh 97harsh requested a review from singhpk234 January 7, 2026 17:21
97harsh commented Jan 7, 2026

Are you good with leaving it as is, @singhpk234?
Can we merge if so?

private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = null;
Contributor

Same as what Prashant suggested above: I think we can probably default branch to main and can always assert table.snapshot(branch) != null

Contributor Author

Hey @dramaticlly, per @huaxingao's recommendation I kept

  • null->default branch
  • toBranch(null) results in no-op

This change ensures that calling toBranch(null) does not modify the
internal branch state, treating null as "use default branch". This
simplifies the calling code and removes redundant null checks.
97harsh commented Jan 8, 2026

@huaxingao
Can we merge if good?

if (targetBranch != null) {
  this.branch = targetBranch;
}
return this;
Contributor

nit: newline - if we keep this code

Contributor

Why do we do this?

It would make sense if we would define the branch differently, like:

private String branch = SnapshotRef.MAIN_BRANCH;

If it is just null, then this code is just confusing. It is basically equivalent to

this.branch = targetBranch;
return this;

just a little more confusing, because you cannot reset the value back to null 😄

Contributor Author

Thank you, this makes sense. The only logical path out of this I see is to default it to MAIN_BRANCH.

Updated to align with the core Iceberg API behavior. SnapshotProducer.targetBranch() defaults to MAIN_BRANCH and rejects null with IllegalArgumentException; since RewriteDataFilesSparkAction eventually calls through to SnapshotProducer via rewrite.toBranch(), it makes sense to have consistent behavior at the action level.

Changes:

  • branch now defaults to SnapshotRef.MAIN_BRANCH instead of null
  • toBranch(null) now throws IllegalArgumentException (matching SnapshotProducer)
  • Removed null-check guards in execute() that are no longer needed
  • Added test testRewriteDataFilesToNullBranchFails

cc: @huaxingao
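A simplified sketch of this final behavior (not the exact Iceberg source): branch defaults to the main branch, and toBranch(null) is rejected, matching SnapshotProducer.targetBranch().

```java
// "main" stands in for SnapshotRef.MAIN_BRANCH.
class ActionBranchSketch {
  static final String MAIN_BRANCH = "main";
  private String branch = MAIN_BRANCH;

  ActionBranchSketch toBranch(String targetBranch) {
    if (targetBranch == null) {
      // Mirrors SnapshotProducer.targetBranch()'s rejection of null.
      throw new IllegalArgumentException("Invalid branch name: null");
    }
    this.branch = targetBranch;
    return this;
  }

  String branch() {
    return branch;
  }
}
```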

Contributor

Thanks for the update. The new change makes sense to me.

…ject null

This change aligns RewriteDataFilesSparkAction.toBranch() with the core
Iceberg API behavior in SnapshotProducer.targetBranch():
- Default branch to SnapshotRef.MAIN_BRANCH instead of null
- Reject null branch with IllegalArgumentException
- Remove null-check guards that are no longer needed

Since RewriteDataFilesSparkAction eventually calls rewrite.toBranch()
which invokes SnapshotProducer.targetBranch(), having consistent
behavior at the action level prevents confusion and potential runtime
errors.
Default to MAIN_BRANCH when branch parameter is not provided, preventing
IllegalArgumentException from toBranch() validation. Simplifies branch
resolution logic by using Objects.requireNonNullElse and removes unused
SparkTable import.
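That resolution order can be sketched as follows, with "main" standing in for SnapshotRef.MAIN_BRANCH and Objects.requireNonNullElse supplying the default (helper name and parameters are hypothetical):

```java
import java.util.Objects;

// Resolution order: explicit procedure parameter, then the branch parsed
// from the table identifier, then the main branch.
class BranchResolution {
  static String resolve(String explicitBranch, String identifierBranch) {
    String branch = explicitBranch != null ? explicitBranch : identifierBranch;
    return Objects.requireNonNullElse(branch, "main");
  }
}
```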
@97harsh 97harsh force-pushed the feature/rewrite-data-files-branch-support branch from 8f0b83c to 870cc50 Compare January 10, 2026 17:23
@97harsh 97harsh requested a review from pvary January 11, 2026 15:40
97harsh commented Jan 11, 2026

@pvary, @huaxingao What's the process to merge if this looks good?

@huaxingao

I will wait a day or two to see if there are any further comments. If not, I will merge it by the end of tomorrow.

if (targetBranch == null) {
  targetBranch = loadSparkTable(tableIdent).branch();
}
if (targetBranch == null) {
Contributor

nit: newline

}
if (targetBranch == null) {
  targetBranch = SnapshotRef.MAIN_BRANCH;
}
Contributor

nit: newline

Comment on lines 121 to 131
String explicitBranch = input.asString(BRANCH_PARAM, null);

// Determine target branch: explicit parameter > table branch > main branch
String targetBranch = explicitBranch;
if (targetBranch == null) {
  targetBranch = loadSparkTable(tableIdent).branch();
}
if (targetBranch == null) {
  targetBranch = SnapshotRef.MAIN_BRANCH;
}
String branch = targetBranch;
Contributor

Suggested change

Before:

  String explicitBranch = input.asString(BRANCH_PARAM, null);

  // Determine target branch: explicit parameter > table branch > main branch
  String targetBranch = explicitBranch;
  if (targetBranch == null) {
    targetBranch = loadSparkTable(tableIdent).branch();
  }
  if (targetBranch == null) {
    targetBranch = SnapshotRef.MAIN_BRANCH;
  }
  String branch = targetBranch;

After:

  // Determine target branch: explicit parameter > table branch > main branch
  String branch = input.asString(BRANCH_PARAM, null);
  if (branch == null) {
    branch = loadSparkTable(tableIdent).branch();
    if (branch == null) {
      branch = SnapshotRef.MAIN_BRANCH;
    }
  }

Contributor Author

Running into a Java compilation error with this, so reverting; will nest the ifs.

Contributor Author

The branch variable is used inside a lambda, and Java requires variables used in lambdas to be effectively final, so it needs a separate variable.

97harsh and others added 2 commits January 13, 2026 06:41
…ures/RewriteDataFilesProcedure.java

Co-authored-by: pvary <peter.vary.apache@gmail.com>
The branch variable was reassigned multiple times before being
used in a lambda, violating Java's effectively final requirement.
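The Java rule behind this fix can be illustrated with a small sketch (a hypothetical helper, not the procedure code): a local variable captured by a lambda must be effectively final, so a variable that is reassigned during branch resolution needs a separate final copy before the lambda.

```java
import java.util.function.Supplier;

class EffectivelyFinalSketch {
  static Supplier<String> branchSupplier(String input) {
    String branch = input;
    if (branch == null) {
      branch = "main"; // reassignment: 'branch' is no longer effectively final
    }
    final String resolved = branch; // effectively final copy for the lambda
    return () -> "branch=" + resolved; // capturing 'branch' here would not compile
  }
}
```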
97harsh commented Jan 16, 2026

@pvary @huaxingao Ready to merge now?

@pvary pvary merged commit b4ef17c into apache:main Jan 16, 2026
32 checks passed
pvary commented Jan 16, 2026

Merged to main.
Sorry for the delay.
Thanks @97harsh for the new feature, and @huaxingao for the review!

}

public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
Member

Can we have a branch with name "null"? The error message looks like targetBranch.equals("null") to me.
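One possible follow-up, sketched here with hypothetical names and wording (not the merged code), is a message that cannot be read as a branch literally named "null":

```java
class NullBranchCheck {
  static void requireBranch(String targetBranch) {
    if (targetBranch == null) {
      // Clearer than "Invalid branch name: null", which could be read as
      // rejecting a branch named "null".
      throw new IllegalArgumentException("Branch name cannot be null");
    }
  }
}
```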

Contributor

@manuzhang: Sorry, I haven't seen your comment. Care to raise a PR?

Member

I commented after the merge. @97harsh Can you create a follow-up PR?

97harsh added a commit to 97harsh/iceberg that referenced this pull request Jan 16, 2026
….5, and 3.4

This backports the branch support feature from Spark 4.1 (PR apache#14964) to Spark
4.0, 3.5, and 3.4. The changes add a new `branch` parameter to the
`rewrite_data_files` procedure that allows users to specify which branch to
compact data files for.

Key changes:
- Add `toBranch()` method to RewriteDataFilesSparkAction
- Add `branch` parameter to RewriteDataFilesProcedure
- Use branch snapshot ID instead of current snapshot for compaction
- Pass branch parameter to RewriteDataFilesCommitManager
97harsh added a commit to 97harsh/iceberg that referenced this pull request Jan 16, 2026
….0, 3.5, and 3.4

Backport test cases from apache#14964:
- testRewriteDataFilesOnBranch
- testRewriteDataFilesToNullBranchFails
- testRewriteDataFilesOnBranchWithFilter
- testBranchCompactionDoesNotAffectMain
Successfully merging this pull request may close these issues.

RewriteDataFiles procedure does not support branch operations

7 participants