@@ -456,18 +456,16 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
       boolean allBucketsPresent, long visibilityId) throws Exception {
     String partValue = (p == null) ? null : p.getValues().getFirst();
     Path location = new Path(getLocation(t.getTableName(), partValue));
-    String filename = null;
 
-    switch (type) {
-      case BASE -> filename = AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId);
+    String filename = switch (type) {
+      case BASE -> AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId);
       case LENGTH_FILE, // Fall through to delta
-          DELTA -> filename = AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId);
-      case LEGACY -> {
-        // handled below
-      }
+          DELTA -> AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId);
+      case LEGACY -> // handled below
+          null;
       case null, default ->
           throw new IllegalStateException("Unexpected type: " + type);
-    }
+    };
 
     FileSystem fs = FileSystem.get(conf);
     for (int bucket = 0; bucket < numBuckets; bucket++) {
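The hunk above swaps a statement switch that seeds and mutates a local for a switch expression that yields the value directly. For readers newer to this Java 21 idiom (arrow cases plus "case null, default"), here is a minimal standalone sketch of the same pattern; FileType and the string literals are hypothetical stand-ins, not the test's real helpers:

enum FileType { BASE, DELTA, LENGTH_FILE, LEGACY }

class SwitchExpressionSketch {
  static String filenameFor(FileType type, long minTxn, long maxTxn) {
    // A switch expression yields its value directly: no mutable "filename" seed,
    // no per-branch assignment, and the compiler checks that the cases are exhaustive.
    return switch (type) {
      case BASE -> "base_" + maxTxn;
      case LENGTH_FILE, DELTA -> "delta_" + minTxn + "_" + maxTxn; // LENGTH_FILE shares the delta branch
      case LEGACY -> null; // the legacy layout is handled by the caller, as in the diff
      case null, default -> throw new IllegalStateException("Unexpected type: " + type);
    };
  }
}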
@@ -735,7 +733,13 @@ enum CommitAction {
   }
 
   protected long compactInTxn(CompactionRequest rqst) throws Exception {
-    return compactInTxn(rqst, CommitAction.COMMIT);
+    long compactorTxnId = compactInTxn(rqst, CommitAction.COMMIT);
+
+    // Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark
+    // TODO: doesn't belong here, should probably be moved to CompactorTest#startCleaner()
+    Thread.sleep(MetastoreConf.getTimeVar(
+        conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    return compactorTxnId;
   }
 
   long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exception {
Expand Down Expand Up @@ -769,14 +773,11 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce

switch (commitAction) {
case MARK_COMPACTED ->
txnHandler.markCompacted(ci);
txnHandler.markCompacted(ci);

case COMMIT -> {
txnHandler.markCompacted(ci);
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));

Thread.sleep(MetastoreConf.getTimeVar(
conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
}
case ABORT ->
txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
Expand Down
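The sleep that moved out of the COMMIT branch and into the one-arg compactInTxn overload exists because, for a window of TXN_OPENTXN_TIMEOUT after a txn is opened, the metastore's watermark bookkeeping still treats it as potentially open; until that window elapses the Cleaner may not see the compaction txn as the highest committed watermark. The TODO suggests moving the wait into CompactorTest#startCleaner(); one possible shape of that change, sketched under the assumption that startCleaner() can simply wait before running a Cleaner cycle (runOneCleanerCycle() is a hypothetical stand-in for whatever startCleaner() does today):

// Hypothetical CompactorTest#startCleaner(), following the TODO above.
protected void startCleaner() throws Exception {
  // Let the TXN_OPENTXN_TIMEOUT window elapse so the highest committed
  // watermark already includes the compaction txn when the Cleaner runs.
  Thread.sleep(MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
  runOneCleanerCycle(); // hypothetical: the existing body of startCleaner()
}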
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -32,6 +31,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
@@ -94,8 +94,8 @@ public void testRetryAfterFailedCleanupDelayDisabled() throws Exception {
   public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
     HiveConf.setBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, delayEnabled);
     HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 2, TimeUnit.SECONDS);
-    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
-    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, TimeUnit.MILLISECONDS);
+    MetastoreConf.setLongVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
+    MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, TimeUnit.MILLISECONDS);
     String errorMessage = "No cleanup here!";
 
     //Prevent cleaner from marking the compaction as cleaned
@@ -287,19 +287,19 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
-    addBaseFile(t, null, 25L, 25, 26);
-
     burnThroughTransactions("default", "camtc", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
     long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+    addBaseFile(t, null, 25L, 25, 26);
+
     // Open a query during compaction
     long longQuery = openTxn();
     TxnStoreHelper.wrap(txnHandler)
         .registerMinOpenWriteId("default", "camtc", longQuery);
 
     txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
 
     startCleaner();
 
     // The long-running query should prevent the cleanup
@@ -313,7 +313,8 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
 
     // After the commit cleaning can proceed
     txnHandler.commitTxn(new CommitTxnRequest(longQuery));
-    Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    Thread.sleep(MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+
     startCleaner();
 
     rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -725,20 +726,22 @@ public void testReadyForCleaningPileup() throws Exception {
     Table t = newTable(dbName, tblName, true);
     Partition p = newPartition(t, "today");
 
-    // block cleaner with an open txn
-    long blockingTxn = openTxn();
-
     // minor compaction
     addBaseFile(t, p, 20L, 20);
     addDeltaFile(t, p, 21L, 21L, 1);
     addDeltaFile(t, p, 22L, 22L, 1);
     burnThroughTransactions(dbName, tblName, 22);
 
+    // block cleaner with an open txn
+    long blockingTxn = openTxn();
+    TxnStoreHelper.wrap(txnHandler)
+        .registerMinOpenWriteId(dbName, tblName, blockingTxn);
+
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
     long compactTxn = compactInTxn(rqst);
     addDeltaFile(t, p, 21, 22, 2);
 
-    txnHandler.addWriteIdsToMinHistory(1, Collections.singletonMap("default.trfcp", 23L));
     startCleaner();
+
     // make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1081,14 +1084,15 @@ public void testReady() throws Exception {
     burnThroughTransactions(dbName, tblName, 22);
 
     // block cleaner with an open txn
-    long txnId = openTxn();
+    long blockingTxn = openTxn();
     TxnStoreHelper.wrap(txnHandler)
-        .registerMinOpenWriteId(dbName, tblName, txnId);
+        .registerMinOpenWriteId(dbName, tblName, blockingTxn);
 
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    long ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 22, 2, ctxnid);
+    long compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 2, compactTxn);
+
     startCleaner();
 
     // make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1118,11 +1122,8 @@ public void testCompactionHighWatermarkIsHonored() throws Exception {
 
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    long ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 22, 3, ctxnid);
-
-    // block cleaner with an open txn
-    long openTxnId = openTxn();
+    long compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 3, compactTxn);
 
     //2nd minor
     addDeltaFile(t, p, 23L, 23L, 1);
@@ -1131,11 +1132,10 @@
 
     rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 24, 5, ctxnid);
+    compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 24, 5, compactTxn);
 
     startCleaner();
-    txnHandler.abortTxn(new AbortTxnRequest(openTxnId));
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(2, rsp.getCompactsSize());
@@ -1149,7 +1149,7 @@ public void testCompactionHighWatermarkIsHonored() throws Exception {
     List<String> expectedDirs = Arrays.asList(
         "base_19",
         addVisibilitySuffix(makeDeltaDirName(20, 22), 23),
-        addVisibilitySuffix(makeDeltaDirName(20, 24), 27),
+        addVisibilitySuffix(makeDeltaDirName(20, 24), 26),
         makeDeltaDirName(23, 23),
         makeDeltaDirName(24, 24)
     );
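The _v27 to _v26 change in the hunk just above is pure txn-id arithmetic, not a behavior change. A comment-style trace of testCompactionHighWatermarkIsHonored's bookkeeping, assuming each helper consumes consecutive txn ids (the two txns burned between the compactions sit in a collapsed part of the diff, so their count is inferred from the suffixes):

// burnThroughTransactions(dbName, tblName, 22) -> txns 1..22
// 1st compactInTxn(rqst)                       -> runs in txn 23, so delta_20_22 carries _v23
// (before this PR, an extra openTxn() here took txn 24 and had to be aborted after startCleaner())
// burnThroughTransactions(dbName, tblName, 2)  -> txns 24..25
// 2nd compactInTxn(rqst)                       -> runs in txn 26, so delta_20_24 now carries _v26 (was _v27)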
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -100,7 +101,7 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
 
     startCleaner();
 
-    // Check there are no compactions requests left.
+    // Validate that the cleanup attempt has failed.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(1, rsp.getCompactsSize());
     assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
@@ -119,6 +120,31 @@ private static void revokeTimedoutWorkers(Configuration conf) throws Exception {
       """.formatted(INITIATED_STATE, WORKING_STATE));
   }
 
+  @Test
+  public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() throws Exception {
+    Table t = prepareTestTable();
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    // Open a query during compaction,
+    // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot)
+    openTxn();
+
+    txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
+    startCleaner();
+
+    // Validate that the cleanup attempt has failed.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
+    assertEquals("txnid:27 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage());
+
+    // Check that the files are not removed
+    List<Path> paths = getDirectories(conf, t, null);
+    assertEquals(5, paths.size());
+  }
+
   private Table prepareTestTable() throws Exception {
     Table t = newTable("default", "camtc", false);
 
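The asserted error message in the new test follows the same txn arithmetic, assuming prepareTestTable() burns 25 txns for camtc (consistent with the 25L/25 write ids above): the compaction runs in txn 26, the unregistered query takes txn 27, and once txn 26 commits the Cleaner computes a high watermark of 27 while txn 27 is still open, so it must fail rather than clean files an open reader may still need. A comment-style trace (the burn count inside prepareTestTable() is inferred, not shown in this hunk):

// burnThroughTransactions("default", "camtc", 25) -> txns 1..25 (inside prepareTestTable(), inferred)
// compactInTxn(rqst, MARK_COMPACTED)              -> compaction runs in txn 26
// openTxn()                                       -> txn 27, no min open write id registered yet
// commitTxn(26); startCleaner()                   -> hwm = 27 while txnid:27 is still open
//                                                 => "txnid:27 is open and <= hwm: 27", nothing is removed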