1717
1818import static io .serverlessworkflow .fluent .func .dsl .FuncDSL .*;
1919import static org .assertj .core .api .Assertions .assertThat ;
20+ import static org .awaitility .Awaitility .await ;
2021
22+ import io .cloudevents .CloudEvent ;
23+ import io .cloudevents .core .data .PojoCloudEventData ;
2124import io .serverlessworkflow .api .types .Workflow ;
2225import io .serverlessworkflow .fluent .func .FuncWorkflowBuilder ;
2326import io .serverlessworkflow .impl .WorkflowApplication ;
2427import io .serverlessworkflow .impl .WorkflowModel ;
28+ import io .serverlessworkflow .impl .events .InMemoryEvents ;
29+ import java .time .Duration ;
30+ import java .util .Collection ;
2531import java .util .List ;
32+ import java .util .Map ;
33+ import java .util .concurrent .CopyOnWriteArrayList ;
2634import org .junit .jupiter .api .Test ;
2735
2836public class ForEachFuncTest {
@@ -33,8 +41,10 @@ private static record EnhancedOrder(String id, int salary) {}
3341
3442 private static record OrdersPayload (List <Order > orders ) {}
3543
44+ private static record OrderName (String id , String name ) {}
45+
3646 @ Test
37- void testForEachIteration () throws Exception {
47+ void testForEachIteration () {
3848
3949 Workflow workflow =
4050 FuncWorkflowBuilder .workflow ("foreach-workflow" )
@@ -51,6 +61,55 @@ void testForEachIteration() throws Exception {
5161 }
5262 }
5363
64+ @ Test
65+ void testForEachEmit () {
66+
67+ String eventType = "test.item.emitted" ;
68+ Workflow workflow =
69+ FuncWorkflowBuilder .workflow ("forEach-bug-reproducer" )
70+ .tasks (
71+ // ForEach should emit 3 events, one per item
72+ forEach (
73+ (Collection <Map <String , String >> items ) -> items ,
74+ emitJson (eventType , Map .class ).inputFrom ("$item" )))
75+ .build ();
76+
77+ List <CloudEvent > publishedEvents = new CopyOnWriteArrayList <>();
78+ InMemoryEvents eventBroker = new InMemoryEvents ();
79+ eventBroker .register (eventType , ce -> publishedEvents .add (ce ));
80+
81+ try (WorkflowApplication app =
82+ WorkflowApplication .builder ()
83+ .withEventConsumer (eventBroker )
84+ .withEventPublisher (eventBroker )
85+ .build ()) {
86+ app .workflowDefinition (workflow )
87+ .instance (
88+ List .of (
89+ new OrderName ("item-1" , "first" ),
90+ new OrderName ("item-2" , "second" ),
91+ new OrderName ("item-3" , "third" )))
92+ .start ()
93+ .join ();
94+ await ()
95+ .atMost (Duration .ofSeconds (2 ))
96+ .pollInterval (Duration .ofMillis (10 ))
97+ .until (() -> publishedEvents .size () == 3 );
98+
99+ assertThat (
100+ publishedEvents .stream ()
101+ .map (CloudEvent ::getData )
102+ .map (PojoCloudEventData .class ::cast )
103+ .map (p -> p .getValue ())
104+ .toList ())
105+ .isEqualTo (
106+ List .of (
107+ Map .of ("id" , "item-1" , "name" , "first" ),
108+ Map .of ("id" , "item-2" , "name" , "second" ),
109+ Map .of ("id" , "item-3" , "name" , "third" )));
110+ }
111+ }
112+
54113 private static EnhancedOrder enhace (Order order ) {
55114 return new EnhancedOrder (order .id (), 1000 );
56115 }
0 commit comments