Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,34 @@ private static Optional<TableState> getTableState(Result r) throws IOException {
*/
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
  AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
  // Unbounded listing: collect every (region, server) pair for the table from meta, then
  // convert the pairs into HRegionLocations.
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> regionsAndLocations =
    getTableRegionsAndLocations(metaTable, tableName, true);
  return toHRegionLocations(regionsAndLocations);
}

/**
 * Fetches one single-RPC page of region locations for the given table. The scan begins at the
 * meta row derived from {@code startKey} and returns at most {@code rowLimit} regions.
 * {@code startKey} must fall on a region start-key boundary (for example, the end key of the
 * region visited last time); {@code null} or an empty array starts at the table's first region.
 * @param metaTable scanner over meta table
 * @param tableName table we're looking for
 * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
 *                 starts from the first region
 * @param rowLimit maximum number of meta rows to return; if {@code <= 0}, the underlying scan is
 *                 unbounded
 * @return the list of region locations, wrapped in a {@link CompletableFuture}
 */
public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
  AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName, byte[] startKey,
  int rowLimit) {
  // Delegate to the bounded meta scan, then convert (region, server) pairs to locations.
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> slice =
    getTableRegionsAndLocations(metaTable, tableName, true, startKey, rowLimit);
  return toHRegionLocations(slice);
}

private static CompletableFuture<List<HRegionLocation>>
toHRegionLocations(CompletableFuture<List<Pair<RegionInfo, ServerName>>> source) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
addListener(source, (locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (locations == null || locations.isEmpty()) {
Expand Down Expand Up @@ -215,6 +241,39 @@ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableReg
return future;
}

/**
 * Bounded variant of {@link #getTableRegionsAndLocations}: scans a slice of meta that begins at
 * the row derived from {@code startKey} and stops after at most {@code rowLimit} rows.
 */
private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
  final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
  final boolean excludeOfflinedSplitParents, final byte[] startKey, final int rowLimit) {
  CompletableFuture<List<Pair<RegionInfo, ServerName>>> result = new CompletableFuture<>();
  // hbase:meta regions are located through ZooKeeper, never through a meta scan.
  if (TableName.META_TABLE_NAME.equals(tableName)) {
    result.completeExceptionally(new IOException(
      "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
    return result;
  }

  CollectRegionLocationsVisitor collector =
    new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);

  // A null/empty startKey means "begin at the table's first region"; otherwise synthesize the
  // meta row name for the region starting at startKey.
  boolean fromTableStart = startKey == null || startKey.length == 0;
  byte[] scanStart = fromTableStart
    ? getTableStartRowForMeta(tableName, QueryType.REGION)
    : RegionInfo.createRegionName(tableName, startKey, HConstants.ZEROES, false);
  byte[] scanStop = getTableStopRowForMeta(tableName, QueryType.REGION);

  // isPagedScan=true sizes the scan so the whole slice returns in one ScannerNext RPC.
  addListener(scanMeta(metaTable, scanStart, scanStop, QueryType.REGION, rowLimit, true, collector),
    (ignored, error) -> {
      if (error != null) {
        result.completeExceptionally(error);
      } else {
        result.complete(collector.getResults());
      }
    });
  return result;
}

/**
* Performs a scan of META table for given table.
* @param metaTable scanner over meta table
Expand All @@ -225,22 +284,26 @@ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableReg
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
TableName tableName, QueryType type, final Visitor visitor) {
return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, false, visitor);
}

/**
* Performs a scan of META table for given table.
* @param metaTable scanner over meta table
* @param startRow Where to start the scan
* @param stopRow Where to stop the scan
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param visitor Visitor invoked against each row
* @param metaTable scanner over meta table
* @param startRow Where to start the scan
* @param stopRow Where to stop the scan
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param isPagedScan when {@code true}, the scan is sized so the whole slice (up to
* {@code maxRows}) returns in a single ScannerNext RPC. When {@code false},
* uses the configured {@code hbase.meta.scanner.caching}.
* @param visitor Visitor invoked against each row
*/
private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
byte[] startRow, byte[] stopRow, QueryType type, int maxRows, boolean isPagedScan,
final Visitor visitor) {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(metaTable, rowUpperLimit);
Scan scan = getMetaScan(metaTable, rowUpperLimit, isPagedScan);
for (byte[] family : type.getFamilies()) {
scan.addFamily(family);
}
Expand Down Expand Up @@ -437,7 +500,7 @@ void add(Result r) {
}
}

private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit, boolean isPagedScan) {
Scan scan = new Scan();
int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
Expand All @@ -447,11 +510,18 @@ private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
) {
scan.setConsistency(Consistency.TIMELINE);
}
if (rowUpperLimit <= scannerCaching) {
if (isPagedScan) {
// Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext
// RPC. Size caching to the slice. Trade-off: a single larger response uses more RegionServer
// heap, fine for meta rows (small).
scan.setLimit(rowUpperLimit);
scan.setCaching(rowUpperLimit);
} else {
if (rowUpperLimit <= scannerCaching) {
scan.setLimit(rowUpperLimit);
}
scan.setCaching(Math.min(rowUpperLimit, scannerCaching));
}
int rows = Math.min(rowUpperLimit, scannerCaching);
scan.setCaching(rows);
return scan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ void addLocationToCache(HRegionLocation loc) {
getTableCache(loc.getRegion().getTable()).regionLocationCache.add(createRegionLocations(loc));
}

RegionLocations getCachedLocation(TableName tableName, byte[] startKey) {
  // No per-table cache entry means nothing is cached for this table.
  TableCache tableCache = cache.get(tableName);
  if (tableCache == null) {
    return null;
  }
  return tableCache.regionLocationCache.get(startKey);
}

private HRegionLocation getCachedLocation(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -112,6 +113,54 @@ default CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row)
*/
CompletableFuture<List<HRegionLocation>> getAllRegionLocations();

/**
 * Fetches a page of this table's region locations from {@code hbase:meta} in one RPC, beginning
 * at {@code startKey} (a region start-key boundary, inclusive) and yielding at most
 * {@code limit} regions.
 * <p/>
 * Ordering: regions come back in ascending region start-key order, the natural ordering of
 * {@code hbase:meta} rows for one table; within a region, replicas come back in ascending
 * replica-id order (replica 0, then 1, then 2, ...). Every replica of each region is included,
 * matching {@link #getAllRegionLocations()}, and the result is also written to the connection's
 * region location cache. Split parents are filtered out, so a page may hold fewer than
 * {@code limit} regions without disturbing the ordering of the survivors.
 * <p/>
 * Paging: pass {@code null} for the first call, then feed {@code last.getRegion().getEndKey()}
 * from the final element of each response back in as the next {@code startKey}. All replicas of
 * a region share one {@link RegionInfo}, so the last entry's end key is a valid cursor no matter
 * which replica it is. Stop when the returned list is empty or when the last region's end key is
 * {@link HConstants#EMPTY_END_ROW} (zero-length) - that marks the end of the table, and handing
 * the empty key back would restart from the beginning, since by convention an empty start key
 * means "from the first region".
 * <p/>
 * Unlike {@link #getAllRegionLocations()}, at most one RPC against {@code hbase:meta} is issued
 * per invocation, so latency is bounded by {@code limit} rather than table size. This method
 * does not coordinate with other in-flight meta lookups on the connection - pacing across
 * concurrent callers is the caller's responsibility.
 * <p/>
 * Support is optional. Implementations that cannot do paginated lookups return a future that
 * completes exceptionally with {@link UnsupportedOperationException} (the default behavior);
 * callers should fall back to {@link #getAllRegionLocations()} in that case.
 * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
 *                 starts from the first region
 * @param limit maximum number of regions to return; if &lt;= 0, falls back to
 *              {@code hbase.meta.scanner.caching}
 * @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no
 *         more regions exist; errors are reported via the returned future
 */
default CompletableFuture<List<HRegionLocation>> getRegionLocationsPage(byte[] startKey,
  int limit) {
  // Default: report "not supported" through the future rather than by throwing.
  CompletableFuture<List<HRegionLocation>> unsupported = new CompletableFuture<>();
  unsupported.completeExceptionally(new UnsupportedOperationException(
    "getRegionLocationsPage(byte[], int) is not supported by this AsyncTableRegionLocator;"
      + " fall back to getAllRegionLocations()"));
  return unsupported;
}

/**
* Gets the starting row key for every region in the currently open table.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -81,6 +83,39 @@ public CompletableFuture<List<HRegionLocation>> getRegionLocations(byte[] row, b
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}

@Override
public CompletableFuture<List<HRegionLocation>> getRegionLocationsPage(byte[] startKey,
  int limit) {
  return tracedFuture(() -> {
    // Paginated meta scans cannot target hbase:meta itself.
    if (TableName.isMetaTableName(tableName)) {
      CompletableFuture<List<HRegionLocation>> metaUnsupported = new CompletableFuture<>();
      metaUnsupported.completeExceptionally(
        new IOException("getRegionLocationsPage(startKey, limit) is not supported for hbase:meta;"
          + " use getRegionLocation(EMPTY_START_ROW) instead."));
      return metaUnsupported;
    }
    // A non-positive limit falls back to the configured meta scanner caching value.
    final int pageSize;
    if (limit > 0) {
      pageSize = limit;
    } else {
      pageSize = conn.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
        HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
    }
    CompletableFuture<List<HRegionLocation>> locationsFuture =
      ClientMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
        tableName, startKey, pageSize);
    // Opportunistically warm the connection's region location cache with the page.
    addListener(locationsFuture, (locations, error) -> {
      if (error != null || locations == null) {
        return;
      }
      for (HRegionLocation location : locations) {
        // the cache assumes that all locations have a serverName. only add if that's true
        if (location.getServerName() != null) {
          conn.getLocator().getNonMetaRegionLocator().addLocationToCache(location);
        }
      }
    });
    return locationsFuture;
  }, getClass().getSimpleName() + ".getRegionLocationsPage");
}

@Override
public void clearRegionLocationCache() {
conn.getLocator().clearCache(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ class ConnectionOverAsyncConnection implements Connection {
this.connConf = new ConnectionConfiguration(conn.getConfiguration());
}

/**
 * Returns the underlying {@link AsyncConnectionImpl} that this synchronous connection wraps.
 * Package-private: intended for sibling client classes that need direct async access.
 */
AsyncConnectionImpl getAsyncConnection() {
  return conn;
}

@Override
public void abort(String why, Throwable error) {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -130,6 +131,53 @@ default List<HRegionLocation> getRegionLocations(byte[] row) throws IOException
*/
List<HRegionLocation> getAllRegionLocations() throws IOException;

/**
 * Fetches a page of this table's region locations from {@code hbase:meta} in one RPC, beginning
 * at {@code startKey} (a region start-key boundary, inclusive) and returning at most
 * {@code limit} regions.
 * <p/>
 * Ordering: regions come back in ascending region start-key order, the natural ordering of
 * {@code hbase:meta} rows for one table; within a region, replicas come back in ascending
 * replica-id order (replica 0, then 1, then 2, ...). Every replica of each region is included,
 * matching {@link #getAllRegionLocations()}, and the result is also written to the connection's
 * region location cache. Split parents are filtered out, so a page may hold fewer than
 * {@code limit} regions without disturbing the ordering of the survivors.
 * <p/>
 * Paging: pass {@code null} for the first call, then feed {@code last.getRegion().getEndKey()}
 * from the final element of each response back in as the next {@code startKey}. All replicas of
 * a region share one {@link RegionInfo}, so the last entry's end key is a valid cursor no matter
 * which replica it is. Stop when the returned list is empty or when the last region's end key is
 * {@link HConstants#EMPTY_END_ROW} (zero-length) - that marks the end of the table, and handing
 * the empty key back would restart from the beginning, since by convention an empty start key
 * means "from the first region".
 * <p/>
 * Unlike {@link #getAllRegionLocations()}, at most one RPC against {@code hbase:meta} is issued
 * per invocation, so latency is bounded by {@code limit} rather than table size.
 * <p/>
 * Support is optional. Implementations that cannot do paginated lookups should throw
 * {@link UnsupportedOperationException} (the default behavior); callers should fall back to
 * {@link #getAllRegionLocations()} in that case.
 * @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
 *                 starts from the first region
 * @param limit maximum number of regions to return; if &lt;= 0, falls back to
 *              {@code hbase.meta.scanner.caching}
 * @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no
 *         more regions exist
 * @throws IOException if a remote or network exception occurs
 * @throws UnsupportedOperationException if this implementation does not support paginated lookups
 */
default List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
  throws IOException {
  throw new UnsupportedOperationException(
    "getRegionLocationsPage(byte[], int) is not supported by this RegionLocator;"
      + " fall back to getAllRegionLocations()");
}

/**
* Gets the starting row key for every region in the currently open table.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public List<HRegionLocation> getAllRegionLocations() throws IOException {
return get(locator.getAllRegionLocations());
}

@Override
public List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
  throws IOException {
  // Synchronous facade: block on the async locator's paginated lookup and return its result.
  return get(locator.getRegionLocationsPage(startKey, limit));
}

@Override
public TableName getName() {
return locator.getName();
Expand Down
Loading