Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,29 @@ public int getRegionSizeMB(int region) {
if (load == null) {
return 0;
}
return regionLoads[region].getLast().getStorefileSizeMB();
return load.getLast().getStorefileSizeMB();
}

/**
 * Returns the sum of the latest reported cache ratio and cold data ratio for the region on the
 * RegionServer where it is currently online.
 * @param region index of the region in {@code regionLoads}
 * @return the summed ratio taken from the most recent load entry, or {@code 0.0f} when the
 *         region has no recorded load
 */
float getSumRegionCacheAndColdDataRatio(int region) {
  Deque<BalancerRegionLoad> history = regionLoads[region];
  boolean hasLoad = history != null && !history.isEmpty();
  if (!hasLoad) {
    return 0.0f;
  }
  // Only the newest entry matters; older entries in the deque are ignored here.
  BalancerRegionLoad latest = history.getLast();
  float cacheRatio = latest.getCurrentRegionCacheRatio();
  float coldDataRatio = latest.getRegionColdDataRatio();
  return cacheRatio + coldDataRatio;
}

/**
 * Returns the region size in MB from the most recent load entry, reduced by the share of that
 * size reported as cold data.
 * @param region index of the region in {@code regionLoads}
 * @return region size minus the truncated cold portion ({@code size * coldDataRatio}), or 0
 *         when the region has no recorded load
 */
int getRegionSizeMinusColdDataMB(int region) {
  Deque<BalancerRegionLoad> history = regionLoads[region];
  boolean hasLoad = history != null && !history.isEmpty();
  if (!hasLoad) {
    return 0;
  }
  BalancerRegionLoad latest = history.getLast();
  int totalMb = latest.getRegionSizeMB();
  // The cold portion is truncated toward zero before it is subtracted.
  int coldMb = (int) (totalMb * latest.getRegionColdDataRatio());
  return totalMb - coldMb;
}

/**
Expand Down Expand Up @@ -592,23 +614,11 @@ private void computeCachedLocalities() {

}

/**
 * Returns the region size in MB from the most recent {@link BalancerRegionLoad} recorded for the
 * region.
 * @param region index of the region in {@code regionLoads}
 * @return the last reported region size in MB, or 0 when no load has been recorded
 */
public int getTotalRegionHFileSizeMB(int region) {
  Deque<BalancerRegionLoad> load = regionLoads[region];
  // An empty deque is handled like a missing one: getLast() would otherwise throw
  // NoSuchElementException.
  if (load == null || load.isEmpty()) {
    // This means, that the region has no actual data on disk
    return 0;
  }
  return load.getLast().getRegionSizeMB();
}

/**
* Returns the weighted cache ratio of a region on the given region server
*/
public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
return getRegionSizeMinusColdDataMB(region) * getOrComputeRegionCacheRatio(region, server);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class BalancerRegionLoad {
private final int storefileSizeMB;
private final int regionSizeMB;
private final float currentRegionPrefetchRatio;
private final float regionColdDataRatio;

BalancerRegionLoad(RegionMetrics regionMetrics) {
readRequestsCount = regionMetrics.getReadRequestCount();
Expand All @@ -45,6 +46,7 @@ class BalancerRegionLoad {
storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
regionColdDataRatio = regionMetrics.getCurrentRegionColdDataRatio();
}

public long getReadRequestsCount() {
Expand Down Expand Up @@ -74,4 +76,8 @@ public int getRegionSizeMB() {
/**
 * Returns the cache ratio captured for the region when this load snapshot was built. The backing
 * field is named {@code currentRegionPrefetchRatio} and is populated from
 * {@code RegionMetrics#getCurrentRegionCachedRatio()}.
 */
public float getCurrentRegionCacheRatio() {
  return this.currentRegionPrefetchRatio;
}

/**
 * Returns the cold data ratio captured for the region when this load snapshot was built, as
 * populated from {@code RegionMetrics#getCurrentRegionColdDataRatio()}.
 */
public float getRegionColdDataRatio() {
  return this.regionColdDataRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;

import java.math.BigDecimal;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
Expand Down Expand Up @@ -88,10 +90,11 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
private Long sleepTime;
private Configuration configuration;

/**
 * Kinds of generator function, with one constant for load and one for cache ratio.
 * NOTE(review): no use of this enum is visible in this view - confirm it is still referenced.
 */
public enum GeneratorFunctionType {
// presumably selects load-based candidate generation - TODO confirm against callers
LOAD,
// presumably selects cache-ratio-based candidate generation - TODO confirm against callers
CACHE_RATIO
}
private float lowCacheRatioThreshold;
private float potentialCacheRatioAfterMove;
private float minFreeCacheSpaceFactor;

private BigDecimal simulatedRatio = new BigDecimal(0);

@Override
public void loadConf(Configuration configuration) {
Expand All @@ -101,6 +104,12 @@ public void loadConf(Configuration configuration) {
ratioThreshold =
this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
lowCacheRatioThreshold = configuration.getFloat(LOW_CACHE_RATIO_FOR_RELOCATION_KEY,
LOW_CACHE_RATIO_FOR_RELOCATION_DEFAULT);
potentialCacheRatioAfterMove = configuration.getFloat(POTENTIAL_CACHE_RATIO_AFTER_MOVE_KEY,
POTENTIAL_CACHE_RATIO_AFTER_MOVE_DEFAULT);
minFreeCacheSpaceFactor =
configuration.getFloat(MIN_FREE_CACHE_SPACE_FACTOR_KEY, MIN_FREE_CACHE_SPACE_FACTOR_DEFAULT);
}

@Override
Expand Down Expand Up @@ -192,15 +201,13 @@ private void updateRegionLoad() {
int regionSizeMB =
regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
// The coldDataSize accounts for data size classified as "cold" by DataTieringManager,
// which should be kept out of cache. We sum cold region size in the cache ratio, as we
// which should be kept out of cache. We calculate cache ratio on old server based
// only on the hot data size for the region (regionSizeMB - coldDataSize), as we
// don't want to move regions with low cache ratio due to data classified as cold.
float regionCacheRatioOnOldServer =
regionSizeMB
== 0
? 0.0f
: (float) (regionSizeInCache
+ sm.getRegionColdDataSize().getOrDefault(regionEncodedName, 0))
/ regionSizeMB;
int coldDataSize = sm.getRegionColdDataSize().getOrDefault(regionEncodedName, 0);
float regionCacheRatioOnOldServer = (regionSizeMB - coldDataSize) <= 0
? 0.0f
: (float) regionSizeInCache / (regionSizeMB - coldDataSize);
regionCacheRatioOnOldServerMap.put(regionEncodedName,
new Pair<>(sn, regionCacheRatioOnOldServer));
}
Expand Down Expand Up @@ -271,6 +278,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
private class CacheAwareCandidateGenerator extends CandidateGenerator {
@Override
protected BalanceAction generate(BalancerClusterState cluster) {
simulatedRatio = BigDecimal.ZERO;
// Move the regions to the servers they were previously hosted on based on the cache ratio
if (
!regionCacheRatioOnOldServerMap.isEmpty()
Expand Down Expand Up @@ -310,6 +318,50 @@ protected BalanceAction generate(BalancerClusterState cluster) {
regionCacheRatioOnOldServerMap.remove(regionEncodedName);
return action;
}
return generatePlanForFreeCacheSpace(cluster);
}

/**
 * Proposes moving one poorly cached user region to a server whose free block cache can hold it.
 * <p>
 * Every non-meta, non-system region whose summed cache and cold data ratio is below
 * {@code lowCacheRatioThreshold} is matched against every other server. A server qualifies when
 * its remaining free cache covers the region's hot data size scaled by
 * {@code minFreeCacheSpaceFactor}. Each queued candidate is charged against the target server's
 * remaining free cache, so later regions only see the space left over. One candidate is then
 * picked at random.
 * @param cluster the cluster state being balanced
 * @return a randomly chosen candidate move, or {@link BalanceAction#NULL_ACTION} when free cache
 *         sizes are unavailable or no candidate exists
 */
private BalanceAction generatePlanForFreeCacheSpace(BalancerClusterState cluster) {
  if (cluster.serverBlockCacheFreeSize == null) {
    return BalanceAction.NULL_ACTION;
  }
  List<BalanceAction> possibleActions = new ArrayList<>();
  // Remaining free cache per server, seeded lazily from the cluster state.
  Map<Integer, Long> serverFreeCacheAfterAction = new HashMap<>();
  for (int region = 0; region < cluster.numRegions; region++) {
    RegionInfo regionInfo = cluster.regions[region];
    if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
      continue;
    }
    int currentServer = cluster.regionIndexToServerIndex[region];
    // A region without a valid current server cannot be the source of a move. Skipping it also
    // avoids charging free cache for an action that could never be applied.
    if (currentServer < 0) {
      continue;
    }
    float ratio = cluster.getSumRegionCacheAndColdDataRatio(region);
    if (ratio >= lowCacheRatioThreshold) {
      continue;
    }
    int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
    if (regionSizeMb <= 0) {
      continue;
    }
    long bytesNeeded = (long) (regionSizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
    for (int server = 0; server < cluster.numServers; server++) {
      // Skips current server for region, as we can't generate a move to same server
      if (server == currentServer) {
        continue;
      }
      long freeCache = serverFreeCacheAfterAction.computeIfAbsent(server,
        s -> cluster.serverBlockCacheFreeSize[s]);
      if (freeCache >= bytesNeeded) {
        serverFreeCacheAfterAction.put(server, freeCache - bytesNeeded);
        possibleActions.add(getAction(currentServer, region, server, -1));
      }
    }
  }
  if (possibleActions.isEmpty()) {
    return BalanceAction.NULL_ACTION;
  }
  BalanceAction action =
    possibleActions.get(ThreadLocalRandom.current().nextInt(possibleActions.size()));
  // Build the log arguments only when debug is on, and only cast after checking the type.
  if (LOG.isDebugEnabled() && action instanceof MoveRegionAction) {
    int movedRegion = ((MoveRegionAction) action).getRegion();
    LOG.debug("region {} had sum ratio {}", cluster.regions[movedRegion].getEncodedName(),
      cluster.getSumRegionCacheAndColdDataRatio(movedRegion));
  }
  return action;
}

Expand All @@ -319,7 +371,7 @@ private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex
return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
: BalanceAction.NULL_ACTION;
: generatePlanForFreeCacheSpace(cluster);
}

private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
Expand Down Expand Up @@ -385,6 +437,7 @@ private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIn
private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
@Override
BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
simulatedRatio = BigDecimal.ZERO;
// First move all the regions which were hosted previously on some other server back to their
// old servers
if (
Expand Down Expand Up @@ -519,7 +572,7 @@ public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double>
}
}

static class CacheAwareCostFunction extends CostFunction {
class CacheAwareCostFunction extends CostFunction {
private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
private double cacheRatio;
private double bestCacheRatio;
Expand Down Expand Up @@ -561,14 +614,13 @@ private void recomputeCacheRatio(BalancerClusterState cluster) {
currentSum += currentWeighted[region];
// here we only get the server index where this region cache ratio is the highest
int serverIndexBestCache = cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
// get the highest cacheRatio for this region on the current state of allocations
double currentHighestCache =
cluster.getOrComputeWeightedRegionCacheRatio(region, serverIndexBestCache);
// Get a hypothetical best cache ratio for this region if any server has enough free cache
// to host it.
double potentialHighestCache =
potentialBestWeightedFromFreeCache(cluster, region, currentHighestCache);
double actualHighest = Math.max(currentHighestCache, potentialHighestCache);
bestCacheSum += actualHighest;
double potentialHighestCache = potentialBestWeightedFromFreeCache(cluster, region);
bestCacheSum += Math.max(currentHighestCache, potentialHighestCache);
}
bestCacheRatio = bestCacheSum;
if (bestCacheSum <= 0.0) {
Expand All @@ -583,11 +635,24 @@ private double[] computeCurrentWeightedContributions(BalancerClusterState cluste
double[] contrib = new double[totalRegions];
for (int r = 0; r < totalRegions; r++) {
int s = cluster.regionIndexToServerIndex[r];
int sizeMb = cluster.getTotalRegionHFileSizeMB(r);
int sizeMb = cluster.getRegionSizeMinusColdDataMB(r);
if (sizeMb <= 0) {
contrib[r] = 0.0;
continue;
}
boolean movedInSimulation = cluster.initialRegionIndexToServerIndex[r] != s;
if (
cluster.serverBlockCacheFreeSize != null && movedInSimulation
&& cluster.getSumRegionCacheAndColdDataRatio(r) < lowCacheRatioThreshold
) {
LOG.debug("Region {} is simulated moved to new server {}",
cluster.regions[r].getEncodedName(), cluster.servers[s].getHostname());
long bytesNeeded = (long) (sizeMb * 1024L * 1024L * minFreeCacheSpaceFactor);
if (cluster.serverBlockCacheFreeSize[s] >= bytesNeeded) {
contrib[r] = sizeMb * potentialCacheRatioAfterMove;
continue;
}
}
contrib[r] = cluster.getOrComputeWeightedRegionCacheRatio(r, s);
}
return contrib;
Expand All @@ -599,21 +664,20 @@ private double[] computeCurrentWeightedContributions(BalancerClusterState cluste
* #potentialCacheRatioAfterMove} * region MB) so placement is not considered optimal solely
* from low ratios when capacity exists somewhere in the cluster.
*/
private double potentialBestWeightedFromFreeCache(BalancerClusterState cluster, int region,
double currentHighestCache) {
float observedRatio = cluster.getObservedRegionCacheRatio(region);
private double potentialBestWeightedFromFreeCache(BalancerClusterState cluster, int region) {
float observedRatio = cluster.getSumRegionCacheAndColdDataRatio(region);
if (observedRatio >= lowCacheRatioThreshold) {
return 0.0;
}
int regionSizeMb = cluster.getTotalRegionHFileSizeMB(region);
int regionSizeMb = cluster.getRegionSizeMinusColdDataMB(region);
if (regionSizeMb <= 0) {
return 0.0;
}
long regionSizeBytes = (long) regionSizeMb * 1024L * 1024L;
long requiredFree = (long) (regionSizeBytes * minFreeCacheSpaceFactor);
for (int s = 0; s < cluster.numServers; s++) {
if (cluster.serverBlockCacheFreeSize[s] >= requiredFree) {
return Math.max(currentHighestCache, regionSizeMb * potentialCacheRatioAfterMove);
return regionSizeMb * potentialCacheRatioAfterMove;
}
}
return 0.0;
Expand All @@ -628,18 +692,39 @@ protected double cost() {
protected void regionMoved(int region, int oldServer, int newServer) {
double regionCacheRatioOnOldServer =
cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
double regionCacheRatioOnNewServer =
cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
cacheRatio += normalizedDelta;
if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
if (simulatedRatio.equals(BigDecimal.ZERO)) {
double potentialCachedSizeOnNewServer =
cluster.getRegionSizeMinusColdDataMB(region) * potentialCacheRatioAfterMove;
boolean simulateCacheBasedOnFreeSpace =
cluster.getOrComputeRegionCacheRatio(region, oldServer) < lowCacheRatioThreshold
&& cluster.serverBlockCacheFreeSize[newServer] >= potentialCachedSizeOnNewServer;
double regionCacheRatioOnNewServer = simulateCacheBasedOnFreeSpace
? potentialCachedSizeOnNewServer
: cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
LOG.debug(
"CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
+ "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
"simulating moving region {} using simulateCacheBasedOnFreeSpace={} "
+ "got a normalized delta of {} to be added to cacheRatio: {}",
cluster.regions[region].getEncodedName(), simulateCacheBasedOnFreeSpace, normalizedDelta,
cacheRatio);
simulatedRatio = BigDecimal.valueOf(normalizedDelta);
cacheRatio += normalizedDelta;
if (cacheRatio < 0.0 || cacheRatio > 1.0) {
LOG.info(
"Recomputing cacheRatio after calculating impact of region move: \n "
+ "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:"
+ "regionCacheRatioOnOldServer:{}:regionCacheRatioOnNewServer:{}:"
+ "bestRegionCacheRatio:{}:cacheRatio:{}",
cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
recomputeCacheRatio(cluster);
}
} else {
// This means we are in an undoAction call and need to reverse the cache delta applied in
// the region move simulation
cacheRatio -= simulatedRatio.doubleValue();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,15 @@ protected List<RegionPlan> balanceTable(TableName tableName,

newCost = computeCost(cluster, currentCost);

if (LOG.isDebugEnabled()) {
  // Only move actions expose region/from/to. Casting any other action type would throw
  // ClassCastException as soon as debug logging is switched on.
  if (action instanceof MoveRegionAction) {
    MoveRegionAction move = (MoveRegionAction) action;
    LOG.debug(
      "action moving region {} from {} to {} with cost {}. currentCost={}, functionCost={}",
      cluster.regions[move.getRegion()].getEncodedName(),
      cluster.servers[move.getFromServer()].getServerName(),
      cluster.servers[move.getToServer()].getServerName(), newCost, currentCost,
      functionCost());
  } else {
    LOG.debug("action {} with cost {}. currentCost={}, functionCost={}", action, newCost,
      currentCost, functionCost());
  }
}

double costImprovement = currentCost - newCost;
double minimumImprovement =
Math.max(CostFunction.getCostEpsilon(currentCost), CostFunction.getCostEpsilon(newCost));
Expand Down
Loading