Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public void onMessage(final DatanodeDetails datanodeDetails,
final EventPublisher publisher) {

try {
NodeStatus currentStatus =
nodeManager.getNodeStatus(datanodeDetails);

if (currentStatus.getHealth() != HddsProtos.NodeState.DEAD) {
LOG.info("Skip event for dead node {} since the current " +
"state is {}", datanodeDetails, currentStatus.getHealth());
return;
}

/*
* We should have already destroyed all the pipelines on this datanode
Expand All @@ -92,7 +100,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
closeContainers(datanodeDetails, publisher);
destroyPipelines(datanodeDetails);

boolean isNodeInMaintenance = nodeManager.getNodeStatus(datanodeDetails).isInMaintenance();
boolean isNodeInMaintenance = currentStatus.isInMaintenance();

// Remove the container replicas associated with the dead node unless it
// is IN_MAINTENANCE
Expand All @@ -119,15 +127,26 @@ public void onMessage(final DatanodeDetails datanodeDetails,
deletedBlockLog.onDatanodeDead(datanodeDetails.getID());
}

//move dead datanode out of ClusterNetworkTopology
NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
if (nt.contains(datanodeDetails)) {
nt.remove(datanodeDetails);
//make sure after DN is removed from topology,
//DatanodeDetails instance returned from nodeStateManager has no parent.
Preconditions.checkState(
nodeManager.getNode(datanodeDetails.getID())
.getParent() == null);
// Only remove from topology if the node is still DEAD. Between the time
// the DEAD_NODE event was fired and now, the node may have been
// resurrected (DEAD -> HEALTHY_READONLY) via a heartbeat. Removing a
// resurrected node from the topology would leave it reachable but
// invisible to the placement policy.
currentStatus = nodeManager.getNodeStatus(datanodeDetails);
if (currentStatus.getHealth() == HddsProtos.NodeState.DEAD) {
Comment on lines +130 to +136
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivandika3 I was thinking would it make sense to add an early check at the start of onMessage and return if the node is no longer DEAD? In the race where the node is resurrected before this handler runs, we’d still run removeContainerReplicas, REPLICATION_MANAGER_NOTIFY, deletedBlockLog.onDatanodeDead, etc, which may not be appropriate for a resurrected node.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I added another check at the start. Technically, we would need to repeat the check before each of these actions, but that seems to be overkill.

NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
if (nt.contains(datanodeDetails)) {
nt.remove(datanodeDetails);
DatanodeDetails node = nodeManager.getNode(datanodeDetails.getID());
//make sure after DN is removed from topology,
//DatanodeDetails instance returned from nodeStateManager has no parent.
if (node != null) {
Preconditions.checkState(node.getParent() == null);
}
}
} else {
LOG.info("Skipping topology removal for dead node {} whose current " +
"state is {}", datanodeDetails, currentStatus.getHealth());
}
} catch (NodeNotFoundException ex) {
// This should not happen, we cannot get a dead node event for an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,17 @@ public void onMessage(DatanodeDetails datanodeDetails,
}
}

//add node back if it is not present in networkTopology
// Always ensure the node is in the topology. Using unconditional add
// rather than a contains-then-add check to avoid a race with
// DeadNodeHandler, which may remove the node between the check and
// the add. InnerNodeImpl.add() is idempotent for existing nodes.
NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
if (!nt.contains(datanodeDetails)) {
nt.add(datanodeDetails);
nt.add(datanodeDetails);
DatanodeDetails node = nodeManager.getNode(datanodeDetails.getID());
if (node != null) {
// make sure after DN is added back into topology, DatanodeDetails
// instance returned from nodeStateManager has parent correctly set.
Objects.requireNonNull(
nodeManager.getNode(datanodeDetails.getID())
.getParent(), "Parent == null");
Objects.requireNonNull(node.getParent(), "Parent == null");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -33,7 +36,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
Expand Down Expand Up @@ -226,6 +231,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_MAINTENANCE);
setNodeHealthState(datanode1, HddsProtos.NodeState.DEAD);
deadNodeHandler.onMessage(datanode1, publisher);
// make sure the node is removed from
// ClusterNetworkTopology when it is considered as dead
Expand Down Expand Up @@ -258,6 +264,7 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {
nodeManager.addDatanodeCommand(datanode1.getID(), cmd);
nodeManager.setNodeOperationalState(datanode1,
HddsProtos.NodeOperationalState.IN_SERVICE);
setNodeHealthState(datanode1, HddsProtos.NodeState.DEAD);
deadNodeHandler.onMessage(datanode1, publisher);
//datanode1 has been removed from ClusterNetworkTopology, another
//deadNodeHandler.onMessage call will not change this
Expand Down Expand Up @@ -290,6 +297,182 @@ public void testOnMessage(@TempDir File tempDir) throws Exception {

}

/**
 * Verifies that DeadNodeHandler skips topology removal when the node has
 * already been resurrected (state changed from DEAD to HEALTHY_READONLY).
 */
@Test
public void testDeadNodeHandlerSkipsRemovalWhenNodeResurrected(
    @TempDir File tempDir) throws Exception {
  DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
  // Register the node with one data and one metadata storage volume so it
  // lands in the cluster network topology.
  String dataPath = tempDir.getPath().concat("/data-" + dn.getID());
  String metaPath = tempDir.getPath().concat("/metadata-" + dn.getID());
  StorageReportProto dataReport = HddsTestUtils.createStorageReport(
      dn.getID(), dataPath, 100 * OzoneConsts.TB,
      10 * OzoneConsts.TB, 90 * OzoneConsts.TB, null);
  MetadataStorageReportProto metaReport =
      HddsTestUtils.createMetadataStorageReport(metaPath,
          100 * OzoneConsts.GB, 10 * OzoneConsts.GB,
          90 * OzoneConsts.GB, null);
  nodeManager.register(dn,
      HddsTestUtils.createNodeReport(Arrays.asList(dataReport),
          Arrays.asList(metaReport)), null);
  dn = nodeManager.getNode(dn.getID());

  assertTrue(nodeManager.getClusterNetworkTopologyMap().contains(dn));

  // Simulate: DEAD_NODE event was fired, but before DeadNodeHandler
  // processes it, the node heartbeated and was resurrected to
  // HEALTHY_READONLY. The handler should see the current state and
  // skip removal.
  setNodeHealthState(dn, HddsProtos.NodeState.HEALTHY_READONLY);
  deadNodeHandler.onMessage(dn, publisher);

  assertTrue(nodeManager.getClusterNetworkTopologyMap().contains(dn),
      "Node should remain in topology when it has been resurrected");
}

/**
 * Verifies that HealthyReadOnlyNodeHandler re-adds a node to topology
 * even if it was removed by a concurrent DeadNodeHandler.
 */
@Test
public void testHealthyReadOnlyHandlerAddsRemovedNode(
    @TempDir File tempDir) throws Exception {
  DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
  // Register the node with one data and one metadata storage volume.
  String dataPath = tempDir.getPath().concat("/data-" + dn.getID());
  String metaPath = tempDir.getPath().concat("/metadata-" + dn.getID());
  StorageReportProto dataReport = HddsTestUtils.createStorageReport(
      dn.getID(), dataPath, 100 * OzoneConsts.TB,
      10 * OzoneConsts.TB, 90 * OzoneConsts.TB, null);
  MetadataStorageReportProto metaReport =
      HddsTestUtils.createMetadataStorageReport(metaPath,
          100 * OzoneConsts.GB, 10 * OzoneConsts.GB,
          90 * OzoneConsts.GB, null);
  nodeManager.register(dn,
      HddsTestUtils.createNodeReport(Arrays.asList(dataReport),
          Arrays.asList(metaReport)), null);
  dn = nodeManager.getNode(dn.getID());

  // Manually remove node from topology to simulate DeadNodeHandler
  // having run first.
  nodeManager.getClusterNetworkTopologyMap().remove(dn);
  assertFalse(nodeManager.getClusterNetworkTopologyMap().contains(dn));

  // HealthyReadOnlyNodeHandler should add it back unconditionally.
  healthyReadOnlyNodeHandler.onMessage(dn, publisher);
  assertTrue(nodeManager.getClusterNetworkTopologyMap().contains(dn),
      "Node should be re-added to topology by HealthyReadOnlyNodeHandler");
}

/**
 * Reproduces the race condition between DeadNodeHandler and
 * HealthyReadOnlyNodeHandler where interleaved execution could leave
 * a resurrected node missing from the network topology.
 *
 * The interleaving being tested:
 * 1. DeadNodeHandler starts processing (slow: closing containers, etc.)
 * 2. Node is resurrected (DEAD -> HEALTHY_READONLY) via heartbeat
 * 3. HealthyReadOnlyNodeHandler runs, sees node in topology, does not add
 * 4. DeadNodeHandler finishes and removes node from topology
 *
 * With the fix: step 4 checks current state and skips removal.
 */
@Test
public void testDeadNodeAndHealthyReadOnlyRaceCondition(
    @TempDir File tempDir) throws Exception {
  DatanodeDetails datanode = MockDatanodeDetails.randomDatanodeDetails();
  // Register the node with one data and one metadata storage volume so it
  // appears in the cluster network topology.
  String storagePath = tempDir.getPath()
      .concat("/data-" + datanode.getID());
  String metaStoragePath = tempDir.getPath()
      .concat("/metadata-" + datanode.getID());
  StorageReportProto storageReport = HddsTestUtils.createStorageReport(
      datanode.getID(), storagePath, 100 * OzoneConsts.TB,
      10 * OzoneConsts.TB, 90 * OzoneConsts.TB, null);
  MetadataStorageReportProto metaStorageReport =
      HddsTestUtils.createMetadataStorageReport(metaStoragePath,
          100 * OzoneConsts.GB, 10 * OzoneConsts.GB,
          90 * OzoneConsts.GB, null);
  nodeManager.register(datanode,
      HddsTestUtils.createNodeReport(Arrays.asList(storageReport),
          Arrays.asList(metaStorageReport)), null);
  datanode = nodeManager.getNode(datanode.getID());

  assertTrue(
      nodeManager.getClusterNetworkTopologyMap().contains(datanode));

  // Block DeadNodeHandler just before the topology removal by making
  // deletedBlockLog.onDatanodeDead() pause. This call happens right
  // before the topology check in the handler.
  CountDownLatch deadHandlerBlocked = new CountDownLatch(1);
  CountDownLatch proceedWithRemoval = new CountDownLatch(1);

  DeletedBlockLog blockingDeletedBlockLog = mock(DeletedBlockLog.class);
  doAnswer(invocation -> {
    deadHandlerBlocked.countDown();
    proceedWithRemoval.await();
    return null;
  }).when(blockingDeletedBlockLog).onDatanodeDead(any());

  setNodeHealthState(datanode, HddsProtos.NodeState.DEAD);

  DeadNodeHandler blockingDeadHandler = new DeadNodeHandler(nodeManager,
      mock(PipelineManager.class), containerManager,
      blockingDeletedBlockLog);

  // Effectively-final copy for use inside the lambda below.
  DatanodeDetails finalDatanode = datanode;
  AtomicReference<Exception> threadException = new AtomicReference<>();
  Thread deadHandlerThread = new Thread(() -> {
    try {
      blockingDeadHandler.onMessage(finalDatanode, publisher);
    } catch (Exception e) {
      threadException.set(e);
    }
  });
  deadHandlerThread.start();

  // Wait for DeadNodeHandler to be blocked mid-execution.
  assertTrue(deadHandlerBlocked.await(10, TimeUnit.SECONDS),
      "DeadNodeHandler should have started processing");

  // Simulate resurrection: node transitions to HEALTHY_READONLY.
  setNodeHealthState(datanode, HddsProtos.NodeState.HEALTHY_READONLY);

  // HealthyReadOnlyNodeHandler runs while DeadNodeHandler is blocked.
  healthyReadOnlyNodeHandler.onMessage(datanode, publisher);

  // Release the DeadNodeHandler to finish.
  proceedWithRemoval.countDown();
  deadHandlerThread.join(10_000);
  // join(timeout) returns even if the thread is still running. Verify the
  // handler actually completed; otherwise a hung handler would leave the
  // node in topology and the final assertion below would pass spuriously.
  assertFalse(deadHandlerThread.isAlive(),
      "DeadNodeHandler thread should have completed within the timeout");

  assertNull(threadException.get(),
      "DeadNodeHandler should not throw");

  // With the fix, the node should remain in topology because
  // DeadNodeHandler sees the node is no longer DEAD and skips removal.
  assertTrue(
      nodeManager.getClusterNetworkTopologyMap().contains(datanode),
      "Resurrected node must remain in topology after race between "
      + "DeadNodeHandler and HealthyReadOnlyNodeHandler");
}

/**
 * Forces the health portion of the given datanode's {@code NodeStatus} to
 * {@code healthState}, leaving its operational state unchanged.
 *
 * @throws NodeNotFoundException if the node is unknown to the state manager
 */
private void setNodeHealthState(DatanodeDetails datanode,
    HddsProtos.NodeState healthState) throws NodeNotFoundException {
  DatanodeInfo info = nodeManager.getNodeStateManager().getNode(datanode);
  HddsProtos.NodeOperationalState opState =
      info.getNodeStatus().getOperationalState();
  info.setNodeStatus(NodeStatus.valueOf(opState, healthState));
}

private void registerReplicas(ContainerManager contManager,
ContainerInfo container, DatanodeDetails... datanodes)
throws ContainerNotFoundException {
Expand Down