Skip to content

Commit 312bd83

Browse files
committed
<fix>[mn]: synchronize hash ring operations to prevent dual-MN task stalling
In dual-management-node scenarios, concurrent modifications to the consistent hash ring from heartbeat reconciliation and canonical event callbacks can leave nodeHash and nodes inconsistent, leading to message routing failures and task timeouts. Fix: (1) synchronized all ResourceDestinationMakerImpl methods so that nodeHash and nodes are updated atomically; (2) added lifecycleLock in ManagementNodeManagerImpl to serialize heartbeat reconciliation with event callbacks; (3) added a two-round delayed confirmation before removing nodes from the hash ring, to avoid racing with NodeJoin events.

Resolves: ZSTAC-77711
Change-Id: I3d33d53595dd302784dff17417a5b25f2d0f3426
1 parent 1d41921 commit 312bd83

2 files changed

Lines changed: 82 additions & 47 deletions

File tree

core/src/main/java/org/zstack/core/cloudbus/ResourceDestinationMakerImpl.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,27 @@ public class ResourceDestinationMakerImpl implements ManagementNodeChangeListene
2727
private DatabaseFacade dbf;
2828

2929
@Override
30-
public void nodeJoin(ManagementNodeInventory inv) {
30+
public synchronized void nodeJoin(ManagementNodeInventory inv) {
3131
nodeHash.add(inv.getUuid());
3232
nodes.put(inv.getUuid(), new NodeInfo(inv));
3333
}
3434

3535
@Override
36-
public void nodeLeft(ManagementNodeInventory inv) {
36+
public synchronized void nodeLeft(ManagementNodeInventory inv) {
3737
String nodeId = inv.getUuid();
3838
nodeHash.remove(nodeId);
3939
nodes.remove(nodeId);
4040
}
4141

4242
@Override
43-
public void iAmDead(ManagementNodeInventory inv) {
43+
public synchronized void iAmDead(ManagementNodeInventory inv) {
4444
String nodeId = inv.getUuid();
4545
nodeHash.remove(nodeId);
4646
nodes.remove(nodeId);
4747
}
4848

4949
@Override
50-
public void iJoin(ManagementNodeInventory inv) {
50+
public synchronized void iJoin(ManagementNodeInventory inv) {
5151
List<ManagementNodeVO> lst = Q.New(ManagementNodeVO.class).list();
5252
lst.forEach((ManagementNodeVO node) -> {
5353
nodeHash.add(node.getUuid());
@@ -56,7 +56,7 @@ public void iJoin(ManagementNodeInventory inv) {
5656
}
5757

5858
@Override
59-
public String makeDestination(String resourceUuid) {
59+
public synchronized String makeDestination(String resourceUuid) {
6060
String nodeUuid = nodeHash.get(resourceUuid);
6161
if (nodeUuid == null) {
6262
throw new CloudRuntimeException("Cannot find any available management node to send message");
@@ -66,18 +66,18 @@ public String makeDestination(String resourceUuid) {
6666
}
6767

6868
@Override
69-
public boolean isManagedByUs(String resourceUuid) {
69+
public synchronized boolean isManagedByUs(String resourceUuid) {
7070
String nodeUuid = makeDestination(resourceUuid);
7171
return nodeUuid.equals(Platform.getManagementServerId());
7272
}
7373

7474
@Override
75-
public Collection<String> getManagementNodesInHashRing() {
76-
return nodeHash.getNodes();
75+
public synchronized Collection<String> getManagementNodesInHashRing() {
76+
return new ArrayList<>(nodeHash.getNodes());
7777
}
7878

7979
@Override
80-
public NodeInfo getNodeInfo(String nodeUuid) {
80+
public synchronized NodeInfo getNodeInfo(String nodeUuid) {
8181
NodeInfo info = nodes.get(nodeUuid);
8282
if (info == null) {
8383
ManagementNodeVO vo = dbf.findByUuid(nodeUuid, ManagementNodeVO.class);
@@ -93,17 +93,17 @@ public NodeInfo getNodeInfo(String nodeUuid) {
9393
}
9494

9595
@Override
96-
public Collection<NodeInfo> getAllNodeInfo() {
97-
return nodes.values();
96+
public synchronized Collection<NodeInfo> getAllNodeInfo() {
97+
return new ArrayList<>(nodes.values());
9898
}
9999

100100
@Override
101-
public int getManagementNodeCount() {
102-
return nodes.values().size();
101+
public synchronized int getManagementNodeCount() {
102+
return nodes.size();
103103
}
104104

105105

106-
public boolean isNodeInCircle(String nodeId) {
106+
public synchronized boolean isNodeInCircle(String nodeId) {
107107
return nodeHash.hasNode(nodeId);
108108
}
109109
}

portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import java.sql.SQLException;
7575
import java.sql.Timestamp;
7676
import java.util.ArrayList;
77+
import java.util.HashSet;
7778
import java.util.List;
7879
import java.util.Map;
7980
import java.util.Set;
@@ -107,6 +108,15 @@ public class ManagementNodeManagerImpl extends AbstractService implements Manage
107108
// A dictionary (nodeId -> ManagementNodeInventory) of joined management Node
108109
final private Map<String, ManagementNodeInventory> joinedManagementNodes = new ConcurrentHashMap<>();
109110

111+
// Lock to serialize lifecycle events from heartbeat reconciliation and canonical event callbacks,
112+
// preventing race conditions where a nodeJoin event is immediately followed by a stale nodeLeft
113+
// from the heartbeat thread, or vice versa. See ZSTAC-77711.
114+
private final Object lifecycleLock = new Object();
115+
116+
// Track nodes found in hash ring but missing from DB. Only call nodeLeft after a node
117+
// is missing for two consecutive heartbeat cycles, to avoid removing nodes that just joined.
118+
private final Set<String> suspectedMissingFromDb = new HashSet<>();
119+
110120
private static int NODE_STARTING = 0;
111121
private static int NODE_RUNNING = 1;
112122
private static int NODE_FAILED = -1;
@@ -368,12 +378,16 @@ protected void run(Map tokens, Object data) {
368378

369379
ManagementNodeLifeCycleData d = (ManagementNodeLifeCycleData) data;
370380

371-
if (LifeCycle.NodeJoin.toString().equals(d.getLifeCycle())) {
372-
nodeLifeCycle.nodeJoin(d.getInventory());
373-
} else if (LifeCycle.NodeLeft.toString().equals(d.getLifeCycle())) {
374-
nodeLifeCycle.nodeLeft(d.getInventory());
375-
} else {
376-
throw new CloudRuntimeException(String.format("unknown lifecycle[%s]", d.getLifeCycle()));
381+
synchronized (lifecycleLock) {
382+
if (LifeCycle.NodeJoin.toString().equals(d.getLifeCycle())) {
383+
// Clear from suspected set since the node is confirmed alive
384+
suspectedMissingFromDb.remove(d.getInventory().getUuid());
385+
nodeLifeCycle.nodeJoin(d.getInventory());
386+
} else if (LifeCycle.NodeLeft.toString().equals(d.getLifeCycle())) {
387+
nodeLifeCycle.nodeLeft(d.getInventory());
388+
} else {
389+
throw new CloudRuntimeException(String.format("unknown lifecycle[%s]", d.getLifeCycle()));
390+
}
377391
}
378392
}
379393
};
@@ -860,34 +874,55 @@ private void checkAllNodesHealth() {
860874

861875
Set<String> nodeUuidsInDb = nodesInDb.stream().map(ManagementNodeVO::getUuid).collect(Collectors.toSet());
862876

863-
// When a node is dying, we may not receive the the dead notification because the message bus may be also dead
864-
// at that moment. By checking if the node UUID is still in our hash ring, we know what nodes should be kicked out
865-
destinationMaker.getManagementNodesInHashRing().forEach(nodeUuid -> {
866-
if (!nodeUuidsInDb.contains(nodeUuid)) {
867-
logger.warn(String.format("found that a management node[uuid:%s] had no heartbeat in database but still in our hash ring," +
868-
"notify that it's dead", nodeUuid));
869-
ManagementNodeInventory inv = new ManagementNodeInventory();
870-
inv.setUuid(nodeUuid);
871-
inv.setHostName(destinationMaker.getNodeInfo(nodeUuid).getNodeIP());
872-
873-
nodeLifeCycle.nodeLeft(inv);
874-
}
875-
});
876-
877-
// check if any node missing in our hash ring
878-
nodesInDb.forEach(n -> {
879-
if (n.getUuid().equals(node().getUuid()) || suspects.contains(n)) {
880-
return;
881-
}
882-
883-
new Runnable() {
884-
@Override
885-
@AsyncThread
886-
public void run() {
887-
nodeLifeCycle.nodeJoin(ManagementNodeInventory.valueOf(n));
877+
// Reconcile hash ring with DB under lifecycleLock to prevent race with
878+
// canonical event callbacks (nodeJoin/nodeLeft). See ZSTAC-77711.
879+
synchronized (lifecycleLock) {
880+
// When a node is dying, we may not receive the dead notification because the message bus may be also dead
881+
// at that moment. By checking if the node UUID is still in our hash ring, we know what nodes should be kicked out.
882+
// Use two-round confirmation: first round marks as suspected, second round actually removes.
883+
Set<String> currentSuspected = new HashSet<>();
884+
destinationMaker.getManagementNodesInHashRing().forEach(nodeUuid -> {
885+
if (!nodeUuidsInDb.contains(nodeUuid)) {
886+
if (suspectedMissingFromDb.contains(nodeUuid)) {
887+
// Second consecutive detection — confirmed missing, remove from hash ring
888+
logger.warn(String.format("management node[uuid:%s] confirmed missing from database for two consecutive" +
889+
" heartbeat cycles, removing from hash ring", nodeUuid));
890+
ManagementNodeInventory inv = new ManagementNodeInventory();
891+
inv.setUuid(nodeUuid);
892+
try {
893+
inv.setHostName(destinationMaker.getNodeInfo(nodeUuid).getNodeIP());
894+
} catch (Exception e) {
895+
logger.warn(String.format("cannot get node info for node[uuid:%s], use empty hostname", nodeUuid));
896+
}
897+
898+
nodeLifeCycle.nodeLeft(inv);
899+
} else {
900+
// First detection — mark as suspected, defer removal to next cycle
901+
logger.warn(String.format("management node[uuid:%s] not found in database but still in hash ring," +
902+
" marking as suspected (will remove on next heartbeat if still missing)", nodeUuid));
903+
currentSuspected.add(nodeUuid);
904+
}
888905
}
889-
}.run();
890-
});
906+
});
907+
// Update suspected set: only keep nodes that are newly suspected this round
908+
suspectedMissingFromDb.clear();
909+
suspectedMissingFromDb.addAll(currentSuspected);
910+
911+
// check if any node missing in our hash ring
912+
nodesInDb.forEach(n -> {
913+
if (n.getUuid().equals(node().getUuid()) || suspects.contains(n)) {
914+
return;
915+
}
916+
917+
new Runnable() {
918+
@Override
919+
@AsyncThread
920+
public void run() {
921+
nodeLifeCycle.nodeJoin(ManagementNodeInventory.valueOf(n));
922+
}
923+
}.run();
924+
});
925+
}
891926
}
892927

893928
@Override

0 commit comments

Comments
 (0)