Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* read as part of map-reduce framework
**/
public class TableScanOperator extends Operator<TableScanDesc> implements
Serializable, VectorizationContextRegion {
Serializable, VectorizationContextRegion, IConfigureJobConf {
private static final long serialVersionUID = 1L;

private VectorizationContext taskVectorizationContext;
Expand Down Expand Up @@ -87,6 +87,15 @@ public class TableScanOperator extends Operator<TableScanDesc> implements

private ProbeDecodeContext probeDecodeContextSet;

@Override
public void configureJobConf(JobConf job) {
Table table = getConf().getTableMetadata();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can getConf() be null?, maybe we can do, if there is such a chance

  public void configureJobConf(JobConf job) {
    if (getConf() != null && getConf().getTableMetadata() != null) {
      Utilities.setTableCreateTime(job, getConf().getTableMetadata());
    }

// Safety check: this may be null in certain scenarios, particularly in test cases.
if (table != null) {
Utilities.setTableCreateTime(job, table);
}
}

/**
* Inner wrapper class for TS ProbeDecode optimization
*/
Expand Down
45 changes: 45 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
Expand Down Expand Up @@ -258,6 +259,7 @@ public final class Utilities {
public static final String MAPNAME = "Map ";
public static final String REDUCENAME = "Reducer ";
public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED";
public static final String CREATE_TIME = "create_time";
public static final String SNAPSHOT_REF = "snapshot_ref";

@Deprecated
Expand Down Expand Up @@ -5094,4 +5096,47 @@ public static String getTableOrMVSuffix(Context context, boolean createTableOrMV
}
return suffix;
}

/**
* Stores the creation time of the given table in the provided configuration.
* <p>
* The value is written under a composite key of the form:
* {@code &lt;dbName&gt;.&lt;tableName&gt;.&lt;CREATE_TIME&gt;}.
* </p>
*
* @param conf
* configuration to store the table creation time; must not be {@code null}
* @param table
* table whose database and name are used to construct the configuration key;
* must not be {@code null}
* @param tableCreateTime
* table creation time to store, represented as a string
*/
public static void setTableCreateTime(Configuration conf, Table table) {
Comment on lines +5112 to +5115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this param doesn't exist

Objects.requireNonNull(table, "Cannot get table create time. Table object is expected to be non-null.");
String tableCreateTime = String.valueOf(table.getCreateTime());
String fullTableName = String.format("%s.%s", table.getDbName(), table.getTableName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use:

table.getFullyQualifiedName();

or

TableName.getDbTable(table.getDbName(), table.getTableName());

conf.set(String.format("%s.%s", fullTableName, CREATE_TIME), tableCreateTime);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do setInt, rather than converting it into String and then setting

    conf.setInt(String.format("%s.%s", fullTableName, CREATE_TIME), table.getCreateTime());

}

/**
* Retrieves the table creation time from the configuration.
* <p>
* The value is expected to be stored under the key
* {@code &lt;tableName&gt;.&lt;CREATE_TIME&gt;}. If the value is not present,
* this method returns {@code 0}.
* </p>
*
* @param conf
* configuration containing the table creation time; must not be {@code null}
* @param tableName
* fully qualified table name ({@code dbName.tableName})
* used to construct the configuration key
* @return
* the table creation time, or {@code 0} if not set
*/
public static int getTableCreateTime(Configuration conf, String tableName) {
String createTime = conf.get(String.format("%s.%s", tableName, CREATE_TIME));
return createTime == null ? 0 : Integer.parseInt(createTime);
Comment on lines +5138 to +5140
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use

conf.getInt(...)

}
}
26 changes: 21 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3317,13 +3317,23 @@ public static AcidDirectory getAcidStateFromCache(Supplier<FileSystem> fileSyste
//dbName + tableName + dir
String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
DirInfoValue value = dirCache.getIfPresent(key);
int tableCreateTimeInCache = value == null ? -1 : value.getTableCreateTime();
int tableCreateTime = Utilities.getTableCreateTime(conf, writeIdList.getTableName());

// in case of open/aborted txns, recompute dirInfo
long[] exceptions = writeIdList.getInvalidWriteIds();
boolean recompute = (exceptions != null && exceptions.length > 0);

// Check whether the table was re-created after being stored in the cache.
// The value null check avoids a noisy log message during the initial lookup, when no cache entry exists.
if (value != null && tableCreateTimeInCache < tableCreateTime) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value is in int. Doubting about the Interger Overflow case, Should we just do

tableCreateTimeInCache != tableCreateTime

LOG.info("Table {} was recreated (at: {}) since it was stored in acid cache (at: {}), invalidating entry",
writeIdList.getTableName(), tableCreateTime, tableCreateTimeInCache);
recompute = true;
}

if (recompute) {
LOG.info("invalidating cache entry for key: {}", key);
LOG.info("Invalidating cache entry for key: {}", key);
dirCache.invalidate(key);
value = null;
}
Expand All @@ -3343,7 +3353,7 @@ public static AcidDirectory getAcidStateFromCache(Supplier<FileSystem> fileSyste
if (recompute || (value == null)) {
AcidDirectory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
writeIdList, useFileIds, ignoreEmptyFiles);
value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
value = new DirInfoValue(writeIdList.writeToString(), dirInfo, tableCreateTime);

if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
&& value.dirInfo.getCurrentDirectories().isEmpty()) {
Expand Down Expand Up @@ -3405,12 +3415,14 @@ public static boolean acidTableWithoutTransactions(Table table) {
}

static class DirInfoValue {
private String txnString;
private AcidDirectory dirInfo;
private final String txnString;
private final AcidDirectory dirInfo;
private final int tableCreateTime;

DirInfoValue(String txnString, AcidDirectory dirInfo) {
DirInfoValue(String txnString, AcidDirectory dirInfo, int tableCreateTime) {
this.txnString = txnString;
this.dirInfo = dirInfo;
this.tableCreateTime = tableCreateTime;
}

String getTxnString() {
Expand All @@ -3420,6 +3432,10 @@ String getTxnString() {
AcidDirectory getDirInfo() {
return dirInfo;
}

int getTableCreateTime() {
return tableCreateTime;
}
}

public static String getPartitionName(Map<String, String> partitionSpec) throws SemanticException {
Expand Down
36 changes: 36 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/plan/TestMapWork.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapred.JobConf;

import org.junit.Test;

Expand Down Expand Up @@ -69,4 +74,35 @@ public void testDeriveLlapSetsCacheAffinityForTextInputFormat() {
mapWork.getCacheAffinity());
}

@Test
public void testConfigureJobConfPropagatesTableCreateTime() {
// Given a table with a realistic create time
String dbName = "test_db";
String tableName = "test_table";
int createTime = 1770653453;

Table table = new Table(dbName, tableName);
table.setCreateTime(createTime);

// And a TableScanOperator configured for that table
TableScanDesc tsDesc = new TableScanDesc(table);
CompilationOpContext cCtx = new CompilationOpContext();
TableScanOperator tsOp = new TableScanOperator(cCtx);
tsOp.setConf(tsDesc);

// And a MapWork that uses this TableScanOperator as a root
MapWork mapWork = new MapWork();
mapWork.getAliasToWork().put("t", tsOp);

JobConf jobConf = new JobConf();

// When configuring the job from the MapWork
mapWork.configureJobConf(jobConf);

// Then the table's create time should be present in the JobConf
String fullTableName = dbName + "." + tableName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String fullTableName = TableName.getDbTable(dbName, tableName);

assertEquals(
createTime,
Utilities.getTableCreateTime(jobConf, fullTableName));
}
}