Spark: Add branch support to rewrite_data_files procedure #14964
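For reference, the new `branch` argument is exercised in the tests below roughly as follows. A minimal sketch using the test helper `sql`; `catalogName` and `tableName` are the test fixtures, and "testBranch" is a placeholder:

```java
// Sketch: rewrites the data files reachable from the given branch and commits
// the compaction to that branch; the main branch is left untouched.
sql(
    "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
    catalogName, tableName, "testBranch");
```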
Changes from all commits
@@ -54,6 +54,7 @@
  import org.apache.iceberg.spark.SparkCatalogConfig;
  import org.apache.iceberg.spark.SparkTableCache;
  import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
+ import org.apache.iceberg.spark.actions.SparkActions;
  import org.apache.iceberg.spark.source.ThreeColumnRecord;
  import org.apache.spark.sql.AnalysisException;
  import org.apache.spark.sql.Dataset;

@@ -1105,4 +1106,209 @@ private List<Object[]> currentData() {
  private List<Object[]> currentData(String table) {
    return rowsToJava(spark.sql("SELECT * FROM " + table + " order by c1, c2, c3").collectAsList());
  }
  @TestTemplate
  public void testRewriteDataFilesOnBranch() {
    createTable();
    insertData(10);

    String branchName = "testBranch";
    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

    List<Object[]> expectedRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());

    // Get snapshot IDs before rewrite
    Table table = validationCatalog.loadTable(tableIdent);
    long mainSnapshotId = table.currentSnapshot().snapshotId();
    long branchSnapshotId = table.refs().get(branchName).snapshotId();

    // Call rewrite_data_files on the branch
    List<Object[]> output =
        sql(
            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
            catalogName, tableName, branchName);

    assertEquals(
        "Action should rewrite 10 data files and add 1 data file",
        row(10, 1),
        Arrays.copyOf(output.get(0), 2));

    // Verify branch data is preserved after compaction
    List<Object[]> actualRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

    // Verify branch snapshot changed
    table.refresh();
    assertThat(table.refs().get(branchName).snapshotId())
        .as("Branch snapshot should be updated when files are rewritten")
        .isNotEqualTo(branchSnapshotId);

    // Verify main snapshot unchanged
    assertThat(table.currentSnapshot().snapshotId())
        .as("Main snapshot should remain unchanged")
        .isEqualTo(mainSnapshotId);
  }
Contributor: Can we add a compare between the actualRecords and expectedRecords?

Contributor (author): Thank you for your review, done!
  @TestTemplate
  public void testRewriteDataFilesToNullBranchFails() {
    createTable();
    insertData(10);

    Table table = validationCatalog.loadTable(tableIdent);

    assertThatThrownBy(() -> SparkActions.get(spark).rewriteDataFiles(table).toBranch(null))
        .as("Invalid branch")
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessage("Invalid branch name: null");
  }

  @TestTemplate
  public void testRewriteDataFilesOnBranchWithFilter() {
    createPartitionTable();
    insertData(10);

    String branchName = "filteredBranch";
    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

    List<Object[]> expectedRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());

    // Get snapshot IDs before rewrite
    Table table = validationCatalog.loadTable(tableIdent);
    long mainSnapshotId = table.currentSnapshot().snapshotId();
    long branchSnapshotId = table.refs().get(branchName).snapshotId();

    // Call rewrite_data_files on the branch with a filter (select only partition c2 = 'bar')
    List<Object[]> output =
        sql(
            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s', where => 'c2 = \"bar\"')",
            catalogName, tableName, branchName);

    assertEquals(
        "Action should rewrite 5 data files from the single matching partition"
            + " (containing c2 = bar) and add 1 data file",
        row(5, 1),
        Arrays.copyOf(output.get(0), 2));

    // Verify branch data is preserved after compaction
    List<Object[]> actualRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

    // Verify branch snapshot changed after rewrite
    table.refresh();
    assertThat(table.refs().get(branchName).snapshotId())
        .as("Branch snapshot should be updated when files are rewritten")
        .isNotEqualTo(branchSnapshotId);

    // Verify main snapshot unchanged
    assertThat(table.currentSnapshot().snapshotId())
        .as("Main snapshot should remain unchanged")
        .isEqualTo(mainSnapshotId);
  }
  @TestTemplate
  public void testBranchCompactionDoesNotAffectMain() {
    createTable();
    // create 10 files under non-partitioned table
    insertData(10);

    Table table = validationCatalog.loadTable(tableIdent);

    // Create branch from current main state
    String branchName = "compactionBranch";
    sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName);

    // Add more data to MAIN to make it diverge from branch
    insertData(tableName, 10);

    // Refresh to get new main snapshot after divergence
    table.refresh();
    long mainSnapshotAfterDivergence = table.currentSnapshot().snapshotId();
    List<Object[]> expectedMainRecords = currentData();

    // Get branch data before compaction
    List<Object[]> expectedBranchRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());

    long branchSnapshotBeforeCompaction = table.refs().get(branchName).snapshotId();

    // Verify that branch and main have diverged
    assertThat(branchSnapshotBeforeCompaction)
        .as("Branch and main should have different snapshots")
        .isNotEqualTo(mainSnapshotAfterDivergence);

    List<Object[]> output =
        sql(
            "CALL %s.system.rewrite_data_files(table => '%s', branch => '%s')",
            catalogName, tableName, branchName);

    assertEquals(
        "Action should rewrite 10 data files and add 1 data file",
        row(10, 1),
        Arrays.copyOf(output.get(0), 2));

    table.refresh();

    // Verify main snapshot unchanged
    assertThat(table.currentSnapshot().snapshotId())
        .as("Main snapshot ID must remain unchanged after branch compaction")
        .isEqualTo(mainSnapshotAfterDivergence);

    // Verify main data unchanged
    List<Object[]> actualMainRecords = currentData();
    assertEquals(
        "Main data after compaction should not change", expectedMainRecords, actualMainRecords);

    // Verify branch data unchanged
    List<Object[]> actualBranchRecords =
        rowsToJava(
            spark
                .sql(
                    String.format(
                        "SELECT * FROM %s.branch_%s ORDER BY c1, c2, c3", tableName, branchName))
                .collectAsList());
    assertEquals(
        "Branch data after compaction should not change",
        expectedBranchRecords,
        actualBranchRecords);

    // Verify branch snapshot changed
    long branchSnapshotAfterCompaction = table.refs().get(branchName).snapshotId();
    assertThat(branchSnapshotAfterCompaction)
        .as("Branch snapshot must be updated after compaction")
        .isNotEqualTo(branchSnapshotBeforeCompaction);

    // Verify the new branch snapshot is a child of the previous branch snapshot
    assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
        .as("New branch snapshot must be a child of the previous branch snapshot")
        .isEqualTo(branchSnapshotBeforeCompaction);
  }
}
@@ -31,6 +31,7 @@
  import java.util.stream.Collectors;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.FileScanTask;
+ import org.apache.iceberg.SnapshotRef;
  import org.apache.iceberg.SortOrder;
  import org.apache.iceberg.StructLike;
  import org.apache.iceberg.Table;
@@ -96,6 +97,7 @@ public class RewriteDataFilesSparkAction
  private boolean removeDanglingDeletes;
  private boolean useStartingSequenceNumber;
  private boolean caseSensitive;
+ private String branch = SnapshotRef.MAIN_BRANCH;
  private BinPackRewriteFilePlanner planner = null;
  private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;

@@ -157,13 +159,24 @@ public RewriteDataFilesSparkAction filter(Expression expression) {
    return this;
  }

+ public RewriteDataFilesSparkAction toBranch(String targetBranch) {
+   Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
Member: Can we have a branch with the name "null"? The error message reads as if a branch literally named "null" were being rejected.

Contributor: @manuzhang: Sorry, I hadn't seen your comment. Care to raise a PR?

Member: I commented after the merge. @97harsh Can you create a follow-up PR?
+   this.branch = targetBranch;
+   return this;
+ }
Contributor: nit: newline, if we keep this code.

Contributor: Why do we do this? It would make sense if we defined a different default, but if it is only a null guard it is a little more confusing, because you cannot reset the value back to the default.

Contributor (author): Thank you, this makes sense; the only logical path out of this I see is to default it to MAIN_BRANCH. Updated to align with core Iceberg API behavior: SnapshotProducer.targetBranch() defaults to MAIN_BRANCH and rejects null with IllegalArgumentException. Since RewriteDataFilesSparkAction eventually calls through to SnapshotProducer via rewrite.toBranch(), it makes sense to have consistent behavior at the action level.
cc: @huaxingao

Contributor: Thanks for the update. The new change makes sense to me.
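For illustration, the action-level API the author refers to would be used roughly as follows; a sketch assuming a `spark` session and a loaded `table`, grounded in the calls visible in the tests above, with a placeholder branch name:

```java
// Compact data files on a branch through the Spark action, mirroring the
// SQL procedure; toBranch() is the method added in this PR.
RewriteDataFiles.Result result =
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .toBranch("testBranch")
        .execute();
```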
  @Override
  public RewriteDataFiles.Result execute() {
    if (table.currentSnapshot() == null) {
      return EMPTY_RESULT;
    }

-   long startingSnapshotId = table.currentSnapshot().snapshotId();
+   Preconditions.checkArgument(
+       table.snapshot(branch) != null,
+       "Cannot rewrite data files for branch %s: branch does not exist",
+       branch);
+
+   long startingSnapshotId = table.snapshot(branch).snapshotId();

    init(startingSnapshotId);

@@ -230,7 +243,7 @@ private ExecutorService rewriteService() {
  @VisibleForTesting
  RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
    return new RewriteDataFilesCommitManager(
-       table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
+       table, startingSnapshotId, useStartingSequenceNumber, commitSummary(), branch);
  }

  private Builder doExecute(
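A sketch of the failure mode this new precondition in execute() guards against, assuming the test fixtures above; the branch name is hypothetical:

```java
// The action now fails fast when the target branch is missing, surfacing the
// message from the Preconditions check in execute().
assertThatThrownBy(
        () ->
            SparkActions.get(spark)
                .rewriteDataFiles(table)
                .toBranch("nonexistentBranch")
                .execute())
    .isInstanceOf(IllegalArgumentException.class)
    .hasMessageContaining("branch does not exist");
```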
Reviewer: minor: I wonder if we could just set this to MAIN branch as default, so we don't have to do isNull checks.

Author: I think it makes sense to use null to delegate to SnapshotProducer's default behavior rather than explicitly setting "main". This avoids imposing an opinion: if the default branch behavior changes in the future, this will automatically follow. The null-check pattern is also consistent with how branch handling works elsewhere in the codebase (see the SnapshotUtil methods that treat null as "use default").

Author: Then again, it makes sense to remove a lot of isNull checks by making toBranch(null) a no-op. This should make the code cleaner.

Author: @pvary / @singhpk234 any suggestions?

Reviewer: This is something I have also considered during the review, but I don't have a strong opinion, so I left it as is. @singhpk234?

Reviewer: Leaning +1 on letting null mean "use default branch" and making toBranch(null) a no-op.

Author: Thank you for your review, done.
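A minimal sketch of the no-op variant the thread leans toward; hypothetical code, not the merged behavior:

```java
// Hypothetical alternative: treat toBranch(null) as "keep the current target"
// (main by default), so call sites never need their own null checks.
public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  if (targetBranch != null) {
    this.branch = targetBranch;
  }
  return this; // no-op when targetBranch is null
}
```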