Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGen
return;
}
CommitTxnMessage msg =
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getCatalogs(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());

NotificationEvent event =
new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
Expand Down Expand Up @@ -840,11 +840,12 @@ public void onAddCheckConstraint(AddCheckConstraintEvent addCheckConstraintEvent
*/
@Override
public void onDropConstraint(DropConstraintEvent dropConstraintEvent) throws MetaException {
String catName = dropConstraintEvent.getCatName();
String dbName = dropConstraintEvent.getDbName();
String tableName = dropConstraintEvent.getTableName();
String constraintName = dropConstraintEvent.getConstraintName();
DropConstraintMessage msg = MessageBuilder.getInstance()
.buildDropConstraintMessage(dbName, tableName, constraintName);
.buildDropConstraintMessage(catName, dbName, tableName, constraintName);
NotificationEvent event =
new NotificationEvent(0, now(), EventType.DROP_CONSTRAINT.toString(),
msgEncoder.getSerializer().serialize(msg));
Expand All @@ -863,8 +864,9 @@ public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent, Connection dbCon
throws MetaException {
String tableName = allocWriteIdEvent.getTableName();
String dbName = allocWriteIdEvent.getDbName();
String catName = allocWriteIdEvent.getCatName();
AllocWriteIdMessage msg = MessageBuilder.getInstance()
.buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName);
.buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), catName, dbName, tableName);
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(),
msgEncoder.getSerializer().serialize(msg)
Expand Down Expand Up @@ -914,6 +916,7 @@ public void onBatchAcidWrite(BatchAcidWriteEvent batchAcidWriteEvent, Connection
NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(),
msgEncoder.getSerializer().serialize(msg));
event.setMessageFormat(msgEncoder.getMessageFormat());
event.setCatName(batchAcidWriteEvent.getCatalog(i));
event.setDbName(batchAcidWriteEvent.getDatabase(i));
event.setTableName(batchAcidWriteEvent.getTable(i));
eventBatch.add(event);
Expand Down Expand Up @@ -946,8 +949,8 @@ public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumn
@Override
public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent) throws MetaException {
DeleteTableColumnStatMessage msg = MessageBuilder.getInstance()
.buildDeleteTableColumnStatMessage(deleteTableColumnStatEvent.getDBName(),
deleteTableColumnStatEvent.getColName());
.buildDeleteTableColumnStatMessage(deleteTableColumnStatEvent.getCatName(),
deleteTableColumnStatEvent.getDBName(), deleteTableColumnStatEvent.getColName());
NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_TABLE_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
event.setCatName(deleteTableColumnStatEvent.getCatName());
Expand Down Expand Up @@ -1008,9 +1011,9 @@ public void onUpdatePartitionColumnStatInBatch(UpdatePartitionColumnStatEventBat
@Override
public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent) throws MetaException {
DeletePartitionColumnStatMessage msg = MessageBuilder.getInstance()
.buildDeletePartitionColumnStatMessage(deletePartColStatEvent.getDBName(),
deletePartColStatEvent.getColName(), deletePartColStatEvent.getPartName(),
deletePartColStatEvent.getPartVals());
.buildDeletePartitionColumnStatMessage(deletePartColStatEvent.getCatName(),
deletePartColStatEvent.getDBName(), deletePartColStatEvent.getColName(),
deletePartColStatEvent.getPartName(), deletePartColStatEvent.getPartVals());
NotificationEvent event = new NotificationEvent(0, now(), EventType.DELETE_PARTITION_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
event.setCatName(deletePartColStatEvent.getCatName());
Expand Down Expand Up @@ -1205,25 +1208,28 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
ResultSet rs = null;
String select = sqlGenerator.addForUpdateClause("select \"WNL_ID\", \"WNL_FILES\" from" +
" \"TXN_WRITE_NOTIFICATION_LOG\" " +
"where \"WNL_DATABASE\" = ? " +
"where \"WNL_CATALOG\" = ? " +
"and \"WNL_DATABASE\" = ? " +
"and \"WNL_TABLE\" = ? " + " and (\"WNL_PARTITION\" = ? OR (? IS NULL AND \"WNL_PARTITION\" IS NULL)) " +
"and \"WNL_TXNID\" = ? ");
List<Integer> insertList = new ArrayList<>();
Map<Integer, Pair<Long, String>> updateMap = new HashMap<>();
try (PreparedStatement pst = dbConn.prepareStatement(select)) {
for (int i = 0; i < acidWriteEventList.size(); i++) {
String catName = acidWriteEventList.get(i).getCatalog();
String dbName = acidWriteEventList.get(i).getDatabase();
String tblName = acidWriteEventList.get(i).getTable();
String partition = acidWriteEventList.get(i).getPartition();
Long txnId = acidWriteEventList.get(i).getTxnId();

LOG.debug("Going to execute query <" + select.replaceAll("\\?", "{}") + ">",
quoteString(dbName), quoteString(tblName), quoteString(partition));
pst.setString(1, dbName);
pst.setString(2, tblName);
pst.setString(3, partition);
quoteString(catName), quoteString(dbName), quoteString(tblName), quoteString(partition));
pst.setString(1, catName);
pst.setString(2, dbName);
pst.setString(3, tblName);
pst.setString(4, partition);
pst.setLong(5, txnId);
pst.setString(5, partition);
pst.setLong(6, txnId);
rs = pst.executeQuery();
if (!rs.next()) {
insertList.add(i);
Expand All @@ -1244,15 +1250,16 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
"org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog", insertList.size());

String insert = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_CATALOG\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
"\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
"\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
"\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?,?)";
try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(insert))) {
numRows = 0;
for (int idx : insertList) {
String tableObj = msgBatch.get(idx).getTableObjStr();
String partitionObj = msgBatch.get(idx).getPartitionObjStr();
String files = ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
String catName = acidWriteEventList.get(idx).getCatalog();
String dbName = acidWriteEventList.get(idx).getDatabase();
String tblName = acidWriteEventList.get(idx).getTable();
String partition = acidWriteEventList.get(idx).getPartition();
Expand All @@ -1261,16 +1268,17 @@ private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<Ac
pst.setLong(1, nextNLId++);
pst.setLong(2, acidWriteEventList.get(idx).getTxnId());
pst.setLong(3, acidWriteEventList.get(idx).getWriteId());
pst.setString(4, dbName);
pst.setString(5, tblName);
pst.setString(6, partition);
pst.setString(7, tableObj);
pst.setString(8, partitionObj);
pst.setString(9, files);
pst.setInt(10, currentTime);
pst.setString(4, catName);
pst.setString(5, dbName);
pst.setString(6, tblName);
pst.setString(7, partition);
pst.setString(8, tableObj);
pst.setString(9, partitionObj);
pst.setString(10, files);
pst.setInt(11, currentTime);
LOG.debug("Going to execute insert <" + insert.replaceAll("\\?", "{}") + ">", nextNLId
, acidWriteEventList.get(idx).getTxnId(), acidWriteEventList.get(idx).getWriteId()
, quoteString(dbName), quoteString(tblName),
, quoteString(catName), quoteString(dbName), quoteString(tblName),
quoteString(partition), quoteString(tableObj), quoteString(partitionObj), quoteString(files), currentTime);
pst.addBatch();
numRows++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
Set<CompactionInfo> compactions, ShowCompactResponse currentCompactions, Set<String> skipDBs,
Set<String> skipTables) {

CompactionInfo ci = new CompactionInfo(table.getDbName(), table.getTableName(), partitionName,
CompactionInfo ci = new CompactionInfo(table.getCatName(), table.getDbName(), table.getTableName(), partitionName,
CompactionType.SMART_OPTIMIZE);

// Common Hive compaction eligibility checks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,13 +814,14 @@ private List<Long> allocateTxns(int numTxns) throws Throwable {

/**
 * Allocates table write ids for the given transactions against {@code dbName.tblName}
 * in the default catalog, and returns the resulting txn-id to write-id mapping.
 */
private List<TxnToWriteId> allocateWriteIds(List<Long> txnIds, String dbName, String tblName) throws Throwable {
  // Target the default catalog explicitly so the allocation is catalog-qualified.
  final AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tblName);
  request.setCatName(Warehouse.DEFAULT_CATALOG_NAME);
  request.setTxnIds(txnIds);
  // Delegate to the metastore handler and surface only the txn->writeId pairs.
  return hmsHandler.allocate_table_write_ids(request).getTxnToWriteIds();
}

private String getValidWriteIds(String dbName, String tblName) throws Throwable {
GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(
Collections.singletonList(TableName.getDbTable(dbName, tblName)));
Collections.singletonList(TableName.getQualified(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)));
GetValidWriteIdsResponse validWriteIdsResponse = hmsHandler.get_valid_write_ids(validWriteIdsRequest);
return TxnCommonUtils.createValidReaderWriteIdList(validWriteIdsResponse.
getTblValidWriteIds().get(0)).writeToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, Strin
List<Long> txns, HiveConf primaryConf) throws Throwable {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
rqst.setDbName(primaryDbName);
rqst.setCatName(primaryCatName);
List<Long> lockIds = new ArrayList<>();
for(Map.Entry<String, Long> entry : tables.entrySet()) {
rqst.setTableName(entry.getKey());
Expand All @@ -373,17 +374,17 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, Strin
lockIds.add(txnHandler.lock(lockRequest).getLockid());
}
}
verifyWriteIdsForTables(tables, primaryConf, primaryDbName);
verifyWriteIdsForTables(tables, primaryConf, PRIMARY_CAT_NAME, primaryDbName);
return lockIds;
}

void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String dbName)
void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String catName, String dbName)
throws Throwable {
for(Map.Entry<String, Long> entry : tables.entrySet()) {
Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from TXN_TO_WRITE_ID"),
entry.getValue().longValue(),
TestTxnDbUtil.countQueryAgent(conf,
"select count(*) from TXN_TO_WRITE_ID where t2w_database = '"
"select count(*) from TXN_TO_WRITE_ID where t2w_catalog = '" + catName.toLowerCase() + "' and t2w_database = '"
+ dbName.toLowerCase()
+ "' and t2w_table = '" + entry.getKey() + "'"));
}
Expand Down Expand Up @@ -411,25 +412,26 @@ void verifyAllOpenTxnsNotAborted(List<Long> txns, HiveConf primaryConf) throws T
"select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
}

void verifyNextId(Map<String, Long> tables, String dbName, HiveConf conf) throws Throwable {
void verifyNextId(Map<String, Long> tables, String catName, String dbName, HiveConf conf) throws Throwable {
// Verify the next write id
for(Map.Entry<String, Long> entry : tables.entrySet()) {
String[] nextWriteId =
TestTxnDbUtil.queryToString(conf,
"select nwi_next from NEXT_WRITE_ID where nwi_database = '"
"select nwi_next from NEXT_WRITE_ID where nwi_catalog = '"
+ catName.toLowerCase() + "' and nwi_database = '"
+ dbName.toLowerCase() + "' and nwi_table = '"
+ entry.getKey() + "'").split("\n");
Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), entry.getValue() + 1);
}
}

void verifyCompactionQueue(Map<String, Long> tables, String dbName, HiveConf conf)
void verifyCompactionQueue(Map<String, Long> tables, String catName, String dbName, HiveConf conf)
throws Throwable {
for(Map.Entry<String, Long> entry : tables.entrySet()) {
Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
entry.getValue().longValue(),
TestTxnDbUtil.countQueryAgent(conf,
"select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
"select count(*) from COMPACTION_QUEUE where cq_catalog = '" + catName + "' and cq_database = '" + dbName
+ "' and cq_table = '" + entry.getKey() + "'"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public boolean validate(Task task) {
return validator.hasTask(rootTask);
}

private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
private Task getReplLoadRootTask(String sourceCat, String sourceDb, String replicadb, boolean isIncrementalDump,
Tuple tuple) throws Throwable {
HiveConf confTemp = driverMirror.getConf();
Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
Expand All @@ -544,7 +544,7 @@ private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIn
run only database creation task, and only in next iteration of Repl Load Task execution, remaining tasks will be
executed. Hence disabling this to perform the test on task optimization. */
confTemp.setBoolVar(HiveConf.ConfVars.REPL_RETAIN_CUSTOM_LOCATIONS_FOR_DB_ON_TARGET, false);
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceCat, sourceDb, replicadb,
null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
0L, metricCollector, false);
Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
Expand All @@ -565,7 +565,7 @@ public void testTaskCreationOptimization() throws Throwable {
Tuple dump = replDumpDb(dbName);

//bootstrap load should not have move task
Task task = getReplLoadRootTask(dbName, dbNameReplica, false, dump);
Task task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, false, dump);
assertEquals(false, hasMoveTask(task));
assertEquals(true, hasPartitionTask(task));

Expand All @@ -579,7 +579,7 @@ public void testTaskCreationOptimization() throws Throwable {

// Partition level statistics gets updated as part of the INSERT above. So we see a partition
// task corresponding to an ALTER_PARTITION event.
task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, true, dump);
assertEquals(true, hasMoveTask(task));
assertEquals(true, hasPartitionTask(task));

Expand All @@ -592,7 +592,7 @@ public void testTaskCreationOptimization() throws Throwable {
dump = replDumpDb(dbName);

//no move task should be added as the operation is adding a dynamic partition
task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
task = getReplLoadRootTask(Warehouse.DEFAULT_CATALOG_NAME, dbName, dbNameReplica, true, dump);
assertEquals(false, hasMoveTask(task));
assertEquals(true, hasPartitionTask(task));
}
Expand Down
Loading
Loading