Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ else if (U.isToStringMethod(mtd))
}

if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout)
throw new IgniteException("Service acquire timeout was reached, stopping. [timeout=" + waitTimeout + "]");
throw new IgniteException("Service acquire timeout was reached, stopping [timeout=" + waitTimeout + "]");
}
}
finally {
Expand Down Expand Up @@ -397,7 +397,9 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
if (snapshot.size() == 1) {
UUID nodeId = snapshot.keySet().iterator().next();

return prj.node(nodeId);
ClusterNode node = getAliveNode(nodeId);

return prj.predicate().apply(node) ? node : null;
}

Collection<ClusterNode> nodes = prj.nodes();
Expand All @@ -419,7 +421,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (i++ >= idx) {
if (e.getValue() > 0)
return ctx.discovery().node(e.getKey());
return getAliveNode(e.getKey());
}
}

Expand All @@ -428,7 +430,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
// Circle back.
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (e.getValue() > 0)
return ctx.discovery().node(e.getKey());
return getAliveNode(e.getKey());

if (i++ == idx)
return null;
Expand Down Expand Up @@ -462,6 +464,16 @@ T proxy() {
return proxy;
}

/**
 * Resolves a node by its ID through discovery and fails if no such node is found.
 *
 * @param nodeId ID of the node to resolve.
 * @return Node that discovery returned for the given ID, never {@code null}.
 * @throws ClusterTopologyCheckedException If discovery has no node for the given ID.
 */
public ClusterNode getAliveNode(UUID nodeId) throws ClusterTopologyCheckedException {
    ClusterNode resolved = ctx.discovery().node(nodeId);

    // Happy path first: the node is still known to discovery.
    if (resolved != null)
        return resolved;

    throw new ClusterTopologyCheckedException("The node holding the service left the cluster [nodeId=" + nodeId + ']');
}

/**
* @param mtd Method to invoke.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNam
if (timeout == 0 && desc == null)
return null;

if (desc != null && desc.topologyInitialized())
if (desc != null && !desc.serviceTopology().isTransitional())
return desc.topologySnapshot();

long wait = 0;
Expand Down Expand Up @@ -1566,7 +1566,7 @@ void completeInitiatingFuture(boolean deploy, IgniteUuid reqSrvcId, Throwable er
*
* @param fullTops Deployment topologies.
*/
void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID, Integer>> fullTops) {
void updateServicesTopologies(@NotNull final Map<IgniteUuid, ServiceTopology> fullTops) {
if (!enterBusy())
return;

Expand Down Expand Up @@ -1936,7 +1936,7 @@ private void processDynamicCacheChangeRequest(DynamicCacheChangeBatch msg) {
* @param msg Message.
*/
private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
final Map<IgniteUuid, ServiceTopology> fullTops = new HashMap<>();
final Map<IgniteUuid, Collection<Throwable>> fullErrors = new HashMap<>();

for (ServiceClusterDeploymentResult depRes : msg.results()) {
Expand All @@ -1959,7 +1959,7 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
if (!errors.isEmpty())
fullErrors.computeIfAbsent(srvcId, e -> new ArrayList<>()).addAll(errors);

fullTops.put(srvcId, top);
fullTops.put(srvcId, new ServiceTopology(top, depRes.isServiceTopologyTransitional()));
}

synchronized (servicesTopsUpdateMux) {
Expand Down Expand Up @@ -1990,14 +1990,12 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
* @param services Services info to update.
* @param tops Deployment topologies.
*/
private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
Map<IgniteUuid, Map<UUID, Integer>> tops) {

private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services, Map<IgniteUuid, ServiceTopology> tops) {
tops.forEach((srvcId, top) -> {
ServiceInfo desc = services.get(srvcId);

if (desc != null)
desc.topologySnapshot(top);
desc.updateServiceTopology(top);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public class ServiceClusterDeploymentResult implements Message {
@GridToStringInclude
Map<UUID, ServiceSingleNodeDeploymentResult> results;

/**
* Whether topology is transitional. Nodes may leave the cluster while the service topology is being recalculated.
* In this case, the resulting service topology may be incomplete. We consider the mentioned service topology
* transitional and expect it to be recalculated soon.
*/
@Order(2)
boolean isSvcTopTransitional;

/** Default constructor for {@link MessageFactory}. */
public ServiceClusterDeploymentResult() {
}
Expand All @@ -51,8 +59,10 @@ public ServiceClusterDeploymentResult() {
* @param srvcId Service id.
* @param results Deployments results.
*/
public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
public ServiceClusterDeploymentResult(
@NotNull IgniteUuid srvcId,
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results
) {
this.srvcId = srvcId;
this.results = results;
}
Expand All @@ -71,6 +81,20 @@ public Map<UUID, ServiceSingleNodeDeploymentResult> results() {
return Collections.unmodifiableMap(results);
}

/**
 * @return {@code true} if the service topology carried by this result has been marked as transitional,
 *      {@code false} otherwise.
 */
public boolean isServiceTopologyTransitional() {
    return this.isSvcTopTransitional;
}

/**
 * Flags the service topology of this result as transitional.
 * <p>
 * A topology is treated as transitional when nodes may have left the cluster while it was being recalculated,
 * so the resulting topology may be incomplete and is expected to be recalculated soon.
 */
public void markServiceTopologyTransitional() {
    this.isSvcTopTransitional = true;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ServiceClusterDeploymentResult.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.typedef.F;
Expand All @@ -43,7 +42,7 @@ public class ServiceDeploymentActions {
private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;

/** Services deployment topologies. */
private Map<IgniteUuid, Map<UUID, Integer>> depTops;
private Map<IgniteUuid, ServiceTopology> depTops;

/** Services deployment errors. */
private Map<IgniteUuid, Collection<Throwable>> depErrors;
Expand Down Expand Up @@ -118,15 +117,15 @@ public boolean deactivate() {
/**
* @return Deployment topologies.
*/
@NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies() {
@NotNull public Map<IgniteUuid, ServiceTopology> deploymentTopologies() {
return depTops != null ? depTops : Collections.emptyMap();
}

/**
* @param depTops Deployment topologies.
*/
public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
public void deploymentTopologies(@NotNull Map<IgniteUuid, ServiceTopology> depTops) {
this.depTops = Collections.unmodifiableMap(depTops);
}

/**
Expand All @@ -140,6 +139,6 @@ public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> de
* @param depErrors Deployment errors.
*/
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<Throwable>> depErrors) {
this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
this.depErrors = Collections.unmodifiableMap(depErrors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -79,6 +80,9 @@ class ServiceDeploymentTask {
@GridToStringInclude
private final Set<UUID> remaining = new HashSet<>();

/** Nodes that did not respond with single message because they left the cluster during distributed process. */
private final Set<UUID> failedToReply = new HashSet<>();

/** Added in deployment queue flag. */
private final AtomicBoolean addedInQueue = new AtomicBoolean(false);

Expand Down Expand Up @@ -219,8 +223,13 @@ protected void init() throws IgniteCheckedException {

if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
deployedServices.forEach((srvcId, desc) -> {
if (desc.topologySnapshot().containsKey(evtNode.id()) ||
(desc.cacheName() != null && !evtNode.isClient())) // If affinity service
ServiceTopology top = desc.serviceTopology();

if (
top.isTransitional() ||
top.containsNode(evtNode.id()) ||
desc.cacheName() != null && !evtNode.isClient() // If affinity service
)
toDeploy.put(srvcId, desc);
});
}
Expand Down Expand Up @@ -342,8 +351,13 @@ private void initCoordinator(AffinityTopologyVersion topVer) {

try {
for (ClusterNode node : ctx.discovery().nodes(topVer)) {
if (ctx.discovery().alive(node) && !singleDepsMsgs.containsKey(node.id()))
if (singleDepsMsgs.containsKey(node.id()))
continue;

if (ctx.discovery().alive(node))
remaining.add(node.id());
else
failedToReply.add(node.id());
}
}
catch (Exception e) {
Expand Down Expand Up @@ -462,7 +476,7 @@ protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBat

assert depResults != null : "Services deployment actions should be attached.";

final Map<IgniteUuid, Map<UUID, Integer>> fullTops = depResults.deploymentTopologies();
final Map<IgniteUuid, ServiceTopology> fullTops = depResults.deploymentTopologies();
final Map<IgniteUuid, Collection<Throwable>> fullErrors = depResults.deploymentErrors();

depActions.deploymentTopologies(fullTops);
Expand All @@ -473,7 +487,7 @@ protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBat
final Map<IgniteUuid, ServiceInfo> services = srvcProc.deployedServices();

fullTops.forEach((srvcId, top) -> {
Integer expCnt = top.getOrDefault(ctx.localNodeId(), 0);
Integer expCnt = top.snapshot().getOrDefault(ctx.localNodeId(), 0);

if (expCnt < srvcProc.localInstancesCount(srvcId)) { // Undeploy exceed instances
ServiceInfo desc = services.get(srvcId);
Expand All @@ -483,7 +497,7 @@ protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBat
ServiceConfiguration cfg = desc.configuration();

try {
srvcProc.redeploy(srvcId, cfg, top);
srvcProc.redeploy(srvcId, cfg, top.snapshot());
}
catch (IgniteCheckedException e) {
log.error("Error occured during cancel exceed service instances: " +
Expand Down Expand Up @@ -649,15 +663,41 @@ private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults(

final Collection<ServiceClusterDeploymentResult> fullResults = new ArrayList<>();

Set<IgniteUuid> transitionalSrvcTops = collectTransitionalTopologies();

singleResults.forEach((srvcId, dep) -> {
ServiceClusterDeploymentResult res = new ServiceClusterDeploymentResult(srvcId, dep);

if (transitionalSrvcTops.contains(srvcId))
res.markServiceTopologyTransitional();

fullResults.add(res);
});

return fullResults;
}

/**
 * Collects IDs of the services whose expected deployment topology contains at least one node that failed to
 * reply because it left the cluster. Such a topology may be incomplete, so it is considered transitional and
 * is expected to be recalculated soon.
 *
 * @return IDs of services with a transitional topology, or an empty set if every node replied.
 */
private Set<IgniteUuid> collectTransitionalTopologies() {
    if (failedToReply.isEmpty())
        return Collections.emptySet();

    Set<IgniteUuid> transitional = new HashSet<>();

    expDeps.forEach((srvcId, top) -> {
        for (UUID leftNodeId : failedToReply) {
            if (top.containsKey(leftNodeId)) {
                transitional.add(srvcId);

                // One departed node is enough to mark the service, no need to check the rest.
                break;
            }
        }
    });

    return transitional;
}

/**
* Handles a node leaves topology.
*
Expand Down Expand Up @@ -688,10 +728,14 @@ else if (ctx.localNodeId().equals(crdId)) {
synchronized (initCrdMux) {
boolean rmvd = remaining.remove(nodeId);

if (rmvd && remaining.isEmpty()) {
singleDepsMsgs.remove(nodeId);
if (rmvd) {
failedToReply.add(nodeId);

onAllReceived();
if (remaining.isEmpty()) {
singleDepsMsgs.remove(nodeId);

onAllReceived();
}
}
}
}
Expand Down
Loading
Loading