Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.TxnCoordinator;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Table;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class HiveTxnCoordinator implements TxnCoordinator {

public HiveTxnCoordinator(Configuration conf, IMetaStoreClient msClient, boolean isExplicitTransaction) {
this.conf = conf;
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
this.msClient = msClient;
this.isExplicitTransaction = isExplicitTransaction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,26 @@ commit;
select * from iceberg_txn_t1 order by a;
select * from iceberg_txn_t2 order by a;

-- Test with ext.locking.enabled

set iceberg.engine.hive.lock-enabled=false;
set hive.txn.ext.locking.enabled=true;

from (
select 1 as a union all select 2
) s
insert into iceberg_txn_t1
select a
insert into iceberg_txn_t2
select a + 10;

start transaction;
update iceberg_txn_t1 set a = a + 1;
insert into iceberg_txn_t2 select * from iceberg_txn_t1;
commit;

select * from iceberg_txn_t1 order by a;
select * from iceberg_txn_t2 order by a;

drop table if exists iceberg_txn_t1;
drop table if exists iceberg_txn_t2;
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,90 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
4
5
6
PREHOOK: query: from (
select 1 as a union all select 2
) s
insert into iceberg_txn_t1
select a
insert into iceberg_txn_t2
select a + 10
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_txn_t1
PREHOOK: Output: default@iceberg_txn_t2
POSTHOOK: query: from (
select 1 as a union all select 2
) s
insert into iceberg_txn_t1
select a
insert into iceberg_txn_t2
select a + 10
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_txn_t1
POSTHOOK: Output: default@iceberg_txn_t2
PREHOOK: query: start transaction
PREHOOK: type: START TRANSACTION
POSTHOOK: query: start transaction
POSTHOOK: type: START TRANSACTION
PREHOOK: query: update iceberg_txn_t1 set a = a + 1
PREHOOK: type: QUERY
PREHOOK: Input: default@iceberg_txn_t1
PREHOOK: Output: default@iceberg_txn_t1
PREHOOK: Output: default@iceberg_txn_t1
POSTHOOK: query: update iceberg_txn_t1 set a = a + 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@iceberg_txn_t1
POSTHOOK: Output: default@iceberg_txn_t1
POSTHOOK: Output: default@iceberg_txn_t1
PREHOOK: query: insert into iceberg_txn_t2 select * from iceberg_txn_t1
PREHOOK: type: QUERY
PREHOOK: Input: default@iceberg_txn_t1
PREHOOK: Output: default@iceberg_txn_t2
POSTHOOK: query: insert into iceberg_txn_t2 select * from iceberg_txn_t1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@iceberg_txn_t1
POSTHOOK: Output: default@iceberg_txn_t2
PREHOOK: query: commit
PREHOOK: type: COMMIT
POSTHOOK: query: commit
POSTHOOK: type: COMMIT
PREHOOK: query: select * from iceberg_txn_t1 order by a
PREHOOK: type: QUERY
PREHOOK: Input: default@iceberg_txn_t1
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from iceberg_txn_t1 order by a
POSTHOOK: type: QUERY
POSTHOOK: Input: default@iceberg_txn_t1
POSTHOOK: Output: hdfs://### HDFS PATH ###
2
3
4
5
6
7
PREHOOK: query: select * from iceberg_txn_t2 order by a
PREHOOK: type: QUERY
PREHOOK: Input: default@iceberg_txn_t2
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: select * from iceberg_txn_t2 order by a
POSTHOOK: type: QUERY
POSTHOOK: Input: default@iceberg_txn_t2
POSTHOOK: Output: hdfs://### HDFS PATH ###
11
12
12
13
2
3
3
4
4
5
5
6
6
7
PREHOOK: query: drop table if exists iceberg_txn_t1
PREHOOK: type: DROPTABLE
PREHOOK: Input: default@iceberg_txn_t1
Expand Down
6 changes: 4 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/DriverTxnHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,12 @@ void destroy(String queryIdFromDriver) {

boolean isTxnOpen =
txnManager != null && txnManager.isTxnOpen() &&
(txnManager.isImplicitTransactionOpen(context) || COMMIT_OR_ROLLBACK.contains(hiveOp)) &&
org.apache.commons.lang3.StringUtils.equals(queryIdFromDriver, txnManager.getQueryid());

release(!hiveLocks.isEmpty() || isTxnOpen);
boolean isImplicitTxnOrCommit = isTxnOpen &&
(txnManager.isImplicitTransactionOpen(context) || COMMIT_OR_ROLLBACK.contains(hiveOp));

release((!hiveLocks.isEmpty() && !isTxnOpen) || isImplicitTxnOrCommit);
}

private void release(boolean releaseLocks) {
Expand Down
19 changes: 12 additions & 7 deletions ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ private void resetTxnInfo() {
lockMgr = null;
}
txnId = 0;
isExplicitTransaction = false;
txnCoordinator = null;
stmtId = -1;
numStatements = 0;
Expand Down Expand Up @@ -601,6 +602,9 @@ public void commitTxn() throws LockException {
clearLocksAndHB();
getTxnCoordinator().commit();

if (isExplicitTxnOpen()) {
getDefaultCoordinator().commit();
}
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
Expand All @@ -625,6 +629,9 @@ public void rollbackTxn() throws LockException {
clearLocksAndHB();
getTxnCoordinator().rollback();

if (isExplicitTxnOpen()) {
getDefaultCoordinator().rollback();
}
} catch (NoSuchTxnException e) {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
Expand Down Expand Up @@ -913,13 +920,7 @@ public boolean isImplicitTransactionOpen(Context ctx) {
//some commands like "show databases" don't start implicit transactions
return false;
}
if (!isExplicitTransaction) {
if (ctx == null || !ctx.isExplainSkipExecution()) {
assert numStatements <= 1 : "numStatements=" + numStatements;
}
return true;
}
return false;
return !isExplicitTransaction;
}

@Override
Expand Down Expand Up @@ -987,6 +988,10 @@ public boolean isTxnOpen() {
txnCoordinator != null && txnCoordinator.hasPendingWork();
}

private boolean isExplicitTxnOpen() {
return isExplicitTransaction && txnId > 0;
}

@Override
public long getCurrentTxnId() {
return txnId;
Expand Down
Loading