Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,34 +15,41 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.flowable.bpmn.model.Activity;
import org.flowable.bpmn.model.FlowElement;
import org.flowable.bpmn.model.FlowNode;
import org.flowable.bpmn.model.ParallelGateway;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.engine.HistoryService;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.history.HistoricActivityInstance;
import org.flowable.engine.impl.HistoricActivityInstanceQueryProperty;
import org.flowable.engine.impl.context.Context;
import org.flowable.engine.impl.persistence.entity.ExecutionEntity;
import org.flowable.engine.impl.persistence.entity.ExecutionEntityManager;
import org.flowable.engine.impl.persistence.entity.HistoricActivityInstanceEntityImpl;
import org.flowable.engine.impl.util.CommandContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of the Parallel Gateway/AND gateway as defined in the BPMN 2.0 specification.
*
*
* The Parallel Gateway can be used for splitting a path of execution into multiple paths of executions (AND-split/fork behavior), one for every outgoing sequence flow.
*
*
* The Parallel Gateway can also be used for merging or joining paths of execution (AND-join). In this case, on every incoming sequence flow an execution needs to arrive, before leaving the Parallel
* Gateway (and potentially then doing the fork behavior in case of multiple outgoing sequence flow).
*
* Note that there is a slight difference to spec (p. 436): "The parallel gateway is activated if there is at least one Token on each incoming sequence flow." We only check the number of incoming
* tokens to the number of sequenceflow. So if two tokens would arrive through the same sequence flow, our implementation would activate the gateway.
*
*
* Note that a Parallel Gateway having one incoming and multiple outgoing sequence flow, is the same as having multiple outgoing sequence flow on a given activity. However, a parallel gateway does NOT
* check conditions on the outgoing sequence flow.
*
*
* @author Joram Barrez
* @author Tom Baeyens
*/
Expand All @@ -52,6 +59,8 @@ public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {

private static final Logger LOGGER = LoggerFactory.getLogger(ParallelGatewayActivityBehavior.class);

private HistoryService historyService = Context.getProcessEngineConfiguration().getHistoryService();

@Override
public void execute(DelegateExecution execution) {

Expand Down Expand Up @@ -83,19 +92,90 @@ public void execute(DelegateExecution execution) {
}

int nbrOfExecutionsToJoin = parallelGateway.getIncomingFlows().size();
int nbrOfExecutionsCurrentlyJoined = joinedExecutions.size();

Set<String> unmatchedIncomingFlowIds = new HashSet<>();

// Add all incoming flows to the unmatchedIncomingFlowIds Set
for (SequenceFlow incomingFlow : parallelGateway.getIncomingFlows()) {
if (incomingFlow.getId() != null) {
unmatchedIncomingFlowIds.add(incomingFlow.getId());
} else {
// If incoming flow has no id, then we build the string used as the flowable default id
// See getArtificialSequenceFlowId() in org.flowable.engine.impl.persistence.entity.ActivityInstanceEntityManagerImpl
unmatchedIncomingFlowIds.add(new StringBuilder("_flow_").append(incomingFlow.getSourceRef()).append("__").append(incomingFlow.getTargetRef()).toString());
}
}

// Collect HistoricActivity cache because historyService is not flushed between the sequenceFlow completing and reaching parallelGateway
// Sort output so that the most recent activity is first in the list
List<HistoricActivityInstance> historicActivitiesCache = CommandContextUtil.getEntityCache()
.findInCache(HistoricActivityInstanceEntityImpl.class)
.stream().map(historicActivity -> (HistoricActivityInstance) historicActivity)
.sorted(Comparator.comparing(HistoricActivityInstance::getEndTime, Comparator.nullsFirst(Comparator.reverseOrder()))).toList();

// For each execution, get the most recent sequenceflow activity and remove it from the Set of incomingFlow ids
// If the Set becomes empty, then we know that we have all the incoming flows fulfilled in our joinedExecutions
// Ignore the execution unless the most recent activity is the join gateway, and the previous activity was the relevant sequenceFlow
joinedExecutions:
for (ExecutionEntity joinedExecution : joinedExecutions) {
List<HistoricActivityInstance> latestActivities = new ArrayList<>();

// Add relevant latest activities (i.e. 2 most recent activities) to the latestActivities List & sort using most recent activity first
latestActivities.addAll(historyService
.createHistoricActivityInstanceQuery()
.executionId(joinedExecution.getId())
.orderBy(HistoricActivityInstanceQueryProperty.END).desc()
.listPage(0, 2));

// Add any relevant cached entities to the start of the list
latestActivities.addAll(0, historicActivitiesCache.stream().filter(historicActivity -> Objects.equals(historicActivity.getExecutionId(), joinedExecution.getId())).toList());

if (latestActivities.isEmpty()) {
continue;
}

// Walk back through the latest activities for this execution & check if they include a sequenceFlow that leads to this gateway
latestActivities:
for (int i = 0; i < latestActivities.size(); i++) {
HistoricActivityInstance latestActivity = latestActivities.get(i);

// Latest activity is parallelGateway, but not this parallelGateway - i.e. its not the execution we are looking for
if (Objects.equals(latestActivity.getActivityType(), "parallelGateway") && !Objects.equals(latestActivity.getActivityId(), parallelGateway.getId())) {
continue joinedExecutions;
}
// Latest activity is parallelGateway && id matches. Its the execution we are looking for. Our sequenceFlow should be the next activity
if (Objects.equals(latestActivity.getActivityType(), "parallelGateway") && Objects.equals(latestActivity.getActivityId(), parallelGateway.getId())) {
continue;
}
// If we for some reason get to here and the activity is not a sequence flow, then we are in the wrong execution
if (!Objects.equals(latestActivity.getActivityType(), "sequenceFlow")) {
continue joinedExecutions;
}

unmatchedIncomingFlowIds.remove(latestActivity.getActivityId());

// Once the unmatchedIncomingFlowIds Set is empty, then break and avoid running unneeded historyService queries
if (unmatchedIncomingFlowIds.isEmpty()) {
break joinedExecutions;
}

// if we get to here, then we have already completed analysing this execution. Continue to the next execution
continue joinedExecutions;
}
}

// Fork

// Is needed to set the endTime for all historic activity joins
CommandContextUtil.getActivityInstanceEntityManager().recordActivityEnd((ExecutionEntity) execution, null);

if (nbrOfExecutionsCurrentlyJoined == nbrOfExecutionsToJoin) {
// if all incoming flows were executed, then the gateway can continue
if (unmatchedIncomingFlowIds.isEmpty()) {

// Fork
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("parallel gateway '{}' ({}) activates: {} of {} joined", execution.getCurrentActivityId(),
execution.getId(), nbrOfExecutionsCurrentlyJoined, nbrOfExecutionsToJoin);
LOGGER.debug("parallel gateway '{}' ({}) activates: {} of {} joined", execution.getCurrentActivityId(),
execution.getId(), nbrOfExecutionsToJoin, nbrOfExecutionsToJoin);
}

if (parallelGateway.getIncomingFlows().size() > 1) {
Expand All @@ -116,8 +196,8 @@ public void execute(DelegateExecution execution) {
CommandContextUtil.getAgenda().planTakeOutgoingSequenceFlowsOperation((ExecutionEntity) execution, false); // false -> ignoring conditions on parallel gw

} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("parallel gateway '{}' ({}) does not activate: {} of {} joined", execution.getCurrentActivityId(),
execution.getId(), nbrOfExecutionsCurrentlyJoined, nbrOfExecutionsToJoin);
LOGGER.debug("parallel gateway '{}' ({}) does not activate: {} of {} incoming flows were not completed", execution.getCurrentActivityId(),
execution.getId(), unmatchedIncomingFlowIds.size(), nbrOfExecutionsToJoin);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -84,4 +84,37 @@ public void testUnbalancedForkJoin() {
assertProcessEnded(pi.getId());
}

@Test
@Deployment
public void testMultipleTokensJoin() {

ProcessInstance pi = runtimeService.startProcessInstanceByKey("multipleTokensJoin");
TaskQuery query = taskService.createTaskQuery().processInstanceId(pi.getId()).orderByTaskName().asc();

// There should be two tokens waiting at Wait Task 1 (two sequence flows lead to it in bpmn)
List<org.flowable.task.api.Task> tasks = query.list();
assertThat(tasks)
.extracting(Task::getName)
.containsExactly("Wait Task 1", "Wait Task 1", "Wait Task 2");

// Completing both Wait Task 1s should not satisfy the joining parallel gateway, and Final Task should still not be reached
// We should still be waiting at Wait Task 2
tasks.stream().filter(task -> task.getName().equals("Wait Task 1")).forEach(task -> {
taskService.complete(task.getId());
});

tasks = query.list();
assertThat(tasks)
.extracting(Task::getName)
.containsExactly("Wait Task 2");

// Completing Wait Task 2 should satisfy the Parallel Gateway, and we progress to the final task
taskService.complete(tasks.get(0).getId());

tasks = query.list();
assertThat(tasks)
.extracting(Task::getName)
.containsExactly("Final Task");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
targetNamespace="Examples">

<process id="multipleTokensJoin">

<startEvent id="theStart" />
<sequenceFlow id="flow1" sourceRef="theStart" targetRef="fork" />

<parallelGateway id="fork" />
<!-- Push 2 tokens down the waitTask1 sequenceFlow -->
<sequenceFlow sourceRef="fork" targetRef="waitTask1" />
<sequenceFlow sourceRef="fork" targetRef="waitTask1" />
<sequenceFlow sourceRef="fork" targetRef="waitTask2" />

<userTask id="waitTask1" name="Wait Task 1" />
<sequenceFlow sourceRef="waitTask1" targetRef="join" />

<userTask id="waitTask2" name="Wait Task 2" />
<!-- ensure that gateway logic works when sequenceFlow has id set -->
<sequenceFlow id="waitTask2ToJoinGateway" sourceRef="waitTask2" targetRef="join" />

<parallelGateway id="join" />
<sequenceFlow sourceRef="join" targetRef="finalTask" />

<userTask id="finalTask" name="Final Task" />
<sequenceFlow sourceRef="finalTask" targetRef="theEnd" />

<endEvent id="theEnd" />

</process>

</definitions>