Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.doris.proto.OlapFile.EncryptionAlgorithmPB;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.rpc.RpcException;
Expand Down Expand Up @@ -236,8 +237,8 @@ public enum OlapTableState {

// Cache for table version in cloud mode
// This value is set when get the table version from meta-service, 0 means version is not cached yet
private long lastTableVersionCachedTimeMs = 0;
private long cachedTableVersion = -1;
private volatile long lastTableVersionCachedTimeMs = 0;
private volatile long cachedTableVersion = -1;

public OlapTable() {
// for persist
Expand Down Expand Up @@ -3285,24 +3286,24 @@ public long getNextVersion() {
}
}

public boolean isCachedTableVersionExpired() {
@VisibleForTesting
protected boolean isCachedTableVersionExpired() {
// -1 means no cache yet, need to fetch from MS
if (cachedTableVersion == -1) {
return true;
}
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
return true;
}
long cacheExpirationMs = ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
long cacheExpirationMs = ctx == null ? VariableMgr.getDefaultSessionVariable().cloudTableVersionCacheTtlMs
: ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
if (cacheExpirationMs <= 0) { // always expired
return true;
}
return System.currentTimeMillis() - lastTableVersionCachedTimeMs > cacheExpirationMs;
}

public void setCachedTableVersion(long version) {
if (version > cachedTableVersion) {
@VisibleForTesting
protected void setCachedTableVersion(long version) {
if (version >= cachedTableVersion) {
cachedTableVersion = version;
lastTableVersionCachedTimeMs = System.currentTimeMillis();
}
Expand Down Expand Up @@ -3356,9 +3357,9 @@ public long getVisibleVersion() throws RpcException {
}

// Get the table versions in batch.
public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> tables) {
public static List<Long> getVisibleVersionInBatch(List<OlapTable> tables) {
if (tables.isEmpty()) {
return new ArrayList<>();
return Collections.emptyList();
}

if (Config.isNotCloudMode()) {
Expand All @@ -3367,14 +3368,21 @@ public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> tables)
.collect(Collectors.toList());
}

List<Long> dbIds = new ArrayList<>();
List<Long> tableIds = new ArrayList<>();
List<Long> dbIds = new ArrayList<>(tables.size());
List<Long> tableIds = new ArrayList<>(tables.size());
for (OlapTable table : tables) {
dbIds.add(table.getDatabase().getId());
tableIds.add(table.getId());
}

return getVisibleVersionFromMeta(dbIds, tableIds);
List<Long> versions = getVisibleVersionFromMeta(dbIds, tableIds);

// update cache
Preconditions.checkState(tables.size() == versions.size());
for (int i = 0; i < tables.size(); i++) {
tables.get(i).setCachedTableVersion(versions.get(i));
}
return versions;
}

private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, List<Long> tableIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -56,7 +58,7 @@ public class CloudPartition extends Partition {
private long tableId;

// This value is set when get the version from meta-service, 0 means version is not cached yet
private long lastVersionCachedTimeMs = 0;
private volatile long lastVersionCachedTimeMs = 0;

private ReentrantLock lock = new ReentrantLock(true);

Expand Down Expand Up @@ -96,7 +98,7 @@ protected void setVisibleVersion(long visibleVersion) {
return;
}

public void setCachedVisibleVersion(long version, Long versionUpdateTimeMs) {
public void setCachedVisibleVersion(long version, long versionUpdateTimeMs) {
// we only care the version should increase monotonically and ignore the readers
LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {}, old version: {}",
super.getId(), version, super.getVisibleVersion());
Expand All @@ -115,8 +117,14 @@ public long getCachedVisibleVersion() {
return super.getVisibleVersion();
}

public boolean isCachedVersionExpired() {
long cacheExpirationMs = SessionVariable.cloudPartitionVersionCacheTtlMs;
@VisibleForTesting
protected boolean isCachedVersionExpired() {
if (lastVersionCachedTimeMs == 0) {
return true;
}
ConnectContext ctx = ConnectContext.get();
long cacheExpirationMs = ctx == null ? VariableMgr.getDefaultSessionVariable().cloudPartitionVersionCacheTtlMs
: ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs;
if (cacheExpirationMs <= 0) { // always expired
return true;
}
Expand Down Expand Up @@ -148,6 +156,7 @@ public long getVisibleVersionFromMs(boolean waitForPendingTxns) {
.setTableId(this.tableId)
.setPartitionId(super.getId())
.setBatchMode(false)
.setWaitForPendingTxn(waitForPendingTxns)
.build();

try {
Expand Down Expand Up @@ -251,16 +260,18 @@ public static List<Long> getSnapshotVisibleVersionFromMs(
// Return the visible version in order of the specified partition ids, -1 means version NOT FOUND.
public static List<Long> getSnapshotVisibleVersion(List<CloudPartition> partitions) throws RpcException {
if (partitions.isEmpty()) {
return new ArrayList<>();
return Collections.emptyList();
}

if (SessionVariable.cloudPartitionVersionCacheTtlMs <= 0) { // No cached versions will be used
long cloudPartitionVersionCacheTtlMs = ConnectContext.get() == null ? 0
: ConnectContext.get().getSessionVariable().cloudPartitionVersionCacheTtlMs;
if (cloudPartitionVersionCacheTtlMs <= 0) { // No cached versions will be used
return getSnapshotVisibleVersionFromMs(partitions, false);
}

// partitionId -> cachedVersion
List<Pair<Long, Long>> allVersions = new ArrayList<>();
List<CloudPartition> expiredPartitions = new ArrayList<>();
List<Pair<Long, Long>> allVersions = new ArrayList<>(partitions.size());
List<CloudPartition> expiredPartitions = new ArrayList<>(partitions.size());
for (CloudPartition partition : partitions) {
long ver = partition.getCachedVisibleVersion();
if (partition.isCachedVersionExpired()) {
Expand All @@ -272,8 +283,7 @@ public static List<Long> getSnapshotVisibleVersion(List<CloudPartition> partitio

if (LOG.isDebugEnabled()) {
LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={}, numFilteredPartitions={}",
SessionVariable.cloudPartitionVersionCacheTtlMs,
partitions.size(), partitions.size() - expiredPartitions.size());
cloudPartitionVersionCacheTtlMs, partitions.size(), partitions.size() - expiredPartitions.size());
}

List<Long> versions = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2807,7 +2807,7 @@ public void setDetailShapePlanNodes(String detailShapePlanNodes) {
@VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
public boolean disableEmptyPartitionPrune = false;
@VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
public static long cloudPartitionVersionCacheTtlMs = 0;
public long cloudPartitionVersionCacheTtlMs = 0;
@VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
public long cloudTableVersionCacheTtlMs = 0;
// CLOUD_VARIABLES_END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.VersionHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rpc.RpcException;

Expand All @@ -44,20 +45,30 @@ public static CloudPartition createPartition(long pId, long dbId, long tblId) {

@Test
public void testIsCachedVersionExpired() {
// Create ConnectContext with SessionVariable
ConnectContext ctx = new ConnectContext();
ctx.setSessionVariable(new SessionVariable());
ctx.setThreadLocalInfo();

// test isCachedVersionExpired
CloudPartition part = createPartition(1, 2, 3);
SessionVariable.cloudPartitionVersionCacheTtlMs = 0;
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0;
Assertions.assertTrue(part.isCachedVersionExpired());
SessionVariable.cloudPartitionVersionCacheTtlMs = -10086;
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -10086;
part.setCachedVisibleVersion(2, 10086L); // update version and last cache time
SessionVariable.cloudPartitionVersionCacheTtlMs = 10000;
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 10000;
Assertions.assertFalse(part.isCachedVersionExpired()); // not expired due to long expiration duration
Assertions.assertEquals(2, part.getCachedVisibleVersion());

}

@Test
public void testCachedVersion() throws RpcException {
// Create ConnectContext with SessionVariable
ConnectContext ctx = new ConnectContext();
ctx.setSessionVariable(new SessionVariable());
ctx.setThreadLocalInfo();

CloudPartition part = createPartition(1, 2, 3);
List<CloudPartition> parts = new ArrayList<>();
for (long i = 0; i < 3; ++i) {
Expand Down Expand Up @@ -87,7 +98,7 @@ public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
};
// CHECKSTYLE ON

SessionVariable.cloudPartitionVersionCacheTtlMs = -1; // disable cache
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -1; // disable cache
{
// test single get version
Assertions.assertEquals(2, part.getVisibleVersion()); // should not get from cache
Expand All @@ -106,7 +117,7 @@ public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
}

// enable change expiration and make it cached in long duration
SessionVariable.cloudPartitionVersionCacheTtlMs = 100000;
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 100000;
{
// test single get version
Assertions.assertEquals(2, part.getVisibleVersion()); // cached version
Expand All @@ -125,7 +136,7 @@ public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
}

// enable change expiration and make it expired
SessionVariable.cloudPartitionVersionCacheTtlMs = 500;
ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 500;
try {
Thread.sleep(550);
} catch (InterruptedException e) {
Expand Down
Loading