Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -716,6 +719,32 @@ public long getSessionId() throws KeeperException, InterruptedException {
}
}

/**
* This method will delete multiple server locks for a given path according the predicate
* conditions.
*
* @param hostPortPredicate conditional predicate for determining if the lock should be removed.
* @param messageOutput function for setting where the output from the lockPath goes
* @param dryRun allows lock format validation and the messageOutput to be sent without actually
* deleting the lock
*
*/
public static void deleteLocks(ZooReaderWriter zk, String zPath,
Predicate<HostAndPort> hostPortPredicate, Consumer<String> messageOutput, Boolean dryRun)
throws KeeperException, InterruptedException {
if (zk.exists(zPath)) {
List<String> children = zk.getChildren(zPath);
for (String child : children) {
if (hostPortPredicate.test(HostAndPort.fromString(child))) {
messageOutput.accept("Deleting " + zPath + "/" + child + " from zookeeper");
if (!dryRun) {
deleteLock(zk, path(child));
}
}
}
}
}

public static void deleteLock(ZooReaderWriter zk, ServiceLockPath path)
throws InterruptedException, KeeperException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public void zap(SiteConfiguration siteConf, String... args) {
zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
}
} else {
removeLocks(zoo, tserversPath, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, tserversPath, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting tserver locks", e);
Expand Down Expand Up @@ -269,7 +270,8 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
List<String> groups = zoo.getChildren(path);
for (String group : groups) {
if (groupPredicate.test(group)) {
removeLocks(zoo, path + "/" + group, hostPortPredicate, opts);
ServiceLock.deleteLocks(zoo, path + "/" + group, hostPortPredicate, m -> message(m, opts),
opts.dryRun);
}
}
}
Expand All @@ -278,19 +280,7 @@ static void removeGroupedLocks(ZooReaderWriter zoo, String path, Predicate<Strin
static void removeLocks(ZooReaderWriter zoo, String path,
Predicate<HostAndPort> hostPortPredicate, Opts opts)
throws KeeperException, InterruptedException {
if (zoo.exists(path)) {
List<String> children = zoo.getChildren(path);
for (String child : children) {
if (hostPortPredicate.test(HostAndPort.fromString(child))) {
message("Deleting " + path + "/" + child + " from zookeeper", opts);
if (!opts.dryRun) {
// TODO not sure this is the correct way to delete this lock.. the code was deleting
// locks in multiple different ways for diff servers types.
zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP);
}
}
}
}
ServiceLock.deleteLocks(zoo, path, hostPortPredicate, m -> message(m, opts), opts.dryRun);
}

static void removeSingletonLock(ZooReaderWriter zoo, String path,
Expand Down