Skip to content

Commit 8d711b3

Browse files
authored
HIVE-29540: Iceberg: [V3] Support filtering with ROW LINEAGE columns (#6403)
1 parent ef2c646 commit 8d711b3

3 files changed

Lines changed: 60 additions & 2 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,11 @@
210210
import org.slf4j.LoggerFactory;
211211

212212
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.FILE_PATH;
213+
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER;
213214
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_HASH;
214215
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_PROJECTION;
215216
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.PARTITION_SPEC_ID;
217+
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.ROW_LINEAGE_ID;
216218
import static org.apache.hadoop.hive.ql.metadata.VirtualColumn.ROW_POSITION;
217219
import static org.apache.iceberg.SnapshotSummary.ADDED_RECORDS_PROP;
218220
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
@@ -392,8 +394,9 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese
392394
ExprNodeDesc pushedPredicate = exprNodeDesc.clone();
393395

394396
List<ExprNodeDesc> subExprNodes = pushedPredicate.getChildren();
395-
Set<String> skipList = Stream.of(FILE_PATH, PARTITION_SPEC_ID, PARTITION_HASH)
396-
.map(VirtualColumn::getName).collect(Collectors.toSet());
397+
Set<String> skipList =
398+
Stream.of(FILE_PATH, PARTITION_SPEC_ID, PARTITION_HASH, ROW_LINEAGE_ID, LAST_UPDATED_SEQUENCE_NUMBER)
399+
.map(VirtualColumn::getName).collect(Collectors.toSet());
397400

398401
if (subExprNodes.removeIf(nodeDesc -> nodeDesc.getCols() != null &&
399402
nodeDesc.getCols().stream().anyMatch(skipList::contains))) {

iceberg/iceberg-handler/src/test/queries/positive/row_lineage.q

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ update ice_t set balance = 500 where id = 2;
66

77
select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;
88

9+
-- Test filtering with row lineage columns
10+
select id, name, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where ROW__LINEAGE__ID = 1;
11+
select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 1;
12+
select *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 2 OR ROW__LINEAGE__ID = 1;
13+
delete from ice_t where ROW__LINEAGE__ID = 1 OR LAST__UPDATED__SEQUENCE__NUMBER = 2;
14+
select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id;
15+
916
-- copy-on-write
1017
create table ice_t_cow (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3', 'write.update.mode'='copy-on-write');
1118
insert into ice_t_cow values (1, 'aaa', 25),(2, 'bbb', 35),(3, 'ccc', 82),(4, 'ddd', 91);

iceberg/iceberg-handler/src/test/results/positive/row_lineage.q.out

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,54 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
4848
2 bbb 500 1 2
4949
3 ccc 82 2 1
5050
4 ddd 91 3 1
51+
PREHOOK: query: select id, name, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where ROW__LINEAGE__ID = 1
52+
PREHOOK: type: QUERY
53+
PREHOOK: Input: default@ice_t
54+
PREHOOK: Output: hdfs://### HDFS PATH ###
55+
POSTHOOK: query: select id, name, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where ROW__LINEAGE__ID = 1
56+
POSTHOOK: type: QUERY
57+
POSTHOOK: Input: default@ice_t
58+
POSTHOOK: Output: hdfs://### HDFS PATH ###
59+
2 bbb 1 2
60+
PREHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 1
61+
PREHOOK: type: QUERY
62+
PREHOOK: Input: default@ice_t
63+
PREHOOK: Output: hdfs://### HDFS PATH ###
64+
POSTHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 1
65+
POSTHOOK: type: QUERY
66+
POSTHOOK: Input: default@ice_t
67+
POSTHOOK: Output: hdfs://### HDFS PATH ###
68+
1 aaa 25 0 1
69+
3 ccc 82 2 1
70+
4 ddd 91 3 1
71+
PREHOOK: query: select *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 2 OR ROW__LINEAGE__ID = 1
72+
PREHOOK: type: QUERY
73+
PREHOOK: Input: default@ice_t
74+
PREHOOK: Output: hdfs://### HDFS PATH ###
75+
POSTHOOK: query: select *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t where LAST__UPDATED__SEQUENCE__NUMBER = 2 OR ROW__LINEAGE__ID = 1
76+
POSTHOOK: type: QUERY
77+
POSTHOOK: Input: default@ice_t
78+
POSTHOOK: Output: hdfs://### HDFS PATH ###
79+
2 bbb 500 1 2
80+
PREHOOK: query: delete from ice_t where ROW__LINEAGE__ID = 1 OR LAST__UPDATED__SEQUENCE__NUMBER = 2
81+
PREHOOK: type: QUERY
82+
PREHOOK: Input: default@ice_t
83+
PREHOOK: Output: default@ice_t
84+
POSTHOOK: query: delete from ice_t where ROW__LINEAGE__ID = 1 OR LAST__UPDATED__SEQUENCE__NUMBER = 2
85+
POSTHOOK: type: QUERY
86+
POSTHOOK: Input: default@ice_t
87+
POSTHOOK: Output: default@ice_t
88+
PREHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id
89+
PREHOOK: type: QUERY
90+
PREHOOK: Input: default@ice_t
91+
PREHOOK: Output: hdfs://### HDFS PATH ###
92+
POSTHOOK: query: select id, name, balance, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER from ice_t order by id
93+
POSTHOOK: type: QUERY
94+
POSTHOOK: Input: default@ice_t
95+
POSTHOOK: Output: hdfs://### HDFS PATH ###
96+
1 aaa 25 0 1
97+
3 ccc 82 2 1
98+
4 ddd 91 3 1
5199
PREHOOK: query: create table ice_t_cow (id int, name string, balance int) stored by iceberg TBLPROPERTIES ('format-version'='3', 'write.update.mode'='copy-on-write')
52100
PREHOOK: type: CREATETABLE
53101
PREHOOK: Output: database:default

0 commit comments

Comments
 (0)