Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
Expand Down Expand Up @@ -313,8 +314,9 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
tableDesc.getProperties().put(InputFormatConfig.OPERATION_TYPE_PREFIX + tableDesc.getTableName(), opType);
SessionStateUtil.getResource(conf, SessionStateUtil.MISSING_COLUMNS)
.ifPresent(cols -> map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet<String>) cols)));
SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE)
.ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString()));
if (RowLineageUtils.isRowLineageInsert(conf)) {
map.put(SessionStateUtil.ROW_LINEAGE, Boolean.toString(true));
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.RowLineageUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.session.SessionState;
Expand Down Expand Up @@ -69,84 +70,140 @@ public boolean run(CompactorContext context) throws IOException, HiveException,

HiveConf conf = new HiveConf(context.getConf());
CompactionInfo ci = context.getCompactionInfo();
String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
org.apache.hadoop.hive.ql.metadata.Table hiveTable =
new org.apache.hadoop.hive.ql.metadata.Table(context.getTable());
boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable);
String compactionQuery = buildCompactionQuery(context, compactTableName, conf, rowLineageEnabled);

SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);

if (rowLineageEnabled) {
RowLineageUtils.enableRowLineage(sessionState);
LOG.debug("Row lineage flag set for compaction of table {}", compactTableName);
}

String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
(ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");

try {
DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
DriverUtils.runOnDriver(sessionState.getConf(), sessionState, compactionQuery);
LOG.info("Completed compaction for {}", compactionTarget);
return true;
} catch (HiveException e) {
LOG.error("Failed compacting {}", compactionTarget, e);
throw e;
} finally {
RowLineageUtils.disableRowLineage(sessionState);
sessionState.setCompaction(false);
}
}

private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf)
private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf,
boolean rowLineageEnabled)
throws HiveException {
CompactionInfo ci = context.getCompactionInfo();
String rowLineageColumns = RowLineageUtils.getRowLineageSelectColumns(rowLineageEnabled);
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
String fileSizePredicate = null;
String compactionQuery;

if (ci.type == CompactionType.MINOR) {
long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
fileSizePredicate = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
// IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
// doesn't support vectorization, hence disabling it in this case.
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
String fileSizePredicate = buildMinorFileSizePredicate(ci, compactTableName, conf, table);

String compactionQuery = (ci.partName == null) ?
buildFullTableCompactionQuery(compactTableName, conf, icebergTable,
rowLineageColumns, fileSizePredicate, orderBy) :
buildPartitionCompactionQuery(ci, compactTableName, conf, icebergTable,
rowLineageColumns, fileSizePredicate, orderBy);

LOG.info("Compaction query: {}", compactionQuery);
return compactionQuery;
}

private static String buildMinorFileSizePredicate(
CompactionInfo ci, String compactTableName, HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table) {
if (ci.type != CompactionType.MINOR) {
return null;
}

if (ci.partName == null) {
if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
} else if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
// A single filter on a virtual column causes errors during compilation,
// added another filter on file_path as a workaround.
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
"where %2$s != %3$d and %4$s is not null %5$s %6$s",
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
} else {
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
// code branch is not supposed to be reachable
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
}
} else {
HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
// IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format
// doesn't support vectorization, hence disabling it in this case.
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);

return String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
}

private String buildFullTableCompactionQuery(
String compactTableName,
HiveConf conf,
Table icebergTable,
String rowLineageColumns,
String fileSizePredicate,
String orderBy) throws HiveException {
String selectColumns = buildSelectColumnList(icebergTable, conf);

if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s",
compactTableName, selectColumns, rowLineageColumns,
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
}

if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());

PartitionSpec spec;
String partitionPredicate;
try {
spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
partitionPredicate = buildPartitionPredicate(ci, spec);
} catch (MetaException e) {
throw new HiveException(e);
}
// A single filter on a virtual column causes errors during compilation,
// added another filter on file_path as a workaround.
return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s " +
"where %4$s != %5$d and %6$s is not null %7$s %8$s",
compactTableName, selectColumns, rowLineageColumns,
VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
}

// Partitioned table without partition evolution with partition spec as null in the compaction request - this
// code branch is not supposed to be reachable
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
}

compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * FROM %1$s WHERE %2$s IN " +
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) %5$s %6$s",
compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
/**
 * Builds the compaction query for a single named partition: rewrites only rows whose
 * backing files belong to that partition (matched via the {@code .files} metadata table
 * and the partition's spec id).
 *
 * <p>Side effects on {@code conf}: disables map-join conversion and vectorization
 * (the query joins with the Iceberg {@code .files} AVRO metadata table), sets the
 * PARTITION rewrite policy, and records the target partition path.
 *
 * @throws HiveException if the partition spec or partition predicate cannot be resolved
 */
private String buildPartitionCompactionQuery(
    CompactionInfo ci,
    String compactTableName,
    HiveConf conf,
    Table icebergTable,
    String rowLineageColumns,
    String fileSizePredicate,
    String orderBy) throws HiveException {
  HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
  conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
  HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
  conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());

  PartitionSpec spec;
  String partitionPredicate;
  try {
    spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
    partitionPredicate = buildPartitionPredicate(ci, spec);
  } catch (MetaException e) {
    throw new HiveException(e);
  }

  return String.format("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " +
      "(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s",
      compactTableName, rowLineageColumns, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
      fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
}

/**
 * Builds a comma-separated SELECT list from the Iceberg table schema.
 * Each column name is passed through HiveUtils.unparseIdentifier so reserved
 * words and special characters stay valid in the generated query text.
 */
private static String buildSelectColumnList(Table icebergTable, HiveConf conf) {
  StringBuilder selectList = new StringBuilder();
  for (Types.NestedField field : icebergTable.schema().columns()) {
    if (selectList.length() > 0) {
      selectList.append(", ");
    }
    selectList.append(HiveUtils.unparseIdentifier(field.name(), conf));
  }
  return selectList.toString();
}

private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
-- SORT_QUERY_RESULTS

-- Verifies that Iceberg row lineage metadata (ROW__LINEAGE__ID and
-- LAST__UPDATED__SEQUENCE__NUMBER) is preserved across MINOR and MAJOR
-- compaction on partitioned, partition-evolved, and unpartitioned
-- format-version 3 tables: each compaction is bracketed by identical
-- SELECTs so the lineage values can be compared before and after.

--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
--! qt:replace:/(MINOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
-- Mask compaction id as they will be allocated in parallel threads
--! qt:replace:/^(\d+)(\t.*\tmanual\ticeberg\t)/#Masked#$2/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;

create database if not exists ice_comp_all with dbproperties('hive.compactor.worker.pool'='iceberg');
use ice_comp_all;

-- Partitioned table with minor and major compaction
create table part_tbl(
id int,
data string
)
partitioned by (dept_id int)
stored by iceberg stored as parquet
tblproperties (
'format-version'='3',
'hive.compactor.worker.pool'='iceberg',
-- Use target.size only to make Parquet data files (~996 bytes) count as fragments.
-- Default fragment ratio is 8, so fragment_size = target_size / 8.
-- Pick target_size > 996 * 8 (7968) so files are treated as fragment files and minor compaction is eligible.
'compactor.threshold.target.size'='8000',
'compactor.threshold.min.input.files'='2',
'compactor.threshold.delete.file.ratio'='0.0'
);

-- Separate single-row inserts create multiple small data files per partition,
-- so the min.input.files threshold above is met.
insert into part_tbl values (1,'p1', 10);
insert into part_tbl values (2,'p2', 10);
insert into part_tbl values (3,'p3', 20);
insert into part_tbl values (4,'p4', 20);

-- Capture lineage values before compaction; the same query is re-run after
-- each compaction so the results can be compared.
SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

alter table part_tbl compact 'minor' and wait pool 'iceberg';

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

show compactions;

-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size
alter table part_tbl set tblproperties ('compactor.threshold.target.size'='1500');

-- MERGE update produces delete files, which makes MAJOR compaction eligible
-- (delete.file.ratio threshold is 0.0).
merge into part_tbl t
using (select 1 as id, 'p1_upd' as data, 10 as dept_id) s
on t.dept_id = s.dept_id and t.id = s.id
when matched then update set data = s.data;

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

alter table part_tbl compact 'major' and wait pool 'iceberg';

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

show compactions;

-- Partition evolution
alter table part_tbl set tblproperties ('compactor.threshold.target.size'='8000');

alter table part_tbl set partition spec(dept_id, id);

-- These rows are written under the new (dept_id, id) spec; older rows stay
-- under the original spec, exercising the partition-evolution compaction path.
insert into part_tbl values (5,'p5', 10);
insert into part_tbl values (6,'p6', 20);

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

alter table part_tbl compact 'minor' and wait pool 'iceberg';

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM part_tbl
ORDER BY ROW__LINEAGE__ID;

show compactions;

-- Unpartitioned table with minor and major compaction
create table unpart_tbl(
id int,
data string
)
stored by iceberg stored as parquet
tblproperties (
'format-version'='3',
'hive.compactor.worker.pool'='iceberg',
-- Use target.size only to make Parquet data files (~996 bytes) count as fragments (default ratio 8).
'compactor.threshold.target.size'='8000',
'compactor.threshold.min.input.files'='2',
'compactor.threshold.delete.file.ratio'='0.0'
);

insert into unpart_tbl values (1,'a');
insert into unpart_tbl values (2,'b');
insert into unpart_tbl values (3,'c');
insert into unpart_tbl values (4,'d');

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM unpart_tbl
ORDER BY ROW__LINEAGE__ID;

alter table unpart_tbl compact 'minor' and wait pool 'iceberg';

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM unpart_tbl
ORDER BY ROW__LINEAGE__ID;

show compactions;

-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size, then create deletes via MERGE.
alter table unpart_tbl set tblproperties ('compactor.threshold.target.size'='1500');

merge into unpart_tbl t
using (select 1 as id, 'a_upd' as data) s
on t.id = s.id
when matched then update set data = s.data;

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM unpart_tbl
ORDER BY ROW__LINEAGE__ID;

alter table unpart_tbl compact 'major' and wait pool 'iceberg';

SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER
FROM unpart_tbl
ORDER BY ROW__LINEAGE__ID;

show compactions;
Loading
Loading