Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,47 @@ default Future<Void> modifyTableAsync(TableDescriptor td) throws IOException {
*/
Future<Void> modifyTableAsync(TableDescriptor td, boolean reopenRegions) throws IOException;

/**
* Reopen all regions of a table. This is useful after calling
* {@link #modifyTableAsync(TableDescriptor, boolean)} with reopenRegions=false to gradually roll
* out table descriptor changes to regions. Regions are reopened in-place (no move).
* @param tableName table whose regions to reopen
* @throws IOException if a remote or network exception occurs
*/
default void reopenTableRegions(TableName tableName) throws IOException {
get(reopenTableRegionsAsync(tableName), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Reopen specific regions of a table. Useful for canary testing table descriptor changes on a
* subset of regions before rolling out to the entire table.
* @param tableName table whose regions to reopen
* @param regions specific regions to reopen
* @throws IOException if a remote or network exception occurs
*/
default void reopenTableRegions(TableName tableName, List<RegionInfo> regions)
throws IOException {
get(reopenTableRegionsAsync(tableName, regions), getSyncWaitTimeout(), TimeUnit.MILLISECONDS);
}

/**
* Asynchronously reopen all regions of a table.
* @param tableName table whose regions to reopen
* @return Future for tracking completion
* @throws IOException if a remote or network exception occurs
*/
Future<Void> reopenTableRegionsAsync(TableName tableName) throws IOException;

/**
* Asynchronously reopen specific regions of a table.
* @param tableName table whose regions to reopen
* @param regions specific regions to reopen
* @return Future for tracking completion
* @throws IOException if a remote or network exception occurs
*/
Future<Void> reopenTableRegionsAsync(TableName tableName, List<RegionInfo> regions)
throws IOException;

/**
* Change the store file tracker of the given table.
* @param tableName the table you want to change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,17 @@ public Future<Void> modifyTableAsync(TableDescriptor td, boolean reopenRegions)
return admin.modifyTable(td, reopenRegions);
}

@Override
public Future<Void> reopenTableRegionsAsync(TableName tableName) throws IOException {
return admin.reopenTableRegions(tableName).toCompletableFuture();
}

@Override
public Future<Void> reopenTableRegionsAsync(TableName tableName, List<RegionInfo> regions)
throws IOException {
return admin.reopenTableRegions(tableName, regions).toCompletableFuture();
}

@Override
public Future<Void> modifyTableStoreFileTrackerAsync(TableName tableName, String dstSFT)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,24 @@ default CompletableFuture<Void> modifyTable(TableDescriptor desc) {
*/
CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenRegions);

/**
* Reopen all regions of a table. This is useful after calling
* {@link #modifyTable(TableDescriptor, boolean)} with reopenRegions=false to gradually roll out
* table descriptor changes to regions. Regions are reopened in-place (no move).
* @param tableName table whose regions to reopen
* @return CompletableFuture that completes when all regions have been reopened
*/
CompletableFuture<Void> reopenTableRegions(TableName tableName);

/**
* Reopen specific regions of a table. Useful for canary testing table descriptor changes on a
* subset of regions before rolling out to the entire table.
* @param tableName table whose regions to reopen
* @param regions specific regions to reopen
* @return CompletableFuture that completes when specified regions have been reopened
*/
CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions);

/**
* Change the store file tracker of the given table.
* @param tableName the table you want to change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenR
return wrap(rawAdmin.modifyTable(desc, reopenRegions));
}

@Override
public CompletableFuture<Void> reopenTableRegions(TableName tableName) {
return wrap(rawAdmin.reopenTableRegions(tableName));
}

@Override
public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) {
return wrap(rawAdmin.reopenTableRegions(tableName, regions));
}

@Override
public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
return wrap(rawAdmin.modifyTableStoreFileTracker(tableName, dstSFT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
Expand Down Expand Up @@ -754,6 +756,21 @@ public CompletableFuture<Void> modifyTable(TableDescriptor desc, boolean reopenR
new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
}

@Override
public CompletableFuture<Void> reopenTableRegions(TableName tableName) {
return reopenTableRegions(tableName, Collections.emptyList());
}

@Override
public CompletableFuture<Void> reopenTableRegions(TableName tableName, List<RegionInfo> regions) {
List<byte[]> regionNames = regions.stream().map(RegionInfo::getRegionName).toList();
return this.<ReopenTableRegionsRequest, ReopenTableRegionsResponse> procedureCall(tableName,
RequestConverter.buildReopenTableRegionsRequest(tableName, regionNames, ng.getNonceGroup(),
ng.newNonce()),
(s, c, req, done) -> s.reopenTableRegions(c, req, done), (resp) -> resp.getProcId(),
new ReopenTableRegionsProcedureBiConsumer(this, tableName));
}

@Override
public CompletableFuture<Void> modifyTableStoreFileTracker(TableName tableName, String dstSFT) {
return this.<ModifyTableStoreFileTrackerRequest,
Expand Down Expand Up @@ -2833,6 +2850,18 @@ String getOperationType() {
}
}

private static class ReopenTableRegionsProcedureBiConsumer extends TableProcedureBiConsumer {

ReopenTableRegionsProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "REOPEN_TABLE_REGIONS";
}
}

private static class ModifyTableStoreFileTrackerProcedureBiConsumer
extends TableProcedureBiConsumer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
Expand Down Expand Up @@ -1128,6 +1129,31 @@ public static ModifyTableRequest buildModifyTableRequest(final TableName tableNa
return builder.build();
}

/**
* Creates a protocol buffer ReopenTableRegionsRequest
* @param tableName table whose regions to reopen
* @param regionNames specific regions to reopen (empty = all regions)
* @param nonceGroup nonce group
* @param nonce nonce
* @return a ReopenTableRegionsRequest
*/
public static ReopenTableRegionsRequest buildReopenTableRegionsRequest(final TableName tableName,
final List<byte[]> regionNames, final long nonceGroup, final long nonce) {
ReopenTableRegionsRequest.Builder builder = ReopenTableRegionsRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));

if (regionNames != null && !regionNames.isEmpty()) {
for (byte[] regionName : regionNames) {
builder.addRegionNames(UnsafeByteOperations.unsafeWrap(regionName));
}
}

builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce);

return builder.build();
}

public static ModifyTableStoreFileTrackerRequest buildModifyTableStoreFileTrackerRequest(
final TableName tableName, final String dstSFT, final long nonceGroup, final long nonce) {
ModifyTableStoreFileTrackerRequest.Builder builder =
Expand Down
18 changes: 18 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ message ModifyTableResponse {
optional uint64 proc_id = 1;
}

message ReopenTableRegionsRequest {
required TableName table_name = 1;
repeated bytes region_names = 2; // empty = all regions
optional uint64 nonce_group = 3 [default = 0];
optional uint64 nonce = 4 [default = 0];
}

message ReopenTableRegionsResponse {
optional uint64 proc_id = 1;
}

message FlushTableRequest {
required TableName table_name = 1;
repeated bytes column_family = 2;
Expand Down Expand Up @@ -910,6 +921,13 @@ service MasterService {
rpc ModifyTable(ModifyTableRequest)
returns(ModifyTableResponse);

/**
* Reopen regions of a table. Regions are reopened in-place without moving.
* Useful for rolling out table descriptor changes after modifyTable(reopenRegions=false).
*/
rpc ReopenTableRegions(ReopenTableRegionsRequest)
returns(ReopenTableRegionsResponse);

/** Creates a new table asynchronously */
rpc CreateTable(CreateTableRequest)
returns(CreateTableResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4251,6 +4251,54 @@ protected String getDescription() {

}

/**
* Reopen regions provided in the argument. Applies throttling to the procedure to avoid
* overwhelming the system. This is used by the reopenTableRegions methods in the Admin API via
* HMaster.
* @param tableName The current table name
* @param regionNames The region names of the regions to reopen
* @param nonceGroup Identifier for the source of the request, a client or process
* @param nonce A unique identifier for this operation from the client or process identified
* by <code>nonceGroup</code> (the source must ensure each operation gets a
* unique id).
* @return procedure Id
* @throws IOException if reopening region fails while running procedure
*/
long reopenRegionsThrottled(final TableName tableName, final List<byte[]> regionNames,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throttling is a net new feature with this change? I'm surprised it's not introduced as a separate feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throttling has already been implemented in the ReopenTableRegionsProcedure, but for some reason the existing method on HMaster doesn't use it. I didn't want to change the existing implementation so I exposed a new method to allow users to choose - and used it for these changes too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please file a ticket to report the other flow not making use of this method? We should centralize the implementations if we can.

final long nonceGroup, final long nonce) throws IOException {

checkInitialized();

if (!tableStateManager.isTablePresent(tableName)) {
throw new TableNotFoundException(tableName);
}

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
ReopenTableRegionsProcedure proc;
if (regionNames.isEmpty()) {
proc = ReopenTableRegionsProcedure.throttled(getConfiguration(),
getTableDescriptors().get(tableName));
} else {
proc = ReopenTableRegionsProcedure.throttled(getConfiguration(),
getTableDescriptors().get(tableName), regionNames);
}

LOG.info("{} throttled reopening {} regions for table {}", getClientIdAuditPrefix(),
regionNames.isEmpty() ? "all" : regionNames.size(), tableName);

submitProcedure(proc);
}

@Override
protected String getDescription() {
return "Throttled ReopenTableRegionsProcedure for " + tableName;
}
});
}

@Override
public ReplicationPeerManager getReplicationPeerManager() {
return replicationPeerManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ReopenTableRegionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
Expand Down Expand Up @@ -1554,6 +1556,29 @@ public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequ
}
}

@Override
public ReopenTableRegionsResponse reopenTableRegions(RpcController controller,
ReopenTableRegionsRequest request) throws ServiceException {
try {
server.checkInitialized();

final TableName tableName = ProtobufUtil.toTableName(request.getTableName());
final List<byte[]> regionNames = request.getRegionNamesList().stream()
.map(ByteString::toByteArray).collect(Collectors.toList());

LOG.info("Reopening regions for table={}, regionCount={}", tableName,
regionNames.isEmpty() ? "all" : regionNames.size());

long procId = server.reopenRegionsThrottled(tableName, regionNames, request.getNonceGroup(),
request.getNonce());

return ReopenTableRegionsResponse.newBuilder().setProcId(procId).build();

} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}

@Override
public ModifyTableStoreFileTrackerResponse modifyTableStoreFileTracker(RpcController controller,
ModifyTableStoreFileTrackerRequest req) throws ServiceException {
Expand Down
Loading