Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions compute/src/main/java/org/zstack/compute/vm/VmInstanceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -7226,6 +7226,7 @@ protected void scripts() {
self.setZoneUuid(spec.getDestHost().getZoneUuid());
}
}.execute());
syncVmDevicesAddressInfo(self.getUuid());
logger.debug(String.format("vm[uuid:%s] is running ..", self.getUuid()));
VmInstanceInventory inv = VmInstanceInventory.valueOf(self);
extEmitter.afterStartVm(inv);
Expand Down Expand Up @@ -7495,6 +7496,9 @@ public void run(FlowTrigger trigger, Map data) {
self.setHypervisorType(spec.getDestHost().getHypervisorType());
self.setRootVolumeUuid(spec.getDestRootVolume().getUuid());
});
if (struct.getStrategy() == VmCreationStrategy.InstantStart) {
syncVmDevicesAddressInfo(self.getUuid());
}
logger.debug(String.format("vm[uuid:%s] is started ..", self.getUuid()));
VmInstanceInventory inv = VmInstanceInventory.valueOf(self);
extEmitter.afterStartNewCreatedVm(inv);
Expand Down Expand Up @@ -7931,6 +7935,7 @@ public void handle(Map data) {
public void done() {
self = changeVmStateInDb(VmInstanceStateEvent.running,
() -> self.setHostUuid(originalCopy.getHostUuid()));
syncVmDevicesAddressInfo(self.getUuid());
VmInstanceInventory inv = VmInstanceInventory.valueOf(self);
extEmitter.afterRebootVm(inv);
new StaticIpOperator().deleteIpChange(self.getUuid());
Expand Down Expand Up @@ -8347,6 +8352,7 @@ protected void resumeVm(final Message msg, Completion completion) {
@Override
public void handle(Map Data) {
self = changeVmStateInDb(VmInstanceStateEvent.running);
syncVmDevicesAddressInfo(self.getUuid());
completion.success();
}
}).error(new FlowErrorHandler(completion) {
Expand Down Expand Up @@ -8467,6 +8473,28 @@ public String getName() {
});
}

private void syncVmDevicesAddressInfo(String vmUuid) {
if (self.getHostUuid() == null) {
return;
}
SyncVmDeviceInfoMsg msg = new SyncVmDeviceInfoMsg();
msg.setVmInstanceUuid(vmUuid);
msg.setHostUuid(self.getHostUuid());
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, msg.getHostUuid());
bus.send(msg, new CloudBusCallBack(msg) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
logger.warn(String.format("Failed to sync vm device info for vm[uuid:%s], %s",
vmUuid, reply.getError()));
} else {
logger.debug(String.format("Sent SyncVmDeviceInfoMsg for vm[uuid:%s] on host[uuid:%s]",
vmUuid, self.getHostUuid()));
}
}
});
}

private void deleteVmCdRom(String cdRomUuid, Completion completion) {
boolean exist = dbf.isExist(cdRomUuid, VmCdRomVO.class);
if (!exist) {
Expand Down
2 changes: 1 addition & 1 deletion conf/i18n/globalErrorCodeMapping/global-error-en_US.json
Original file line number Diff line number Diff line change
Expand Up @@ -3374,7 +3374,7 @@
"ORG_ZSTACK_NETWORK_HUAWEI_IMASTER_10019": "delete token of SDN controller [IP:%s] failed because %s",
"ORG_ZSTACK_STORAGE_PRIMARY_BLOCK_10004": "Cannot execute volume mapping to host flow due to invalid volume ID.%s",
"ORG_ZSTACK_NETWORK_SERVICE_PORTFORWARDING_10007": "port forwarding rule [uuid:%s] has not been attached to any virtual machine network interface, cannot detach",
"ORG_ZSTACK_MEVOCO_10088": "cannot take a snapshot for volumes[%s] when volume[uuid: %s] is not attached",
"ORG_ZSTACK_MEVOCO_10088": "cannot create snapshot for volume[uuid:%s] because it is not attached to any VM instance. Please attach the volume to a VM first. Affected volumes: %s",
"ORG_ZSTACK_STORAGE_PRIMARY_BLOCK_10005": "Cannot execute map LUN to host flow due to invalid LUN type: %s",
"ORG_ZSTACK_NETWORK_SERVICE_PORTFORWARDING_10008": "port forwarding rule [uuid:%s] has been associated with vm nic [uuid:%s], cannot be reassigned again",
"ORG_ZSTACK_MEVOCO_10087": "A Running VM[uuid:%s] has no associated Host UUID.",
Expand Down
2 changes: 1 addition & 1 deletion conf/i18n/globalErrorCodeMapping/global-error-zh_CN.json
Original file line number Diff line number Diff line change
Expand Up @@ -3374,7 +3374,7 @@
"ORG_ZSTACK_NETWORK_HUAWEI_IMASTER_10019": "删除 SDN 控制器 [IP:%s] 的令牌失败,因为 %s",
"ORG_ZSTACK_STORAGE_PRIMARY_BLOCK_10004": "无法执行映射LUN到主机流程,无效的LUN ID",
"ORG_ZSTACK_NETWORK_SERVICE_PORTFORWARDING_10007": "端口转发规则 rule[uuid:%s] 没有绑定到任何 VM 的网卡上,无法解除绑定",
"ORG_ZSTACK_MEVOCO_10088": "无法为挂载状态以外的卷[%s]创建快照",
"ORG_ZSTACK_MEVOCO_10088": "无法为云盘[uuid:%s]创建快照,因为该云盘未挂载到任何云主机。请先将云盘挂载到云主机后再创建快照。相关云盘: %s",
"ORG_ZSTACK_STORAGE_PRIMARY_BLOCK_10005": "无法执行映射LUN到主机流程,无效的LUN类型",
"ORG_ZSTACK_NETWORK_SERVICE_PORTFORWARDING_10008": "端口转发规则[uuid:%s]已绑定到VM网卡[uuid:%s],无法再次绑定",
"ORG_ZSTACK_MEVOCO_10087": "如何一个运行中的VM[uuid:%s]没有宿主机uuid?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ public class ResourceDestinationMakerImpl implements ManagementNodeChangeListene
private DatabaseFacade dbf;

@Override
public void nodeJoin(ManagementNodeInventory inv) {
public synchronized void nodeJoin(ManagementNodeInventory inv) {
nodeHash.add(inv.getUuid());
nodes.put(inv.getUuid(), new NodeInfo(inv));
}

@Override
public void nodeLeft(ManagementNodeInventory inv) {
public synchronized void nodeLeft(ManagementNodeInventory inv) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
}

@Override
public void iAmDead(ManagementNodeInventory inv) {
public synchronized void iAmDead(ManagementNodeInventory inv) {
String nodeId = inv.getUuid();
nodeHash.remove(nodeId);
nodes.remove(nodeId);
}

@Override
public void iJoin(ManagementNodeInventory inv) {
public synchronized void iJoin(ManagementNodeInventory inv) {
List<ManagementNodeVO> lst = Q.New(ManagementNodeVO.class).list();
lst.forEach((ManagementNodeVO node) -> {
nodeHash.add(node.getUuid());
Expand All @@ -56,7 +56,7 @@ public void iJoin(ManagementNodeInventory inv) {
}

@Override
public String makeDestination(String resourceUuid) {
public synchronized String makeDestination(String resourceUuid) {
String nodeUuid = nodeHash.get(resourceUuid);
if (nodeUuid == null) {
throw new CloudRuntimeException("Cannot find any available management node to send message");
Expand All @@ -66,18 +66,18 @@ public String makeDestination(String resourceUuid) {
}

@Override
public boolean isManagedByUs(String resourceUuid) {
public synchronized boolean isManagedByUs(String resourceUuid) {
String nodeUuid = makeDestination(resourceUuid);
return nodeUuid.equals(Platform.getManagementServerId());
}

@Override
public Collection<String> getManagementNodesInHashRing() {
return nodeHash.getNodes();
public synchronized Collection<String> getManagementNodesInHashRing() {
return new ArrayList<>(nodeHash.getNodes());
}

@Override
public NodeInfo getNodeInfo(String nodeUuid) {
public synchronized NodeInfo getNodeInfo(String nodeUuid) {
NodeInfo info = nodes.get(nodeUuid);
if (info == null) {
ManagementNodeVO vo = dbf.findByUuid(nodeUuid, ManagementNodeVO.class);
Expand All @@ -93,17 +93,17 @@ public NodeInfo getNodeInfo(String nodeUuid) {
}

@Override
public Collection<NodeInfo> getAllNodeInfo() {
return nodes.values();
public synchronized Collection<NodeInfo> getAllNodeInfo() {
return new ArrayList<>(nodes.values());
}

@Override
public int getManagementNodeCount() {
return nodes.values().size();
public synchronized int getManagementNodeCount() {
return nodes.size();
}


public boolean isNodeInCircle(String nodeId) {
public synchronized boolean isNodeInCircle(String nodeId) {
return nodeHash.hasNode(nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,9 @@ public void setBootMode(String bootMode) {

public long getRootDiskAllocateSize() {
if (rootDiskOffering == null) {
return this.getImageSpec().getInventory().getSize();
long virtualSize = this.getImageSpec().getInventory().getSize();
long actualSize = this.getImageSpec().getInventory().getActualSize();
return Math.max(virtualSize, actualSize);
}
return rootDiskOffering.getDiskSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public enum VmInstanceState {
new Transaction(VmInstanceStateEvent.destroyed, VmInstanceState.Destroyed),
new Transaction(VmInstanceStateEvent.destroying, VmInstanceState.Destroying),
new Transaction(VmInstanceStateEvent.running, VmInstanceState.Running),
new Transaction(VmInstanceStateEvent.stopped, VmInstanceState.Stopped),
new Transaction(VmInstanceStateEvent.expunging, VmInstanceState.Expunging)
);
Destroyed.transactions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ public List<ActiveVolumeClient> getActiveClients(String installPath, String prot
if (VolumeProtocol.CBD.toString().equals(protocol)) {
GetVolumeClientsCmd cmd = new GetVolumeClientsCmd();
cmd.setPath(installPath);
GetVolumeClientsRsp rsp = syncHttpCall(GET_VOLUME_CLIENTS_PATH, cmd, GetVolumeClientsRsp.class);
GetVolumeClientsRsp rsp = new HttpCaller<>(GET_VOLUME_CLIENTS_PATH, cmd, GetVolumeClientsRsp.class,
null, TimeUnit.SECONDS, 30, true)
.setTryNext(true)
.syncCall();
List<ActiveVolumeClient> clients = new ArrayList<>();

if (!rsp.isSuccess()) {
Expand Down Expand Up @@ -1411,6 +1414,11 @@ public class HttpCaller<T extends AgentResponse> {

private boolean tryNext = false;

HttpCaller<T> setTryNext(boolean tryNext) {
this.tryNext = tryNext;
return this;
}

public HttpCaller(String path, AgentCommand cmd, Class<T> retClass, ReturnValueCompletion<T> callback) {
this(path, cmd, retClass, callback, null, 0, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -107,6 +108,15 @@ public class ManagementNodeManagerImpl extends AbstractService implements Manage
// A dictionary (nodeId -> ManagementNodeInventory) of joined management Node
final private Map<String, ManagementNodeInventory> joinedManagementNodes = new ConcurrentHashMap<>();

// Lock to serialize lifecycle events from heartbeat reconciliation and canonical event callbacks,
// preventing race conditions where a nodeJoin event is immediately followed by a stale nodeLeft
// from the heartbeat thread, or vice versa. See ZSTAC-77711.
private final Object lifecycleLock = new Object();

// Track nodes found in hash ring but missing from DB. Only call nodeLeft after a node
// is missing for two consecutive heartbeat cycles, to avoid removing nodes that just joined.
private final Set<String> suspectedMissingFromDb = new HashSet<>();

private static int NODE_STARTING = 0;
private static int NODE_RUNNING = 1;
private static int NODE_FAILED = -1;
Expand Down Expand Up @@ -368,12 +378,16 @@ protected void run(Map tokens, Object data) {

ManagementNodeLifeCycleData d = (ManagementNodeLifeCycleData) data;

if (LifeCycle.NodeJoin.toString().equals(d.getLifeCycle())) {
nodeLifeCycle.nodeJoin(d.getInventory());
} else if (LifeCycle.NodeLeft.toString().equals(d.getLifeCycle())) {
nodeLifeCycle.nodeLeft(d.getInventory());
} else {
throw new CloudRuntimeException(String.format("unknown lifecycle[%s]", d.getLifeCycle()));
synchronized (lifecycleLock) {
if (LifeCycle.NodeJoin.toString().equals(d.getLifeCycle())) {
// Clear from suspected set since the node is confirmed alive
suspectedMissingFromDb.remove(d.getInventory().getUuid());
nodeLifeCycle.nodeJoin(d.getInventory());
} else if (LifeCycle.NodeLeft.toString().equals(d.getLifeCycle())) {
nodeLifeCycle.nodeLeft(d.getInventory());
} else {
throw new CloudRuntimeException(String.format("unknown lifecycle[%s]", d.getLifeCycle()));
}
}
}
};
Expand Down Expand Up @@ -860,34 +874,55 @@ private void checkAllNodesHealth() {

Set<String> nodeUuidsInDb = nodesInDb.stream().map(ManagementNodeVO::getUuid).collect(Collectors.toSet());

// When a node is dying, we may not receive the the dead notification because the message bus may be also dead
// at that moment. By checking if the node UUID is still in our hash ring, we know what nodes should be kicked out
destinationMaker.getManagementNodesInHashRing().forEach(nodeUuid -> {
if (!nodeUuidsInDb.contains(nodeUuid)) {
logger.warn(String.format("found that a management node[uuid:%s] had no heartbeat in database but still in our hash ring," +
"notify that it's dead", nodeUuid));
ManagementNodeInventory inv = new ManagementNodeInventory();
inv.setUuid(nodeUuid);
inv.setHostName(destinationMaker.getNodeInfo(nodeUuid).getNodeIP());

nodeLifeCycle.nodeLeft(inv);
}
});

// check if any node missing in our hash ring
nodesInDb.forEach(n -> {
if (n.getUuid().equals(node().getUuid()) || suspects.contains(n)) {
return;
}

new Runnable() {
@Override
@AsyncThread
public void run() {
nodeLifeCycle.nodeJoin(ManagementNodeInventory.valueOf(n));
// Reconcile hash ring with DB under lifecycleLock to prevent race with
// canonical event callbacks (nodeJoin/nodeLeft). See ZSTAC-77711.
synchronized (lifecycleLock) {
// When a node is dying, we may not receive the dead notification because the message bus may be also dead
// at that moment. By checking if the node UUID is still in our hash ring, we know what nodes should be kicked out.
// Use two-round confirmation: first round marks as suspected, second round actually removes.
Set<String> currentSuspected = new HashSet<>();
destinationMaker.getManagementNodesInHashRing().forEach(nodeUuid -> {
if (!nodeUuidsInDb.contains(nodeUuid)) {
if (suspectedMissingFromDb.contains(nodeUuid)) {
// Second consecutive detection — confirmed missing, remove from hash ring
logger.warn(String.format("management node[uuid:%s] confirmed missing from database for two consecutive" +
" heartbeat cycles, removing from hash ring", nodeUuid));
ManagementNodeInventory inv = new ManagementNodeInventory();
inv.setUuid(nodeUuid);
try {
inv.setHostName(destinationMaker.getNodeInfo(nodeUuid).getNodeIP());
} catch (Exception e) {
logger.warn(String.format("cannot get node info for node[uuid:%s], use empty hostname", nodeUuid));
}

nodeLifeCycle.nodeLeft(inv);
} else {
// First detection — mark as suspected, defer removal to next cycle
logger.warn(String.format("management node[uuid:%s] not found in database but still in hash ring," +
" marking as suspected (will remove on next heartbeat if still missing)", nodeUuid));
currentSuspected.add(nodeUuid);
}
}
}.run();
});
});
// Update suspected set: only keep nodes that are newly suspected this round
suspectedMissingFromDb.clear();
suspectedMissingFromDb.addAll(currentSuspected);

// check if any node missing in our hash ring
nodesInDb.forEach(n -> {
if (n.getUuid().equals(node().getUuid()) || suspects.contains(n)) {
return;
}

new Runnable() {
@Override
@AsyncThread
public void run() {
nodeLifeCycle.nodeJoin(ManagementNodeInventory.valueOf(n));
}
}.run();
});
}
}

@Override
Expand Down