2020import static io .serverlessworkflow .impl .WorkflowUtils .safeClose ;
2121
2222import io .serverlessworkflow .api .types .Input ;
23- import io .serverlessworkflow .api .types .ListenTo ;
2423import io .serverlessworkflow .api .types .Output ;
2524import io .serverlessworkflow .api .types .Schedule ;
2625import io .serverlessworkflow .api .types .Workflow ;
2726import io .serverlessworkflow .impl .events .EventRegistrationBuilderInfo ;
2827import io .serverlessworkflow .impl .executors .TaskExecutor ;
2928import io .serverlessworkflow .impl .executors .TaskExecutorHelper ;
3029import io .serverlessworkflow .impl .resources .ResourceLoader ;
30+ import io .serverlessworkflow .impl .scheduler .Cancellable ;
3131import io .serverlessworkflow .impl .scheduler .ScheduledEventConsumer ;
32+ import io .serverlessworkflow .impl .scheduler .WorkflowScheduler ;
3233import io .serverlessworkflow .impl .schema .SchemaValidator ;
3334import java .nio .file .Path ;
3435import java .util .HashMap ;
3536import java .util .Map ;
37+ import java .util .Objects ;
3638import java .util .Optional ;
3739
3840public class WorkflowDefinition implements AutoCloseable , WorkflowDefinitionData {
3941
4042 private final Workflow workflow ;
43+ private final WorkflowDefinitionId definitionId ;
4144 private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
4245 private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
4346 private Optional <WorkflowFilter > inputFilter = Optional .empty ();
@@ -47,10 +50,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
4750 private final ResourceLoader resourceLoader ;
4851 private final Map <String , TaskExecutor <?>> executors = new HashMap <>();
4952 private ScheduledEventConsumer scheculedConsumer ;
53+ private Cancellable everySchedule ;
54+ private Cancellable cronSchedule ;
5055
5156 private WorkflowDefinition (
5257 WorkflowApplication application , Workflow workflow , ResourceLoader resourceLoader ) {
5358 this .workflow = workflow ;
59+ this .definitionId = WorkflowDefinitionId .of (workflow );
5460 this .application = application ;
5561 this .resourceLoader = resourceLoader ;
5662
@@ -84,15 +90,28 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
8490 application .resourceLoaderFactory ().getResourceLoader (application , path ));
8591 Schedule schedule = workflow .getSchedule ();
8692 if (schedule != null ) {
87- ListenTo to = schedule . getOn ();
88- if (to != null ) {
93+ WorkflowScheduler scheduler = application . scheduler ();
94+ if (schedule . getOn () != null ) {
8995 definition .scheculedConsumer =
90- application
91- .scheduler ()
92- .eventConsumer (
93- definition ,
94- application .modelFactory ()::from ,
95- EventRegistrationBuilderInfo .from (application , to , x -> null ));
96+ scheduler .eventConsumer (
97+ definition ,
98+ application .modelFactory ()::from ,
99+ EventRegistrationBuilderInfo .from (application , schedule .getOn (), x -> null ));
100+ }
101+ if (schedule .getAfter () != null ) {
102+ application
103+ .schedulerListener ()
104+ .addAfter (definition , WorkflowUtils .fromTimeoutAfter (application , schedule .getAfter ()));
105+ }
106+ if (schedule .getCron () != null ) {
107+ definition .cronSchedule = scheduler .scheduleCron (definition , schedule .getCron ());
108+ }
109+ if (schedule .getEvery () != null ) {
110+ definition .everySchedule =
111+ scheduler .scheduleEvery (
112+ definition ,
113+ WorkflowUtils .fromTimeoutAfter (application , schedule .getEvery ())
114+ .apply (null , null , application .modelFactory ().fromNull ()));
96115 }
97116 }
98117 return definition ;
@@ -148,7 +167,28 @@ public void addTaskExecutor(WorkflowMutablePosition position, TaskExecutor<?> ta
148167
149168 @ Override
150169 public void close () {
151- safeClose (scheculedConsumer );
152170 safeClose (resourceLoader );
171+ safeClose (scheculedConsumer );
172+ application .schedulerListener ().removeAfter (this );
173+ if (everySchedule != null ) {
174+ everySchedule .cancel ();
175+ }
176+ if (cronSchedule != null ) {
177+ cronSchedule .cancel ();
178+ }
179+ }
180+
181+ @ Override
182+ public int hashCode () {
183+ return Objects .hash (definitionId );
184+ }
185+
186+ @ Override
187+ public boolean equals (Object obj ) {
188+ if (this == obj ) return true ;
189+ if (obj == null ) return false ;
190+ if (getClass () != obj .getClass ()) return false ;
191+ WorkflowDefinition other = (WorkflowDefinition ) obj ;
192+ return Objects .equals (definitionId , other .definitionId );
153193 }
154194}
0 commit comments