Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static String[] getIPs(String strInterface,


/**
* Returns the first available IP address associated with the provided
* Returns the first available IP address with a resolvable hostname associated with the provided
* network interface or the local host IP if "default" is given.
*
* @param strInterface The name of the network interface or subinterface to query
Expand All @@ -214,7 +214,24 @@ public static String[] getIPs(String strInterface,
public static String getDefaultIP(String strInterface)
throws UnknownHostException {
String[] ips = getIPs(strInterface);
return ips[0];
UnknownHostException lastException = null;
for (String ip : ips) {
String resolved = null;
try {
resolved = InetAddress.getByName(ip).getHostName();
} catch (UnknownHostException e) {
// skip ip addresses that cannot be resolved to a hostname
lastException = e;
continue;
}
if (resolved.equals(ip)) {
lastException = new UnknownHostException(
"IP address " + ip + " for " + strInterface + " interface cannot be resolved to a hostname");
continue;
}
return ip;
}
throw lastException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -76,11 +78,23 @@ public class Auditor implements AutoCloseable {
private LedgerManager ledgerManager;
private LedgerUnderreplicationManager ledgerUnderreplicationManager;
private final ScheduledExecutorService executor;
private List<String> knownBookies = new ArrayList<String>();
// volatile so the executor thread always sees the reference set by start()
private volatile List<String> knownBookies = new ArrayList<String>();
private final String bookieIdentifier;
protected volatile Future<?> auditTask;
private final Set<String> bookiesToBeAudited = Sets.newHashSet();
private volatile int lostBookieRecoveryDelayBeforeChange;
// Latest bookie sets received from watcher callbacks (null until the first watcher fires).
// Written by ZK watcher callback threads, read by the executor thread.
private final AtomicReference<Set<String>> pendingWritableBookies = new AtomicReference<>();
private final AtomicReference<Set<String>> pendingReadOnlyBookies = new AtomicReference<>();
// Counts tasks (running + queued) per change type. Incremented when a task is
// submitted, decremented when it finishes. Capped at 2 (one in-progress + one
// queued): a queued task always reads the latest pending bookie set when it
// starts, so there is no value in queuing more than one additional task.
private static final int MAX_AUDIT_TASKS_PER_TYPE = 2;
private final AtomicInteger writableAuditTaskCount = new AtomicInteger(0);
private final AtomicInteger readOnlyAuditTaskCount = new AtomicInteger(0);
protected AuditorBookieCheckTask auditorBookieCheckTask;
protected AuditorTask auditorCheckAllLedgersTask;
protected AuditorTask auditorPlacementPolicyCheckTask;
Expand Down Expand Up @@ -237,85 +251,164 @@ private void submitShutdownTask() {
}
}

/**
* Submit an audit task triggered by a bookie change notification from the watcher.
*
* <p>At most {@value #MAX_AUDIT_TASKS_PER_TYPE} tasks (one in-progress + one
* queued) are allowed per change type at any time. The counter is incremented on
* submission and decremented when the task finishes, so the limit is enforced
* across the full task lifetime. A queued task always reads the latest pending
* bookie set when it starts, so queuing more than one additional task would only
* duplicate work without improving correctness.
*/
private synchronized void submitAuditTaskForBookieChange(boolean writableChange) {
if (executor.isShutdown()) {
return;
}
AtomicInteger count = writableChange ? writableAuditTaskCount : readOnlyAuditTaskCount;
if (count.get() >= MAX_AUDIT_TASKS_PER_TYPE) {
// One task is already running and one is queued. The queued task will
// pick up the latest pendingWritableBookies / pendingReadOnlyBookies
// when it executes, so no additional task is needed.
return;
}
count.incrementAndGet();
executor.submit(() -> {
try {
runAuditTask();
} finally {
count.decrementAndGet();
}
});
}

/**
* Submit a full bookie-check audit task unconditionally. Used by tests and by the
* LostBookieRecoveryDelay-changed event handler.
*
* <p>Runs the full {@code auditBookies()} scan (which uses ledger metadata) rather
* than the lightweight {@link #runAuditTask()}, so that bookies which registered
* and died since the auditor started are correctly detected even if they were never
* added to {@code knownBookies}.
*/
@VisibleForTesting
synchronized Future<?> submitAuditTask() {
if (executor.isShutdown()) {
SettableFuture<Void> f = SettableFuture.<Void>create();
f.setException(new BKAuditException("Auditor shutting down"));
return f;
}
return executor.submit(() -> {
try {
waitIfLedgerReplicationDisabled();
int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();
List<String> availableBookies = getAvailableBookies();

// casting to String, as knownBookies and availableBookies
// contains only String values
// find new bookies(if any) and update the known bookie list
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) {
// the bookie, which went down earlier and had an audit scheduled for,
// has come up. So let us stop tracking it and cancel the audit. Since
// we allow delaying of audit when there is only one failed bookie,
// bookiesToBeAudited should just have 1 element and hence containsAll
// check should be ok
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
auditorStats.getNumDelayedBookieAuditsCancelled().inc();
}
bookiesToBeAudited.clear();
return executor.submit(() -> auditorBookieCheckTask.startAudit(false));
}

/**
* Core audit logic: determine which bookies have disappeared and trigger
* re-replication if needed. Runs on the single-threaded {@link #executor}.
*
* <p>Uses the bookie sets most recently received from the ZK watcher callbacks
* ({@link #pendingWritableBookies} / {@link #pendingReadOnlyBookies}) when
* both are available, avoiding a redundant ZK round-trip and the race where a
* fresh {@code getAvailableBookies()} call could see a different state than the
* event that triggered this task.
*/
private void runAuditTask() {
try {
waitIfLedgerReplicationDisabled();
int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager
.getLostBookieRecoveryDelay();

// Use the bookie sets captured synchronously by the watcher callbacks.
// If only one type has fired so far, fetch only the other type from the
// admin API to avoid a redundant ZK round-trip for the known half.
// Fall back to a full getAvailableBookies() only when neither watcher
// has fired yet (e.g. direct test invocations via submitAuditTask()).
Set<String> writableSnapshot = pendingWritableBookies.get();
Set<String> readOnlySnapshot = pendingReadOnlyBookies.get();
List<String> availableBookies;
if (writableSnapshot != null && readOnlySnapshot != null) {
availableBookies = new ArrayList<>(writableSnapshot);
availableBookies.addAll(readOnlySnapshot);
} else if (writableSnapshot != null) {
// Readonly watcher hasn't fired yet; fetch only readonly bookies.
availableBookies = new ArrayList<>(writableSnapshot);
for (BookieId id : admin.getReadOnlyBookies()) {
availableBookies.add(id.toString());
}
} else if (readOnlySnapshot != null) {
// Writable watcher hasn't fired yet; fetch only writable bookies.
availableBookies = new ArrayList<>();
for (BookieId id : admin.getAvailableBookies()) {
availableBookies.add(id.toString());
}
availableBookies.addAll(readOnlySnapshot);
} else {
availableBookies = getAvailableBookies();
}

// find lost bookies(if any)
bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies));
if (bookiesToBeAudited.size() == 0) {
return;
// casting to String, as knownBookies and availableBookies
// contains only String values
// find new bookies(if any) and update the known bookie list
Collection<String> newBookies = CollectionUtils.subtract(
availableBookies, knownBookies);
knownBookies.addAll(newBookies);
if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) {
// the bookie, which went down earlier and had an audit scheduled for,
// has come up. So let us stop tracking it and cancel the audit. Since
// we allow delaying of audit when there is only one failed bookie,
// bookiesToBeAudited should just have 1 element and hence containsAll
// check should be ok
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
auditorStats.getNumDelayedBookieAuditsCancelled().inc();
}
bookiesToBeAudited.clear();
}

knownBookies.removeAll(bookiesToBeAudited);
if (lostBookieRecoveryDelay == 0) {
auditorBookieCheckTask.startAudit(false);
bookiesToBeAudited.clear();
return;
// find lost bookies(if any)
bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies));
if (bookiesToBeAudited.size() == 0) {
return;
}

knownBookies.removeAll(bookiesToBeAudited);
if (lostBookieRecoveryDelay == 0) {
auditorBookieCheckTask.startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (bookiesToBeAudited.size() > 1) {
// if more than one bookie is down, start the audit immediately;
LOG.info("Multiple bookie failure; not delaying bookie audit. "
+ "Bookies lost now: {}; All lost bookies: {}",
CollectionUtils.subtract(knownBookies, availableBookies),
bookiesToBeAudited);
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
auditorStats.getNumDelayedBookieAuditsCancelled().inc();
}
if (bookiesToBeAudited.size() > 1) {
// if more than one bookie is down, start the audit immediately;
LOG.info("Multiple bookie failure; not delaying bookie audit. "
+ "Bookies lost now: {}; All lost bookies: {}",
CollectionUtils.subtract(knownBookies, availableBookies),
bookiesToBeAudited);
if (auditTask != null && auditTask.cancel(false)) {
auditTask = null;
auditorStats.getNumDelayedBookieAuditsCancelled().inc();
}
auditorBookieCheckTask.startAudit(false);
bookiesToBeAudited.clear();
return;
}
if (auditTask == null) {
// if there is no scheduled audit, schedule one
auditTask = executor.schedule(() -> {
auditorBookieCheckTask.startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
return;
}
if (auditTask == null) {
// if there is no scheduled audit, schedule one
auditTask = executor.schedule(() -> {
auditorBookieCheckTask.startAudit(false);
auditTask = null;
bookiesToBeAudited.clear();
}, lostBookieRecoveryDelay, TimeUnit.SECONDS);
auditorStats.getNumBookieAuditsDelayed().inc();
LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay,
bookiesToBeAudited);
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
}, lostBookieRecoveryDelay, TimeUnit.SECONDS);
auditorStats.getNumBookieAuditsDelayed().inc();
LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay,
bookiesToBeAudited);
}
});
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while watching available bookies ", ie);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
}
}

synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
Expand Down Expand Up @@ -386,13 +479,12 @@ public void start() {
}

try {
watchBookieChanges();
// Start with all available bookies
// to handle situations where the auditor
// is started after some bookies have already failed
// Initialize knownBookies before registering watchers so that any
// watcher callback that fires immediately sees a consistent baseline.
knownBookies = admin.getAllBookies().stream()
.map(BookieId::toString)
.collect(Collectors.toList());
watchBookieChanges();
this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb());
} catch (BKException bke) {
Expand Down Expand Up @@ -598,8 +690,18 @@ protected List<String> getAvailableBookies() throws BKException {
}

private void watchBookieChanges() throws BKException {
admin.watchWritableBookiesChanged(bookies -> submitAuditTask());
admin.watchReadOnlyBookiesChanged(bookies -> submitAuditTask());
admin.watchWritableBookiesChanged(bookies -> {
pendingWritableBookies.set(bookies.getValue().stream()
.map(BookieId::toString)
.collect(Collectors.toSet()));
submitAuditTaskForBookieChange(true);
});
admin.watchReadOnlyBookiesChanged(bookies -> {
pendingReadOnlyBookies.set(bookies.getValue().stream()
.map(BookieId::toString)
.collect(Collectors.toSet()));
submitAuditTaskForBookieChange(false);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@

import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.util.PortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,12 +73,27 @@ public static ServerConfiguration newServerConfiguration() {

private static String getLoopbackInterfaceName() {
try {
Set<String> loopbackInterfaces = new LinkedHashSet<>();
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface nif : Collections.list(nifs)) {
if (nif.isLoopback()) {
return nif.getName();
String nifName = nif.getName();
try {
DNS.getDefaultIP(nifName);
} catch (UnknownHostException e) {
// skip interfaces that don't have a resolvable hostname
continue;
}
loopbackInterfaces.add(nifName);
}
}
// prefer lo if available to avoid issues on Linux
if (loopbackInterfaces.contains("lo")) {
return "lo";
}
if (!loopbackInterfaces.isEmpty()) {
return loopbackInterfaces.iterator().next();
}
} catch (SocketException se) {
LOG.warn("Exception while figuring out loopback interface. Will use null.", se);
return null;
Expand Down
Loading