Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId,
// Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
// deduce the table (and DB) name here.
CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
LlapHiveUtils.getCacheTag(path, true, partitionDesc) : null;

try {
// Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription:
Expand Down
3 changes: 2 additions & 1 deletion llap-common/src/protobuf/LlapDaemonProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,11 @@ message SetCapacityRequestProto {
message SetCapacityResponseProto {
}

// Used for proactive eviction request. Must contain one DB name, and optionally table information.
// Used for proactive eviction request. Must contain a DB name, and optionally table information and catalog name.
message EvictEntityRequestProto {
// Name of the database whose cache entries should be evicted. Always required.
required string db_name = 1;
// Optional table-level scoping; when empty, the whole DB's entries are targeted.
repeated TableProto table = 2;
// Catalog qualifier for db_name; defaults to "hive" so requests from older
// clients (which never set this field) keep their pre-catalog behavior.
optional string catalog_name = 3 [default = "hive"];
}

// Used in EvictEntityRequestProto, can be used for non-partitioned and partitioned tables too.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.hadoop.hive.llap.cache.PathCache;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.io.encoded.LlapOrcCacheLoader;
import org.apache.hadoop.hive.metastore.Warehouse;

Check warning on line 34 in llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Unused import - org.apache.hadoop.hive.metastore.Warehouse.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ1HoyZUoxp4O0LdzLms&open=AZ1HoyZUoxp4O0LdzLms&pullRequest=6379

Check warning on line 34 in llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapCacheMetadataSerializer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'org.apache.hadoop.hive.metastore.Warehouse'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ1HoyZUoxp4O0LdzLmr&open=AZ1HoyZUoxp4O0LdzLmr&pullRequest=6379
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -149,7 +151,7 @@
}

private void loadData(LlapDaemonProtocolProtos.CacheEntry ce) throws IOException {
CacheTag cacheTag = decodeCacheTag(ce.getCacheTag());
CacheTag cacheTag = decodeCacheTag(ce.getCacheTag(), conf);
DiskRangeList ranges = decodeRanges(ce.getRangesList());
Object fileKey = decodeFileKey(ce.getFileKey());
try (LlapOrcCacheLoader llr = new LlapOrcCacheLoader(new Path(ce.getFilePath()), fileKey, conf, cache,
Expand All @@ -167,9 +169,16 @@
return helper.get();
}

private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct) {
return ct.getPartitionDescCount() == 0 ? CacheTag.build(ct.getTableName()) : CacheTag
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about handling old cached data? Will there be any compatibility issues here?
I'm not very familiar with LLAP caching—perhaps a cache decoding failure won't cause user SQL jobs to fail? Maybe we just need to restart LLAP to refresh the cache?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it would be an issue. I think we should document that, post upgrade, LLAP daemons would need a restart to flush the cache. This however does not have any functional impact. Worst case, if the user does not restart, they would face cache misses but no SQL job failures, as far as I can tell.
@deniskuzZ any idea on this ?

.build(ct.getTableName(), ct.getPartitionDescList());
/**
 * Rebuilds a {@link CacheTag} from its protobuf representation, upgrading
 * legacy (pre-catalog) table names on the fly.
 *
 * @param ct   serialized cache tag; its table name may be either the legacy
 *             "db.table" form or the newer "catalog.db.table" form
 * @param conf used to resolve the current (or default) catalog name when a
 *             legacy two-part name needs to be qualified
 * @return a table-level tag when no partition descriptors are present,
 *         otherwise a partition-level tag carrying the descriptor list
 */
private static CacheTag decodeCacheTag(LlapDaemonProtocolProtos.CacheTag ct, Configuration conf) {
String tableName = ct.getTableName();
// NOTE(review): split on '.' assumes neither catalog, db nor table names
// contain dots — TODO confirm that is guaranteed for persisted tags.
String[] parts = tableName.split("\\.");
if (parts.length == 2) {
// db.table without catalog, prepend current or default catalog
tableName = HiveUtils.getCurrentCatalogOrDefault(conf) + '.' + tableName;
}
// A single-part name (parts.length == 1) is passed through unqualified;
// presumably only full db.table names are ever serialized — verify.
return ct.getPartitionDescCount() == 0
? CacheTag.build(tableName)
: CacheTag.build(tableName, ct.getPartitionDescList());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -324,9 +325,13 @@ public long evictEntity(LlapDaemonProtocolProtos.EvictEntityRequestProto protoRe
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(markedBytes).append(" bytes marked for eviction from LLAP cache buffers that belong to table(s): ");
for (String table : request.getEntities().get(request.getSingleDbName()).keySet()) {
sb.append(table).append(" ");
}
String catalog = request.getSingleCatalogName();
String db = request.getSingleDbName();
request.getEntities()
.getOrDefault(catalog, Map.of())
.getOrDefault(db, Map.of())
.keySet()
.forEach(table -> sb.append(catalog + db + table).append(" "));
sb.append(" Duration: ").append(time).append(" ms");
LOG.debug(sb.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
// LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
fsSupplier = getFsSupplier(split.getPath(), jobConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public MemoryBuffer create() {
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
? LlapHiveUtils.getCacheTag(split.getPath(), true, partitionDesc) : null;
this.sourceInputFormat = sourceInputFormat;
this.sourceSerDe = sourceSerDe;
this.reporter = reporter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.encoded.IncompleteCb;
Expand Down Expand Up @@ -140,7 +141,6 @@ public boolean isMarkedForEviction() {

@Override
public CacheTag getTag() {
// We don't care about these.
return CacheTag.build("OrcEstimates");
return CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "OrcEstimates");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
*/
package org.apache.hadoop.hive.llap.cache;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.metastore.Warehouse;

import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -127,7 +126,7 @@ public void testCacheTagComparison() {
public void testEncodingDecoding() throws Exception {
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
partDescs.put("pytha=goras", "a2+b2=c2");
CacheTag tag = CacheTag.build("math.rules", partDescs);
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".math.rules", partDescs);
CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
assertEquals(1, stag.getPartitionDescMap().size());
Expand All @@ -136,7 +135,7 @@ public void testEncodingDecoding() throws Exception {
partDescs.clear();
partDescs.put("mutli=one", "one=/1");
partDescs.put("mutli=two/", "two=2");
tag = CacheTag.build("math.rules", partDescs);
tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME + ".math.rules", partDescs);
CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag);
assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString());
assertEquals(2, mtag.getPartitionDescMap().size());
Expand Down Expand Up @@ -168,6 +167,10 @@ private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag
}

public static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
String[] parts = dbAndTable.split("\\.");
if(parts.length < 3) {
dbAndTable = Warehouse.DEFAULT_CATALOG_NAME + "." + dbAndTable;
}
if (partitions != null && partitions.length > 0) {
LinkedHashMap<String, String> partDescs = new LinkedHashMap<>();
for (String partition : partitions) {
Expand Down Expand Up @@ -215,33 +218,33 @@ private static void evictSomeTestBuffers() {
private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
"\n" +
"Cache state: \n" +
"default : 2/2, 2101248/2101248\n" +
"default.testtable : 2/2, 2101248/2101248\n" +
"otherdb : 7/7, 1611106304/1611106304\n" +
"otherdb.testtable : 4/4, 231424/231424\n" +
"otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
"otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
"otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
"otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
"otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
"otherdb.testtable2 : 2/2, 537133056/537133056\n" +
"otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
"otherdb.testtable3 : 1/1, 1073741824/1073741824";
"hive.default : 2/2, 2101248/2101248\n" +
"hive.default.testtable : 2/2, 2101248/2101248\n" +
"hive.otherdb : 7/7, 1611106304/1611106304\n" +
"hive.otherdb.testtable : 4/4, 231424/231424\n" +
"hive.otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
"hive.otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
"hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
"hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
"hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
"hive.otherdb.testtable2 : 2/2, 537133056/537133056\n" +
"hive.otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
"hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";

private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
"\n" +
"Cache state: \n" +
"default : 0/2, 0/2101248\n" +
"default.testtable : 0/2, 0/2101248\n" +
"otherdb : 5/7, 1074202624/1611106304\n" +
"otherdb.testtable : 3/4, 198656/231424\n" +
"otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
"otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
"otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
"otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
"otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
"otherdb.testtable2 : 1/2, 262144/537133056\n" +
"otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
"otherdb.testtable3 : 1/1, 1073741824/1073741824";
"hive.default : 0/2, 0/2101248\n" +
"hive.default.testtable : 0/2, 0/2101248\n" +
"hive.otherdb : 5/7, 1074202624/1611106304\n" +
"hive.otherdb.testtable : 3/4, 198656/231424\n" +
"hive.otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
"hive.otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
"hive.otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
"hive.otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
"hive.otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
"hive.otherdb.testtable2 : 1/2, 262144/537133056\n" +
"hive.otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
"hive.otherdb.testtable3 : 1/1, 1073741824/1073741824";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Function;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.junit.Test;

import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,7 +33,7 @@ public void testFileCacheMetadata() {
ConcurrentHashMap<Object, FileCache<Object>> cache = new ConcurrentHashMap<>();
Object fileKey = 1234L;
Function<Void, Object> f = a -> new Object();
CacheTag tag = CacheTag.build("test_table");
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test_db.test_table");

FileCache<Object> result = FileCache.getOrAddFileSubCache(cache, fileKey, f, tag);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
Expand Down Expand Up @@ -309,13 +310,13 @@

LlapDataBuffer[] buffs1 = IntStream.range(0, 4).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new);
DiskRange[] drs1 = drs(IntStream.range(1, 5).toArray());
CacheTag tag1 = CacheTag.build("default.table1");
CacheTag tag1 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1");

LlapDataBuffer[] buffs2 = IntStream.range(0, 41).mapToObj(i -> fb()).toArray(LlapDataBuffer[]::new);
DiskRange[] drs2 = drs(IntStream.range(1, 42).toArray());
CacheTag tag2 = CacheTag.build("default.table2");
CacheTag tag2 = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2");

Predicate<CacheTag> predicate = tag -> "default.table1".equals(tag.getTableName());
Predicate<CacheTag> predicate = tag -> (Warehouse.DEFAULT_CATALOG_NAME + "." + "default.table1").equals(tag.getTableName());

Check warning on line 319 in llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 128).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ0zumJEbMJlKzboIx5R&open=AZ0zumJEbMJlKzboIx5R&pullRequest=6379

cache.putFileData(fn1, drs1, buffs1, 0, Priority.NORMAL, null, tag1);
cache.putFileData(fn2, drs2, buffs2, 0, Priority.NORMAL, null, tag2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.IllegalCacheConfigurationException;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
Expand Down Expand Up @@ -249,7 +250,7 @@
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
CacheTag tag = CacheTag.build("test-table");
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table");
OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
Expand All @@ -270,7 +271,7 @@
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
CacheTag tag = CacheTag.build("test-table");
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table");
FileSystem fs = FileSystem.get(daemonConf);
FileStatus fileStatus = fs.getFileStatus(path);
OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, new SyntheticFileId(fileStatus));
Expand All @@ -294,7 +295,7 @@
Path path = new Path("../data/files/alltypesorc");
Configuration jobConf = new Configuration();
Configuration daemonConf = new Configuration();
CacheTag tag = CacheTag.build("test-table");
CacheTag tag = CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "test-db.test-table");
OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 100));
jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
Exception ex = null;
Expand Down Expand Up @@ -337,19 +338,22 @@
// below is of length 65
ByteBuffer bb2 = ByteBuffer.wrap("-large-meta-data-content-large-meta-data-content-large-meta-data-".getBytes());

LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb, CacheTag.build("default.table1"), isStopped);
LlapBufferOrBuffers table1Buffers1 = cache.putFileMetadata(fn1, bb,
CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), isStopped);
assertNotNull(table1Buffers1.getSingleLlapBuffer());

LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2, CacheTag.build("default.table1"), isStopped);
LlapBufferOrBuffers table1Buffers2 = cache.putFileMetadata(fn2, bb2,
CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table1"), isStopped);
assertNotNull(table1Buffers2.getMultipleLlapBuffers());
assertEquals(2, table1Buffers2.getMultipleLlapBuffers().length);

// Case for when metadata consists of just 1 buffer (most of the realworld cases)
ByteBuffer bb3 = ByteBuffer.wrap("small-meta-data-content-for-otherFile".getBytes());
LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3, CacheTag.build("default.table2"), isStopped);
LlapBufferOrBuffers table2Buffers1 = cache.putFileMetadata(fn3, bb3,
CacheTag.build(Warehouse.DEFAULT_CATALOG_NAME, "default.table2"), isStopped);
assertNotNull(table2Buffers1.getSingleLlapBuffer());

Predicate<CacheTag> predicate = tag -> "default.table1".equals(tag.getTableName());
Predicate<CacheTag> predicate = tag -> (Warehouse.DEFAULT_CATALOG_NAME + ".default.table1").equals(tag.getTableName());

Check warning on line 356 in llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 120 characters (found 123).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ0HHTDQZ7vxcJvLujNO&open=AZ0HHTDQZ7vxcJvLujNO&pullRequest=6379

// Simulating eviction on some buffers
table1Buffers2.getMultipleLlapBuffers()[1].decRef();
Expand Down
Loading
Loading