org/apache/iceberg/HiveTransaction.java
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.hive.StagingTableOperations;

/**
* Transaction implementation that stages metadata changes for atomic batch HMS updates across
* multiple tables.
*
* <p>Extends BaseTransaction to leverage Iceberg's retry and conflict resolution logic while
* capturing metadata locations instead of publishing directly to HMS.
*/
public class HiveTransaction extends BaseTransaction {

private final HiveTableOperations hiveOps;
private final StagingTableOperations stagingOps;

public HiveTransaction(Table table, HiveTableOperations ops) {
this(table, ops, ops.toStagingOps());
}

private HiveTransaction(Table table, HiveTableOperations ops, StagingTableOperations stagingOps) {
super(table.name(), stagingOps, TransactionType.SIMPLE, ops.current());
this.hiveOps = ops;
this.stagingOps = stagingOps;
}

public HiveTableOperations ops() {
return hiveOps;
}

public StagingTableOperations stagingOps() {
return stagingOps;
}
}
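
A minimal usage sketch, not part of this diff (`table` and `dataFile` are hypothetical placeholders): the public constructor derives staging operations from the table's real HiveTableOperations, so the transaction's commit machinery runs against staging while the original ops remain available for locking and the eventual HMS publish.

// Hypothetical: stage a transaction against a Hive-backed Iceberg table
HiveTableOperations ops = (HiveTableOperations) ((BaseTable) table).operations();
HiveTransaction txn = new HiveTransaction(table, ops);

// Changes accumulate as usual; commitTransaction() writes the new metadata file
// through the staging ops but publishes nothing to HMS (persistTable is a no-op there)
txn.newAppend().appendFile(dataFile).commit();
txn.commitTransaction();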
org/apache/iceberg/hive/HiveTableOperations.java
@@ -93,8 +93,16 @@ protected HiveTableOperations(
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}

/**
* Creates a staging operations instance that skips HMS updates and locking.
*
* <p>Used by HiveTransaction to defer HMS updates to the coordinator for atomic batch commits.
*/
public StagingTableOperations toStagingOps() {
return new StagingTableOperations(conf, metaClients, fileIO, catalogName, database, tableName);
}

@Override
- protected String tableName() {
+ public String tableName() {
return fullName;
}

org/apache/iceberg/hive/HiveTxnCoordinator.java
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.TxnCoordinator;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableParamsUpdate;
import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.HiveTransaction;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;

/**
* Transaction coordinator that aggregates Iceberg table commits and performs atomic HMS updates
* across multiple tables using {@code updateTableParams}.
*/
public final class HiveTxnCoordinator implements TxnCoordinator {

private final Configuration conf;
private final IMetaStoreClient msClient;

private final Map<String, HiveTransaction> stagedCommits = Maps.newConcurrentMap();

public HiveTxnCoordinator(Configuration conf, IMetaStoreClient msClient) {
this.conf = conf;
this.msClient = msClient;
}

@Override
public boolean hasPendingWork() {
return !stagedCommits.isEmpty();
}

public Transaction getOrCreateTransaction(org.apache.iceberg.Table table) {
HiveTableOperations ops = icebergTableOperations(table);
if (ops == null) {
throw new IllegalArgumentException(
"Iceberg transaction coordinator only supports HiveTableOperations tables");
}
return stagedCommits.computeIfAbsent(
ops.tableName(),
ignored -> new HiveTransaction(table, ops));
}

private HiveTableOperations icebergTableOperations(org.apache.iceberg.Table table) {
if (table instanceof BaseTable baseTable &&
baseTable.operations() instanceof HiveTableOperations ops) {
return ops;
}
return null;
}

@Override
public synchronized void commit() throws TException {
if (stagedCommits.isEmpty()) {
return;
}

// Sort commits by table name for deterministic ordering
List<Map.Entry<String, HiveTransaction>> updates = Lists.newArrayList(stagedCommits.entrySet());
updates.sort(Map.Entry.comparingByKey());

attemptCommit(updates);
}

@Override
public synchronized void rollback() {
clearState();
}

private void clearState() {
stagedCommits.clear();
}

private TableParamsUpdate buildTableParamsUpdate(
TableMetadata base, TableMetadata newMetadata, HiveTableOperations ops, String newMetadataLocation) {
Set<String> removedProps =
base.properties().keySet().stream()
.filter(k -> !newMetadata.properties().containsKey(k))
.collect(Collectors.toSet());

Table tbl = new Table();
tbl.setParameters(Maps.newHashMap());

long maxPropSize = conf.getLong(
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);

HMSTablePropertyHelper.updateHmsTableForIcebergTable(
newMetadataLocation,
tbl,
newMetadata,
removedProps,
true,
maxPropSize,
base.metadataFileLocation());

TableParamsUpdate newParams = new TableParamsUpdate();
newParams.setDb_name(ops.database());
newParams.setTable_name(ops.table());
newParams.setParams(tbl.getParameters());

// CAS guard: HMS applies this update only if the table's current metadata_location
// still matches the expected value captured when the transaction started
newParams.setExpected_param_key(HiveTableOperations.METADATA_LOCATION_PROP);
if (base.metadataFileLocation() != null) {
newParams.setExpected_param_value(base.metadataFileLocation());
}
return newParams;
}

private void attemptCommit(List<Map.Entry<String, HiveTransaction>> updates) throws TException {
List<TableParamsUpdate> payload = Lists.newArrayList();
List<HiveTransaction> txns = Lists.newArrayList();
List<HiveLock> locks = Lists.newArrayList();

try {
for (Map.Entry<String, HiveTransaction> entry : updates) {
HiveTransaction hiveTxn = entry.getValue();

// Stage the transaction and track it so staged metadata can be cleaned up on failure
hiveTxn.commitTransaction();
txns.add(hiveTxn);

TableMetadata base = hiveTxn.startMetadata();
TableMetadata newMetadata = hiveTxn.currentMetadata();
HiveTableOperations ops = hiveTxn.ops();

String newMetadataFileLocation = hiveTxn.stagingOps().metadataLocation();

// Acquire lock - if this fails, txn is already in list for cleanup
HiveLock lock = ops.lockObject(base);
lock.lock();
locks.add(lock);

// Build and add the HMS update payload
TableParamsUpdate paramsUpdate = buildTableParamsUpdate(base, newMetadata, ops, newMetadataFileLocation);
payload.add(paramsUpdate);
}

// Ensure all locks are active
locks.forEach(HiveLock::ensureActive);

msClient.updateTableParams(payload);

// Success - release locks and clear state
releaseLocks(locks);
clearState();

} catch (Exception e) {
cleanupFailedCommit(txns, locks);
throw e;
}
}

private void cleanupFailedCommit(List<HiveTransaction> txns, List<HiveLock> locks) {
cleanupMetadata(txns);
releaseLocks(locks);
clearState();
}

private void cleanupMetadata(List<HiveTransaction> txns) {
txns.forEach(txn -> {
String metadataLocation = txn.stagingOps().metadataLocation();
if (metadataLocation != null) {
HiveOperationsBase.cleanupMetadata(
txn.ops().io(),
BaseMetastoreOperations.CommitStatus.FAILURE.name(),
metadataLocation);
}
});
}

private void releaseLocks(List<HiveLock> locks) {
locks.forEach(HiveLock::unlock);
}

}
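
A sketch of how a caller might drive the coordinator end to end; the tables, data files, and wiring are assumptions, and only getOrCreateTransaction, hasPendingWork, and commit come from the code above.

// Hypothetical driver; conf, msClient, ordersTable, auditTable, and the DataFiles
// are placeholders
HiveTxnCoordinator coordinator = new HiveTxnCoordinator(conf, msClient);

// Stage changes against several tables; each table gets one HiveTransaction
coordinator.getOrCreateTransaction(ordersTable).newAppend().appendFile(ordersFile).commit();
coordinator.getOrCreateTransaction(auditTable).newAppend().appendFile(auditFile).commit();

if (coordinator.hasPendingWork()) {
  // A single updateTableParams call publishes both tables, or neither;
  // on failure, attemptCommit cleans staged metadata, releases locks, and clears state
  coordinator.commit();
}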
org/apache/iceberg/hive/StagingTableOperations.java
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.io.FileIO;
import org.apache.thrift.TException;

/**
* TableOperations implementation that skips HMS updates and locking during staging.
*
* <p>Extends HiveTableOperations and overrides:
*
* <ul>
*   <li>{@code lockObject()} to return a {@code NoLock} (the coordinator handles locking)
*   <li>{@code persistTable()} to be a no-op (the coordinator handles the HMS batch update)
*   <li>{@code writeNewMetadataIfRequired()} to capture the staged metadata location for cleanup
* </ul>
*/
public class StagingTableOperations extends HiveTableOperations {

private String newMetadataLocation;

public StagingTableOperations(
Configuration conf,
ClientPool<IMetaStoreClient, TException> metaClients,
FileIO fileIO,
String catalogName,
String database,
String table) {
super(conf, metaClients, fileIO, catalogName, database, table);
}

@Override
protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) {
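// Capture the staged metadata location so the coordinator can publish it to HMS
// on success, or clean it up on failure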
String location = super.writeNewMetadataIfRequired(newTable, metadata);
this.newMetadataLocation = location;
return location;
}

@Override
HiveLock lockObject(TableMetadata metadata) {
// No lock needed during staging - coordinator will acquire locks before HMS batch update
return new NoLock();
}

@Override
public void persistTable(Table hmsTable, boolean updateHiveTable, String metadataLocation) {
// No-op - skip HMS update, metadata files already written by doCommit()
}

public String metadataLocation() {
return newMetadataLocation;
}
}
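
To make the staging lifecycle concrete, a sketch under the assumption that hiveOps is a real HiveTableOperations and base/updated are valid TableMetadata instances: the staged commit writes a new metadata file and records its location, while HMS keeps pointing at the old file until the coordinator publishes the batch.

// Hypothetical check of the staging semantics
StagingTableOperations stagingOps = hiveOps.toStagingOps();

// doCommit() writes the new metadata file and records its location;
// persistTable() is a no-op, so HMS still points at the old metadata
stagingOps.commit(base, updated);

String staged = stagingOps.metadataLocation(); // later published by HiveTxnCoordinator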