Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ public enum Property {
"The number of threads used to run fault-tolerant executions (FATE)."
+ " These are primarily table operations like merge.",
"1.4.3"),
// Grace period for halt attempts against a tserver; "0" (the default) turns forced halting off.
MANAGER_TSERVER_HALT_DURATION("manager.tservers.halt.grace.period", "0",
    PropertyType.TIMEDURATION,
    "Allows the manager to force tserver halting by setting the max duration of time spent"
        + " attempting to halt a tserver before deleting the tserver's zlock."
        + " A value of zero (default) disables this feature.",
    "2.1.5"),
@Deprecated(since = "2.1.0")
MANAGER_REPLICATION_SCAN_INTERVAL("manager.replication.status.scan.interval", "30s",
PropertyType.TIMEDURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -716,6 +719,35 @@ public long getSessionId() throws KeeperException, InterruptedException {
}
}

/**
 * Deletes the server locks beneath a ZooKeeper node for every child whose host and port satisfy
 * the supplied predicate.
 *
 * @param zk zookeeper instance
 * @param zPath node whose children are named by server address, for example
 *        {@link org.apache.accumulo.core.Constants#ZTSERVERS} or a resource group node; when this
 *        node does not exist the method does nothing
 * @param hostPortPredicate conditional predicate for determining if the lock should be removed;
 *        it is given each child name parsed with {@code HostAndPort.fromString}
 * @param messageOutput receives one "Deleting ..." message per matching child, whether or not
 *        this is a dry run
 * @param dryRun when true, child names are still parsed and matching children are still reported
 *        through {@code messageOutput}, but no lock is deleted
 * @throws KeeperException if a ZooKeeper operation fails
 * @throws InterruptedException if the calling thread is interrupted during a ZooKeeper operation
 */
public static void deleteLocks(ZooReaderWriter zk, String zPath,
    Predicate<HostAndPort> hostPortPredicate, Consumer<String> messageOutput, Boolean dryRun)
    throws KeeperException, InterruptedException {
  if (!zk.exists(zPath)) {
    return;
  }
  for (String child : zk.getChildren(zPath)) {
    if (hostPortPredicate.test(HostAndPort.fromString(child))) {
      final String serverPath = zPath + "/" + child;
      messageOutput.accept("Deleting " + serverPath + " from zookeeper");
      if (!dryRun) {
        // Use the full path of the server node: the bare child name is relative to zPath and
        // does not identify the node that was just reported in the message above.
        deleteLock(zk, path(serverPath));
      }
    }
  }
}

public static void deleteLock(ZooReaderWriter zk, ServiceLockPath path)
throws InterruptedException, KeeperException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public void zap(SiteConfiguration siteConf, String... args) {
zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
}
} else {
removeLocks(zoo, tserversPath, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, tserversPath, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting tserver locks", e);
Expand Down Expand Up @@ -269,7 +270,8 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
List<String> groups = zoo.getChildren(path);
for (String group : groups) {
if (groupPredicate.test(group)) {
removeLocks(zoo, path + "/" + group, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, path + "/" + group, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
}
}
Expand All @@ -278,19 +280,7 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
/**
 * Removes the server locks under {@code path} whose host and port match the predicate by
 * delegating to {@code ServiceLock.deleteLocks}, so the scan and the per-lock messages happen
 * exactly once.
 *
 * @param zoo zookeeper reader/writer used for the lookup and deletion
 * @param path node whose children are the candidate server entries
 * @param hostPortPredicate selects which children have their lock removed
 * @param opts supplies the dry-run flag and is passed to {@code message} for each reported lock
 * @throws KeeperException if a ZooKeeper operation fails
 * @throws InterruptedException if the calling thread is interrupted during a ZooKeeper operation
 */
static void removeLocks(ZooReaderWriter zoo, String path,
    Predicate<HostAndPort> hostPortPredicate, Opts opts)
    throws KeeperException, InterruptedException {
  ServiceLock.deleteLocks(zoo, path, hostPortPredicate, m -> message(m, opts), opts.dryRun);
}

static void removeSingletonLock(ZooReaderWriter zoo, String path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.manager.metrics.BalancerMetrics;
Expand Down Expand Up @@ -195,6 +196,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener,
final AuditedSecurityOperation security;
final Map<TServerInstance,AtomicInteger> badServers =
Collections.synchronizedMap(new HashMap<>());
final Map<TServerInstance,GracefulHaltTimer> tserverHaltRpcAttempts =
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
final EventCoordinator nextEvent = new EventCoordinator();
Expand Down Expand Up @@ -1141,6 +1144,30 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,

}

/**
 * Tracks how long the manager has been attempting to halt one tablet server, so the caller can
 * tell when the configured halt grace period has run out.
 */
private static class GracefulHaltTimer {

  // Grace period read from Property.MANAGER_TSERVER_HALT_DURATION; zero disables forced halting.
  private final Duration maxHaltGraceDuration;
  // Stays null until the first halt attempt has been recorded.
  private Timer timer;

  /**
   * @param config configuration from which the halt grace period is read
   */
  public GracefulHaltTimer(AccumuloConfiguration config) {
    this.timer = null;
    this.maxHaltGraceDuration =
        Duration.ofMillis(config.getTimeInMillis(Property.MANAGER_TSERVER_HALT_DURATION));
  }

  /**
   * Records the first halt attempt. Later calls leave the running timer untouched, so the
   * elapsed time covers the whole period spent attempting to halt the server rather than only
   * the time since the most recent attempt.
   */
  public void startTimer() {
    if (timer == null) {
      timer = Timer.startNew();
    }
  }

  /**
   * @return true when the grace period is non-zero, a halt attempt has been recorded, and the
   *         grace period has elapsed since that attempt
   */
  public boolean shouldForceHalt() {
    return !maxHaltGraceDuration.isZero() && timer != null
        && timer.hasElapsed(maxHaltGraceDuration);
  }
}

private SortedMap<TServerInstance,TabletServerStatus>
gatherTableInformation(Set<TServerInstance> currentServers) {
final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
Expand All @@ -1150,6 +1177,9 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
long start = System.currentTimeMillis();
final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>();
final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
final int maxTserverRpcHaltAttempts =
getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_DURATION);
final boolean forceHaltingEnabled = maxTserverRpcHaltAttempts != 0;
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
if (threads == 0) {
Expand Down Expand Up @@ -1190,15 +1220,35 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
> MAX_BAD_STATUS_COUNT) {
if (shutdownServerRateLimiter.tryAcquire()) {
log.warn("attempting to stop {}", server);
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
var gracefulHaltTimer = tserverHaltRpcAttempts.computeIfAbsent(server,
s -> new GracefulHaltTimer(getConfiguration()));
if (gracefulHaltTimer.shouldForceHalt()) {
log.warn("tserver {} is not responding to halt requests, deleting zlock", server);
var zk = getContext().getZooReaderWriter();
var iid = getContext().getInstanceID();
String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
try {
ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals,
log::info, false);
tserverHaltRpcAttempts.remove(server);
badServers.remove(server);
} catch (KeeperException | InterruptedException e) {
log.error("Failed to delete zlock for server {}", server, e);
}
} else {
try {
TServerConnection connection2 = tserverSet.getConnection(server);
if (connection2 != null) {
connection2.halt(managerLock);
}
} catch (TTransportException e1) {
// ignore: it's probably down so log the exception at trace
log.trace("error attempting to halt tablet server {}", server, e1);
} catch (Exception e2) {
log.info("error talking to troublesome tablet server {}", server, e2);
} finally {
gracefulHaltTimer.startTimer();
}
} catch (TTransportException e1) {
// ignore: it's probably down
} catch (Exception e2) {
log.info("error talking to troublesome tablet server", e2);
}
} else {
log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server,
Expand All @@ -1225,6 +1275,12 @@ private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
badServers.keySet().retainAll(currentServers);
badServers.keySet().removeAll(info.keySet());
}

synchronized (tserverHaltRpcAttempts) {
tserverHaltRpcAttempts.keySet().retainAll(currentServers);
tserverHaltRpcAttempts.keySet().removeAll(info.keySet());
}

log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds",
info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));

Expand Down Expand Up @@ -1727,14 +1783,17 @@ public void update(LiveTServerSet current, Set<TServerInstance> deleted,
}
serversToShutdown.removeAll(deleted);
badServers.keySet().removeAll(deleted);
tserverHaltRpcAttempts.keySet().removeAll(deleted);
// clear out any bad server with the same host/port as a new server
synchronized (badServers) {
cleanListByHostAndPort(badServers.keySet(), deleted, added);
}
synchronized (serversToShutdown) {
cleanListByHostAndPort(serversToShutdown, deleted, added);
}

synchronized (tserverHaltRpcAttempts) {
cleanListByHostAndPort(tserverHaltRpcAttempts.keySet(), deleted, added);
}
migrations.removeServers(deleted);
nextEvent.event("There are now %d tablet servers", current.size());
}
Expand Down
Loading