Skip to content

Commit 36cb209

Browse files
committed
[Fix #1354] ForExecutor was not properly implemented for multithread
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 861ca31 commit 36cb209

3 files changed

Lines changed: 60 additions & 16 deletions

File tree

experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void testForEachEmit() {
7575
.build();
7676

7777
List<CloudEvent> publishedEvents = new CopyOnWriteArrayList<>();
78-
InMemoryEvents eventBroker = new InMemoryEvents();
78+
InMemoryEvents eventBroker = new LaggedInMemoryEvent();
7979
eventBroker.register(eventType, ce -> publishedEvents.add(ce));
8080

8181
try (WorkflowApplication app =
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.test;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.events.InMemoryEvents;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
public class LaggedInMemoryEvent extends InMemoryEvents {
23+
24+
@Override
25+
public CompletableFuture<Void> publish(CloudEvent ce) {
26+
27+
return super.publish(ce)
28+
.thenRun(
29+
() -> {
30+
try {
31+
Thread.sleep(50);
32+
} catch (InterruptedException e) {
33+
Thread.currentThread().interrupt();
34+
}
35+
});
36+
}
37+
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,30 @@ protected ForExecutor(ForExecutorBuilder builder) {
7575
@Override
7676
protected CompletableFuture<WorkflowModel> internalExecute(
7777
WorkflowContext workflow, TaskContext taskContext) {
78-
Iterator<?> iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator();
79-
int i = 0;
80-
CompletableFuture<WorkflowModel> future =
81-
CompletableFuture.completedFuture(taskContext.input());
82-
while (iter.hasNext()) {
78+
return buildLoopFuture(
79+
workflow,
80+
taskContext,
81+
taskContext.input(),
82+
collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(),
83+
-1);
84+
}
85+
86+
private CompletableFuture<WorkflowModel> buildLoopFuture(
87+
WorkflowContext workflow,
88+
TaskContext taskContext,
89+
WorkflowModel input,
90+
Iterator<?> iter,
91+
int index) {
92+
final int newIndex = index + 1;
93+
if (iter.hasNext()) {
8394
taskContext.variables().put(task.getFor().getEach(), iter.next());
84-
taskContext.variables().put(task.getFor().getAt(), i++);
85-
if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) {
86-
future =
87-
future.thenCompose(
88-
input ->
89-
TaskExecutorHelper.processTaskList(
90-
taskExecutor, workflow, Optional.of(taskContext), input));
91-
} else {
92-
break;
95+
taskContext.variables().put(task.getFor().getAt(), newIndex);
96+
if (whileExpr.map(w -> w.test(workflow, taskContext, input)).orElse(true)) {
97+
return TaskExecutorHelper.processTaskList(
98+
taskExecutor, workflow, Optional.of(taskContext), input)
99+
.thenCompose(output -> buildLoopFuture(workflow, taskContext, output, iter, newIndex));
93100
}
94101
}
95-
return future;
102+
return CompletableFuture.completedFuture(input);
96103
}
97104
}

0 commit comments

Comments
 (0)