forked from deepnoodle-ai/workflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecution.go
More file actions
1581 lines (1394 loc) · 53.7 KB
/
execution.go
File metadata and controls
1581 lines (1394 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package workflow
import (
	"context"
	"crypto/rand"
	"encoding/base32"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/deepnoodle-ai/workflow/script"
)
// NewExecutionID generates a fresh, opaque identifier for an execution.
//
// The result is the literal prefix "exec_" followed by 16 random bytes
// rendered as unpadded, lowercased base32 (26 characters). It panics if
// the entropy source cannot be read.
func NewExecutionID() string {
	entropy := make([]byte, 16)
	if _, err := rand.Read(entropy); err != nil {
		panic(fmt.Errorf("workflow: reading entropy for execution ID: %w", err))
	}
	encoder := base32.StdEncoding.WithPadding(base32.NoPadding)
	return "exec_" + strings.ToLower(encoder.EncodeToString(entropy))
}
// ExecutionStatus names the lifecycle state of an execution or of one
// of its branches. The underlying string is the persisted/wire form.
type ExecutionStatus string

// Known ExecutionStatus values.
const (
	ExecutionStatusPending = ExecutionStatus("pending")
	ExecutionStatusRunning = ExecutionStatus("running")

	// ExecutionStatusWaiting marks a branch blocked mid-run on a join.
	// Its goroutine is parked on an in-process channel and the
	// execution is still live, so Waiting is not a terminal state.
	ExecutionStatusWaiting = ExecutionStatus("waiting")

	// ExecutionStatusSuspended marks a branch hard-suspended on a
	// durable wait (signal-wait, durable sleep). The goroutine has
	// exited and the branch lives only in the checkpoint; progress
	// requires external input (a signal or the wall clock). Once every
	// active branch is suspended the execution loop exits and the
	// execution's final status is Suspended.
	ExecutionStatusSuspended = ExecutionStatus("suspended")

	// ExecutionStatusPaused marks a branch parked by an explicit pause,
	// either an external PauseBranch call or a declarative Pause step.
	// Unlike Suspended there is no declared resumption condition: an
	// external actor must clear the flag via UnpauseBranch. Paused is
	// reported separately from Suspended in SuspensionInfo so operators
	// can tell the two apart when choosing an action.
	ExecutionStatusPaused = ExecutionStatus("paused")

	ExecutionStatusCompleted = ExecutionStatus("completed")
	ExecutionStatusFailed    = ExecutionStatus("failed")
)
// ExecutionOption is a functional option for NewExecution. Each option
// mutates the internal executionConfig that NewExecution builds before
// it fills in defaults for anything left unset.
type ExecutionOption func(*executionConfig)
// executionConfig collects all optional parameters. It is an internal
// implementation detail; consumers compose it through With* options.
// NewExecution substitutes defaults for the nil/empty scriptCompiler,
// logger, activityLogger, checkpointer, executionID and
// executionCallbacks fields; stepProgressStore and signalStore stay nil
// when not provided.
type executionConfig struct {
// inputs holds caller-supplied workflow input values (WithInputs).
inputs map[string]any
// activityLogger receives per-activity logs (WithActivityLogger).
activityLogger ActivityLogger
// checkpointer persists and loads checkpoints (WithCheckpointer).
checkpointer Checkpointer
// logger is the structured logger (WithLogger).
logger *slog.Logger
// executionID is the fixed execution ID, if any (WithExecutionID).
executionID string
// scriptCompiler overrides the default compiler (WithScriptCompiler).
scriptCompiler script.Compiler
// executionCallbacks are the lifecycle callbacks (WithExecutionCallbacks).
executionCallbacks ExecutionCallbacks
// stepProgressStore, when set, is wrapped in a step progress tracker
// and chained onto the callbacks (WithStepProgressStore).
stepProgressStore StepProgressStore
// signalStore is passed to branches and its presence is reported to
// binding validation (WithSignalStore).
signalStore SignalStore
}
// WithInputs supplies the workflow input values for an execution.
// NewExecution fills any declared input missing from m with that
// input's Default, fails if a missing input has no default, and rejects
// keys in m that the workflow does not declare.
func WithInputs(m map[string]any) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.inputs = m
	}
}
// WithCheckpointer selects the Checkpointer that receives checkpoint
// snapshots. When omitted, NewExecution installs the checkpointer
// returned by NewNullCheckpointer.
func WithCheckpointer(cp Checkpointer) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.checkpointer = cp
	}
}
// WithSignalStore installs the SignalStore used as the signal delivery
// rendezvous by workflow.Wait and declarative WaitSignal steps. The
// store is handed to every branch, and NewExecution tells binding
// validation whether one was configured, so workflows that use signals
// need this option.
func WithSignalStore(ss SignalStore) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.signalStore = ss
	}
}
// WithLogger sets the structured logger for the execution. When
// omitted, NewExecution uses a logger that writes to io.Discard.
func WithLogger(l *slog.Logger) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.logger = l
	}
}
// WithExecutionID pins the execution to a caller-chosen ID. When the ID
// is empty or the option is omitted, NewExecution generates one with
// NewExecutionID. Use it when the orchestration layer (queue, DB) must
// know the ID before NewExecution is called.
func WithExecutionID(id string) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.executionID = id
	}
}
// WithExecutionCallbacks registers lifecycle callbacks for the
// execution. When omitted, NewExecution installs a
// *BaseExecutionCallbacks value.
func WithExecutionCallbacks(cb ExecutionCallbacks) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.executionCallbacks = cb
	}
}
// WithStepProgressStore registers a store for step progress updates.
// When set, NewExecution builds a step progress tracker around the
// store and chains it onto the execution callbacks.
func WithStepProgressStore(s StepProgressStore) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.stepProgressStore = s
	}
}
// WithActivityLogger selects the destination for per-activity
// invocation logs. When omitted, NewExecution uses the logger returned
// by NewNullActivityLogger.
func WithActivityLogger(al ActivityLogger) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.activityLogger = al
	}
}
// WithScriptCompiler replaces the script compiler used to evaluate
// parameter templates and edge conditions. When omitted, NewExecution
// falls back to DefaultScriptCompiler.
func WithScriptCompiler(sc script.Compiler) ExecutionOption {
	return func(cfg *executionConfig) {
		cfg.scriptCompiler = sc
	}
}
// ExecuteOption configures a single call to Execution.Execute. Options
// are applied, in order, to a fresh executeConfig on every call.
type ExecuteOption func(*executeConfig)
// executeConfig holds the per-call settings gathered from ExecuteOption
// values.
type executeConfig struct {
// priorExecutionID, when non-empty, asks Execute to resume from that
// execution's checkpoint (set by ResumeFrom).
priorExecutionID string
}
// ResumeFrom asks Execute to load the checkpoint saved for priorID and
// continue from it. If no checkpoint exists for priorID, Execute falls
// back to a fresh run instead of failing.
func ResumeFrom(priorID string) ExecuteOption {
	return func(cfg *executeConfig) {
		cfg.priorExecutionID = priorID
	}
}
// Execution represents a single run of a Workflow with checkpointing.
// It owns the execution state, the set of currently running branches,
// and the infrastructure (checkpointer, loggers, callbacks, signal
// store) those branches use. Construct it with NewExecution and drive
// it with Execute.
type Execution struct {
// workflow is the definition being executed.
workflow *Workflow
// Unified state management - replaces scattered fields
state *executionState
// Runtime branch tracking (not checkpointed). activeBranches is
// written by the orchestrator goroutine (runBranches,
// processBranchSnapshot, loadCheckpoint) and read by external
// callers of PauseBranch/UnpauseBranch. All reads and writes go
// through the activeBranchesMu-protected helpers defined below so
// concurrent external pause calls cannot race with orchestrator
// mutations.
activeBranchesMu sync.Mutex
activeBranches map[string]*branch
// branchSnapshots carries branch updates to the orchestrator loop in
// run; NewExecution creates it with a buffer of 100.
branchSnapshots chan branchSnapshot
// Branch options template (reused for all branches)
branchOptions branchOptions
// Infrastructure dependencies
activityLogger ActivityLogger
compiler script.Compiler
checkpointer Checkpointer
activities map[string]Activity
executionCallbacks ExecutionCallbacks
signalStore SignalStore
adapter *executionAdapter
logger *slog.Logger
// Step progress tracking; nil unless a StepProgressStore was configured.
stepProgressTracker *stepProgressTracker
// mutex guards started (see start and resumeFromCheckpoint).
mutex sync.RWMutex
// doneWg counts branch goroutines launched by runBranches; run waits
// on it after the snapshot loop ends.
doneWg sync.WaitGroup
// started is set once start succeeds or a completed checkpoint is loaded.
started bool
ran bool // true once run() begins; distinguishes start() reuse from run() failure
// checkpointCounter numbers checkpoints; saveCheckpoint increments it
// and uses the value as the checkpoint ID.
checkpointCounter int
// checkpointMu serialises saveCheckpoint calls so concurrent
// writers (activity goroutines under executeActivity + the
// orchestrator goroutine under processBranchSnapshot) cannot race
// on checkpointCounter or the underlying Checkpointer. Distinct
// from mutex to avoid interacting with the existing RWMutex
// protocol around activeBranches and started/ran.
checkpointMu sync.Mutex
}
// NewExecution builds an Execution for wf using the activities in reg.
// All optional behaviour is supplied through ExecutionOption values;
// anything left unset receives a default (default script compiler,
// discard logger, null activity logger, null checkpointer, generated
// execution ID, base callbacks).
//
// It returns an error when wf or reg is nil, when binding validation
// fails, when a declared input has neither a supplied value nor a
// default, or when an undeclared input is supplied.
func NewExecution(wf *Workflow, reg *ActivityRegistry, opts ...ExecutionOption) (*Execution, error) {
	switch {
	case wf == nil:
		return nil, fmt.Errorf("workflow: workflow is required")
	case reg == nil:
		return nil, fmt.Errorf("workflow: activity registry is required")
	}

	// Collect caller options, then backfill defaults.
	cfg := new(executionConfig)
	for _, apply := range opts {
		apply(cfg)
	}
	if cfg.scriptCompiler == nil {
		cfg.scriptCompiler = DefaultScriptCompiler()
	}
	if cfg.logger == nil {
		cfg.logger = slog.New(slog.NewTextHandler(io.Discard, nil))
	}
	if cfg.activityLogger == nil {
		cfg.activityLogger = NewNullActivityLogger()
	}
	if cfg.checkpointer == nil {
		cfg.checkpointer = NewNullCheckpointer()
	}
	if cfg.executionID == "" {
		cfg.executionID = NewExecutionID()
	}
	if cfg.executionCallbacks == nil {
		cfg.executionCallbacks = &BaseExecutionCallbacks{}
	}

	// Binding-level validation (activity references, templates,
	// expressions, store-path shape) runs only after defaults are in
	// place so the compiler and logger are guaranteed non-nil.
	hasSignalStore := cfg.signalStore != nil
	if err := wf.validateBinding(reg, cfg.scriptCompiler, hasSignalStore, cfg.logger); err != nil {
		return nil, err
	}

	// Resolve every declared input from the supplied map, falling back
	// to the declared default.
	inputs := make(map[string]any, len(cfg.inputs))
	for _, decl := range wf.Inputs() {
		if supplied, ok := cfg.inputs[decl.Name]; ok {
			inputs[decl.Name] = supplied
			continue
		}
		if decl.Default == nil {
			return nil, fmt.Errorf("input %q is required", decl.Name)
		}
		inputs[decl.Name] = decl.Default
	}
	// Anything supplied but not resolved above was never declared.
	for name := range cfg.inputs {
		if _, declared := inputs[name]; !declared {
			return nil, fmt.Errorf("unknown input %q", name)
		}
	}

	activities := reg.asMap()
	state := newExecutionState(cfg.executionID, wf.Name(), inputs)

	execution := &Execution{
		workflow:           wf,
		state:              state,
		activityLogger:     cfg.activityLogger,
		checkpointer:       cfg.checkpointer,
		activeBranches:     map[string]*branch{},
		branchSnapshots:    make(chan branchSnapshot, 100),
		activities:         activities,
		logger:             cfg.logger.With("execution_id", cfg.executionID),
		compiler:           cfg.scriptCompiler,
		executionCallbacks: cfg.executionCallbacks,
		signalStore:        cfg.signalStore,
	}
	execution.adapter = &executionAdapter{execution: execution}

	// With a progress store configured, wrap it in a tracker and chain
	// the tracker behind the caller's callbacks.
	if store := cfg.stepProgressStore; store != nil {
		tracker := newStepProgressTracker(cfg.executionID, store, execution.logger)
		execution.stepProgressTracker = tracker
		execution.executionCallbacks = NewCallbackChain(execution.executionCallbacks, tracker)
	}

	// Branch options template. ExecutionID is deliberately absent: it is
	// filled in per call by createBranch* from e.state.ID() so a resumed
	// execution whose ID came from a checkpoint sees the right value.
	execution.branchOptions = branchOptions{
		Workflow:         wf,
		ActivityRegistry: activities,
		Logger:           cfg.logger,
		Inputs:           copyMap(inputs),
		Variables:        copyMap(wf.InitialState()),
		activityExecutor: execution.adapter,
		UpdatesChannel:   execution.branchSnapshots,
		ScriptCompiler:   cfg.scriptCompiler,
		SignalStore:      cfg.signalStore,
		// Branches get the (possibly chain-wrapped) callbacks so that
		// branch-level failures which never reach the activity adapter,
		// such as a parameter-template evaluation error in
		// executeStepOnce, can still synthesise an
		// AfterActivityExecution(error) event and keep the
		// step-progress tracker accurate.
		ExecutionCallbacks: execution.executionCallbacks,
	}
	return execution, nil
}
// --- activeBranches helpers (mutex-protected) ---
// addActiveBranch records p as the running branch for branchID,
// replacing any existing entry. The map write happens while holding
// activeBranchesMu.
func (e *Execution) addActiveBranch(branchID string, p *branch) {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	e.activeBranches[branchID] = p
}
// removeActiveBranch drops the entry for branchID from the active set
// while holding activeBranchesMu. Removing an unknown ID is a no-op.
func (e *Execution) removeActiveBranch(branchID string) {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	delete(e.activeBranches, branchID)
}
// getActiveBranch returns the running branch registered under branchID
// and whether such an entry exists. The lookup holds activeBranchesMu.
func (e *Execution) getActiveBranch(branchID string) (*branch, bool) {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	found, present := e.activeBranches[branchID]
	return found, present
}
// activeBranchCount reports how many branches are currently registered
// as active, read while holding activeBranchesMu. The orchestrator loop
// uses it as its continuation condition and callbacks use it to report
// branch counts.
func (e *Execution) activeBranchCount() int {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	count := len(e.activeBranches)
	return count
}
// activeBranchesSnapshot copies the current active branches into a new
// slice so callers can iterate without holding activeBranchesMu. The
// order follows map iteration and is therefore unspecified. The resume
// path in run uses it to hand branches to runBranches.
func (e *Execution) activeBranchesSnapshot() []*branch {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	snapshot := make([]*branch, 0, len(e.activeBranches))
	for id := range e.activeBranches {
		snapshot = append(snapshot, e.activeBranches[id])
	}
	return snapshot
}
// resetActiveBranches replaces the active branch set with a new empty
// map while holding activeBranchesMu. loadCheckpoint calls it before
// rebuilding branches from a checkpoint.
func (e *Execution) resetActiveBranches() {
	e.activeBranchesMu.Lock()
	defer e.activeBranchesMu.Unlock()

	e.activeBranches = map[string]*branch{}
}
// ID reports the execution's identifier as held by the execution
// state. After a resume this is the ID restored from the checkpoint.
func (e *Execution) ID() string {
	return e.state.ID()
}
// Status reports the execution's current status as held by the
// execution state.
func (e *Execution) Status() ExecutionStatus {
	return e.state.GetStatus()
}
// GetOutputs reports the workflow outputs currently recorded on the
// execution state.
func (e *Execution) GetOutputs() map[string]any {
	return e.state.GetOutputs()
}
// saveCheckpoint snapshots the execution state and hands it to the
// configured Checkpointer. Each call increments checkpointCounter and
// uses the new value, in decimal, as the checkpoint ID.
//
// It may be called concurrently from the orchestrator goroutine and
// from activity goroutines: checkpointMu is held for the whole call, so
// writers cannot race on the counter or the backing Checkpointer.
func (e *Execution) saveCheckpoint(ctx context.Context) error {
	e.checkpointMu.Lock()
	defer e.checkpointMu.Unlock()

	e.checkpointCounter++

	snapshot := e.state.ToCheckpoint()
	snapshot.SchemaVersion = CheckpointSchemaVersion
	snapshot.ID = fmt.Sprintf("%d", e.checkpointCounter)

	return e.checkpointer.SaveCheckpoint(ctx, snapshot)
}
// loadCheckpoint loads execution state from the latest checkpoint saved
// for priorExecutionID and rebuilds the set of active branches from it.
//
// The checkpoint's execution ID is preserved as the execution's identity.
// Callers that need to resume into a specific ID should pass it via
// WithExecutionID when constructing the execution; that ID must
// match the checkpoint's ID. Rotating the ID on resume would silently break
// SignalStore lookups keyed on (executionID, topic).
//
// It returns an error wrapping ErrNoCheckpoint when the checkpointer
// yields no checkpoint, an error when the checkpoint's schema version is
// outside 1..CheckpointSchemaVersion, and an error when a resumable
// branch references a step the workflow does not contain. A checkpoint
// whose status is Completed returns nil right after the state is loaded,
// without touching the active branch set.
func (e *Execution) loadCheckpoint(ctx context.Context, priorExecutionID string) error {
// Load state from checkpoint
checkpoint, err := e.checkpointer.LoadCheckpoint(ctx, priorExecutionID)
if err != nil {
return fmt.Errorf("loading checkpoint: %w", err)
}
// A nil checkpoint with a nil error means "nothing saved"; callers
// detect this via errors.Is(err, ErrNoCheckpoint).
if checkpoint == nil {
return fmt.Errorf("%w: execution %q", ErrNoCheckpoint, priorExecutionID)
}
if checkpoint.SchemaVersion < 1 || checkpoint.SchemaVersion > CheckpointSchemaVersion {
return fmt.Errorf("checkpoint schema version %d is not supported (supported: 1..%d)",
checkpoint.SchemaVersion, CheckpointSchemaVersion)
}
// From here on the execution state is overwritten with the
// checkpoint's contents, including on the error paths below.
e.state.FromCheckpoint(checkpoint)
// Preserve the checkpoint's execution ID so signals keyed on
// (executionID, topic) remain discoverable across resumes.
lastStatus := e.state.GetStatus()
// If the prior execution completed, there's nothing to do
if lastStatus == ExecutionStatusCompleted {
return nil
}
// Handle failed executions
if lastStatus == ExecutionStatusFailed {
// Reset failed branches for resumption
if err := e.resetFailedBranches(); err != nil {
return fmt.Errorf("failed to reset failed branches for resumption: %w", err)
}
originalErr := e.state.GetError()
if originalErr != nil {
e.logger.Info("resuming execution from failure", "original_error", originalErr.Error())
}
// Clear any previous error and reset status to running
e.state.SetError(nil)
e.state.SetStatus(ExecutionStatusRunning)
}
// Rebuild active branches for branches that should be running. Suspended
// and Paused branches rejoin the run loop too: a suspended branch can
// replay its activity and either consume a pending signal or
// re-suspend; a paused branch immediately re-parks at its first
// step boundary unless UnpauseBranch has cleared the flag before
// Execute is called with ResumeFrom.
branchStates := e.state.GetBranchStates()
e.resetActiveBranches()
for id, branchState := range branchStates {
switch branchState.Status {
case ExecutionStatusRunning, ExecutionStatusPending, ExecutionStatusWaiting, ExecutionStatusSuspended, ExecutionStatusPaused:
currentStep, ok := e.workflow.GetStep(branchState.CurrentStep)
if !ok {
return fmt.Errorf("step %q not found in workflow for branch %s", branchState.CurrentStep, id)
}
// Restore branch with its stored variables from checkpoint
e.addActiveBranch(id, e.createBranchWithVariables(id, currentStep, branchState.Variables))
}
}
e.logger.Info("loaded execution from checkpoint",
"status", e.state.GetStatus(),
"branches", len(branchStates),
"active_paths", e.activeBranchCount(),
"branch_counter", e.state.pathCounter)
return nil
}
// start marks the execution as started, holding mutex for the check
// and the write. It returns ErrAlreadyStarted when the execution was
// already marked started, which makes an Execution single-use.
func (e *Execution) start() error {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if !e.started {
		e.started = true
		return nil
	}
	return ErrAlreadyStarted
}
// Execute runs the workflow and reports the outcome as a structured
// ExecutionResult.
//
// Without options Execute performs a fresh run. With ResumeFrom(priorID)
// it resumes from that execution's checkpoint, falling back to a fresh
// run when no checkpoint exists for priorID.
//
// A non-nil error means the execution could not be attempted at all
// (an infrastructure failure). When the error is nil the result is
// non-nil and carries the outcome; workflow-level failures are
// reported in result.Error, not through the error return.
func (e *Execution) Execute(ctx context.Context, opts ...ExecuteOption) (*ExecutionResult, error) {
	var cfg executeConfig
	for _, apply := range opts {
		apply(&cfg)
	}
	runErr := e.runExecution(ctx, cfg.priorExecutionID)
	return e.buildResult(runErr)
}
// runExecution implements run-or-resume. With a non-empty
// priorExecutionID it first tries to resume from that checkpoint; a
// missing checkpoint (ErrNoCheckpoint) falls through to a fresh run,
// while any other resume error is returned as is. It clears the ran
// flag on entry so buildResult can tell whether run executed during
// this call.
func (e *Execution) runExecution(ctx context.Context, priorExecutionID string) error {
	e.ran = false

	if priorExecutionID != "" {
		switch err := e.resumeFromCheckpoint(ctx, priorExecutionID); {
		case err == nil:
			return nil
		case !errors.Is(err, ErrNoCheckpoint):
			return err
		}
		// No checkpoint for priorExecutionID: start from scratch below.
	}

	if err := e.start(); err != nil {
		return err
	}
	return e.run(ctx)
}
// resumeFromCheckpoint restores state from the checkpoint saved for
// priorExecutionID, marks the execution started, and runs it to
// completion. The returned error matches ErrNoCheckpoint when no
// checkpoint exists for priorExecutionID.
func (e *Execution) resumeFromCheckpoint(ctx context.Context, priorExecutionID string) error {
	// The checkpoint is loaded before the execution is marked started,
	// so a failed load (for example a missing checkpoint) leaves the
	// started flag clear for a subsequent fresh run.
	if err := e.loadCheckpoint(ctx, priorExecutionID); err != nil {
		return err
	}

	if e.state.GetStatus() != ExecutionStatusCompleted {
		if err := e.start(); err != nil {
			return err
		}
		return e.run(ctx)
	}

	// The checkpoint already holds a completed execution: mark it
	// started so buildResult reports the stored outcome without
	// running anything.
	e.logger.Info("execution already completed from checkpoint")
	e.mutex.Lock()
	e.started = true
	e.mutex.Unlock()
	return nil
}
// buildResult converts the outcome of runExecution into the
// (result, error) pair returned by Execute.
//
// It returns (nil, runErr) for infrastructure failures: the execution
// was never started, or runErr is set but run never began during this
// call (for example "already started"). Otherwise it returns a result
// built from the execution state, with any run error folded into
// result.Error.
func (e *Execution) buildResult(runErr error) (*ExecutionResult, error) {
	neverRan := runErr != nil && !e.ran
	if !e.started || neverRan {
		return nil, runErr
	}

	result := &ExecutionResult{
		WorkflowName: e.workflow.Name(),
		Status:       e.state.GetStatus(),
		Outputs:      e.state.GetOutputs(),
		Timing: ExecutionTiming{
			StartedAt:  e.state.GetStartTime(),
			FinishedAt: e.state.GetEndTime(),
		},
	}

	// A run error on anything other than a completed execution is
	// reported as a failure. This covers both a recorded Failed status
	// and runs that stopped before reaching a terminal state (for
	// example context cancellation), which may have no end time yet.
	if runErr != nil && result.Status != ExecutionStatusCompleted {
		result.Status = ExecutionStatusFailed
		result.Error = ClassifyError(runErr)
		if result.Timing.FinishedAt.IsZero() {
			result.Timing.FinishedAt = time.Now()
		}
	}

	// Dormant terminations (hard-suspended on a wait, or paused) carry
	// SuspensionInfo so the consumer can schedule a resume without
	// re-reading the checkpoint.
	switch result.Status {
	case ExecutionStatusSuspended, ExecutionStatusPaused:
		result.Suspension = e.buildSuspensionInfo()
	}

	timing := &result.Timing
	timing.Duration = timing.FinishedAt.Sub(timing.StartedAt)
	return result, nil
}
// buildSuspensionInfo collects the suspension state of every hard-
// suspended or paused branch into a SuspensionInfo. It returns nil when
// no branch is in a dormant state.
//
// The aggregate fields are derived as follows:
//   - Reason is the dominant reason across branches, with precedence
//     Paused > Sleeping > WaitingSignal. Operators care most about
//     "someone has to unpause this"; wall-clock wakeups come next;
//     signal waits are the most passive.
//   - WakeAt is the earliest non-zero WakeAt among suspended branches.
//   - Topics is the de-duplicated set of non-empty signal topics,
//     sorted ascending so the result is stable between calls.
func (e *Execution) buildSuspensionInfo() *SuspensionInfo {
	branchStates := e.state.GetBranchStates()
	info := &SuspensionInfo{}
	topicSet := map[string]struct{}{}

	// Higher rank wins; a reason missing from the map (including the
	// zero value) ranks 0 and never displaces a known reason.
	reasonRank := map[SuspensionReason]int{
		SuspensionReasonWaitingSignal: 1,
		SuspensionReasonSleeping:      2,
		SuspensionReasonPaused:        3,
	}

	for _, ps := range branchStates {
		if ps.Status != ExecutionStatusSuspended && ps.Status != ExecutionStatusPaused {
			continue
		}
		sp := SuspendedBranch{
			BranchID: ps.ID,
			StepName: ps.CurrentStep,
		}
		switch ps.Status {
		case ExecutionStatusPaused:
			sp.Reason = SuspensionReasonPaused
			sp.PauseReason = ps.PauseReason
		case ExecutionStatusSuspended:
			// A suspended branch without a recorded wait keeps an
			// empty Reason and contributes no topic or wake time.
			if ps.Wait != nil {
				switch ps.Wait.Kind {
				case WaitKindSignal:
					sp.Reason = SuspensionReasonWaitingSignal
					sp.Topic = ps.Wait.Topic
					if ps.Wait.Topic != "" {
						topicSet[ps.Wait.Topic] = struct{}{}
					}
				case WaitKindSleep:
					sp.Reason = SuspensionReasonSleeping
				}
				sp.WakeAt = ps.Wait.WakeAt
				if !ps.Wait.WakeAt.IsZero() && (info.WakeAt.IsZero() || ps.Wait.WakeAt.Before(info.WakeAt)) {
					info.WakeAt = ps.Wait.WakeAt
				}
			}
		}
		if reasonRank[sp.Reason] > reasonRank[info.Reason] {
			info.Reason = sp.Reason
		}
		info.SuspendedBranches = append(info.SuspendedBranches, sp)
	}

	if len(info.SuspendedBranches) == 0 {
		return nil
	}

	for t := range topicSet {
		info.Topics = append(info.Topics, t)
	}
	// Map iteration order is random; sort so consumers (and tests) see
	// the same topic list for the same state.
	slices.Sort(info.Topics)
	return info
}
// run executes the workflow, blocking until every branch has finished,
// failed, or gone dormant, and returns the final error (nil unless the
// execution failed).
//
// Branches run under a context derived from ctx that run cancels itself
// as soon as snapshot processing reports an error. The caller's ctx is
// kept for the final bookkeeping (completion callback and final
// checkpoint) so that internal cancellation cannot prevent the terminal
// state from being persisted.
//
// If the context is cancelled while branches are still active, run
// returns the context error immediately: it does not wait for branch
// goroutines, record a final status, or save a final checkpoint.
func (e *Execution) run(ctx context.Context) error {
	e.ran = true

	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Set initial running status and start time. A resumed execution
	// keeps the start time restored from its checkpoint.
	e.state.SetStatus(ExecutionStatusRunning)
	if e.state.GetStartTime().IsZero() {
		e.state.SetTiming(time.Now(), time.Time{})
	}

	// Trigger workflow start callback
	e.executionCallbacks.BeforeWorkflowExecution(runCtx, &WorkflowExecutionEvent{
		ExecutionID:  e.state.ID(),
		WorkflowName: e.workflow.Name(),
		Status:       e.state.GetStatus(),
		StartTime:    e.state.GetStartTime(),
		Inputs:       copyMap(e.state.GetInputs()),
		PathCount:    e.activeBranchCount(),
	})

	// Start execution branches
	if e.activeBranchCount() == 0 {
		// Starting fresh - create initial branch
		startStep := e.workflow.Start()
		e.runBranches(runCtx, e.createBranch("main", startStep))
	} else {
		// Resuming from checkpoint - restart active branches
		resumingBranches := e.activeBranchesSnapshot()
		e.logger.Info("resuming execution from checkpoint", "active_paths", len(resumingBranches))
		for _, br := range resumingBranches {
			e.runBranches(runCtx, br)
		}
	}

	// Process branch snapshots until no branch is active or snapshot
	// processing fails.
	var executionErr error
	for e.activeBranchCount() > 0 && executionErr == nil {
		select {
		case <-runCtx.Done():
			return runCtx.Err()
		case snapshot := <-e.branchSnapshots:
			if err := e.processBranchSnapshot(runCtx, snapshot); err != nil {
				executionErr = err
				cancel() // cancel any other branches
			}
		}
	}

	// Wait for all branches to complete
	e.doneWg.Wait()

	// endTime is the single end timestamp used for the state, the
	// reported duration and the completion callback.
	endTime := time.Now()
	duration := endTime.Sub(e.state.GetStartTime())

	// Check for failed branches
	failedIDs := e.state.GetFailedBranchIDs()
	// Check for branches hard-suspended on a durable wait (signal/sleep).
	suspendedIDs := e.state.GetSuspendedBranchIDs()
	// Check for branches paused by an explicit pause trigger.
	pausedIDs := e.state.GetPausedBranchIDs()

	// Update final status. Precedence: Failed > Paused > Suspended >
	// Completed. Paused outranks Suspended because a paused branch
	// requires explicit operator action to clear, while a suspended
	// branch has a declared resumption trigger (signal or wall-clock).
	//
	// An orchestrator-side error (e.g., a checkpoint save failure in
	// processBranchSnapshot) forces Failed regardless of branch-level
	// state: an internal error must never be silently dropped by
	// reporting Paused/Suspended/Completed.
	finalErr := executionErr
	var finalStatus ExecutionStatus
	switch {
	case finalErr != nil && len(failedIDs) == 0:
		// Orchestrator-side failure with no per-branch failure recorded
		// (e.g., checkpoint save error). Classify as Failed.
		finalStatus = ExecutionStatusFailed
		e.logger.Error("execution failed",
			"error", finalErr,
			"paused_paths", pausedIDs,
			"suspended_paths", suspendedIDs)
	case len(failedIDs) > 0:
		finalStatus = ExecutionStatusFailed
		if finalErr == nil {
			finalErr = fmt.Errorf("execution failed: %v", failedIDs)
		}
		e.logger.Error("execution failed", "failed_paths", failedIDs, "error", finalErr)
	case len(pausedIDs) > 0:
		// Execution is dormant on an explicit pause. Do not extract
		// outputs, do not mark failed. Caller clears the pause via
		// UnpauseBranch and resumes.
		finalStatus = ExecutionStatusPaused
		e.logger.Info("execution paused",
			"paused_paths", pausedIDs,
			"suspended_paths", suspendedIDs,
			"duration", duration)
	case len(suspendedIDs) > 0:
		// Execution is dormant: one or more branches are parked on a
		// durable wait. Do not extract outputs, do not mark failed.
		// Caller resumes when an external trigger (signal, wall-clock)
		// arrives.
		finalStatus = ExecutionStatusSuspended
		e.logger.Info("execution suspended",
			"suspended_paths", suspendedIDs,
			"duration", duration)
	default:
		finalStatus = ExecutionStatusCompleted
		// Extract workflow outputs from final branch variables. A
		// failure here turns the run into a failed execution, so the
		// "completed" message is logged only on success.
		if err := e.extractWorkflowOutputs(); err != nil {
			e.logger.Error("failed to extract workflow outputs", "error", err)
			finalErr = err
			finalStatus = ExecutionStatusFailed
		} else {
			e.logger.Info("execution completed",
				"outputs", e.state.GetOutputs(),
				"duration", duration)
		}
	}

	e.state.SetFinished(finalStatus, endTime, finalErr)

	// Trigger workflow completion/failure callback. The caller's ctx is
	// used because runCtx is already cancelled on the error path.
	e.executionCallbacks.AfterWorkflowExecution(ctx, &WorkflowExecutionEvent{
		ExecutionID:  e.state.ID(),
		WorkflowName: e.workflow.Name(),
		Status:       finalStatus,
		StartTime:    e.state.GetStartTime(),
		EndTime:      endTime,
		Duration:     duration,
		Inputs:       e.state.GetInputs(),
		Outputs:      e.state.GetOutputs(),
		PathCount:    len(e.state.GetBranchStates()),
		Error:        finalErr,
	})

	// Final checkpoint, also on the caller's ctx so a failure that
	// cancelled runCtx can still be persisted. A save error is logged
	// but does not change the returned error.
	if checkpointErr := e.saveCheckpoint(ctx); checkpointErr != nil {
		e.logger.Error("failed to save final checkpoint", "error", checkpointErr)
	}
	return finalErr
}
// extractWorkflowOutputs copies each declared workflow output from the
// final variables of its source branch into the execution state.
//
// For every output definition the variable name defaults to the output
// name and the branch defaults to "main". It returns an error, stopping
// at the first problem, when the branch has no recorded state or the
// variable cannot be found in that branch's variables.
func (e *Execution) extractWorkflowOutputs() error {
	states := e.state.GetBranchStates()

	for _, def := range e.workflow.Outputs() {
		source := def.Variable
		if source == "" {
			source = def.Name
		}
		branchID := def.Branch
		if branchID == "" {
			branchID = "main"
		}

		bs, ok := states[branchID]
		if !ok {
			return fmt.Errorf("output branch %q not found for output %q", branchID, def.Name)
		}
		value, ok := getNestedField(bs.Variables, source)
		if !ok {
			return fmt.Errorf("workflow output variable %q not found in branch %q", source, branchID)
		}
		e.state.SetOutput(def.Name, value)
	}
	return nil
}
// runBranches begins executing one or more new execution branches in goroutines.
// It does not wait for the branches to complete.
//
// For each branch, in order, it: registers the branch as active, writes
// a Running BranchState (carrying over selected fields from any existing
// state for the same ID), fires BeforeBranchExecution, increments doneWg,
// and launches a goroutine that calls the branch's Run with ctx.
func (e *Execution) runBranches(ctx context.Context, branches ...*branch) {
for _, br := range branches {
branchID := br.ID()
e.addActiveBranch(branchID, br)
startTime := time.Now()
// Preserve prior BranchState fields (step outputs, pending Wait,
// pause flag, activity history) when a resumed branch is being
// restarted. A freshly-created branch has no prior state, so
// this collapses to the initial set.
existing := e.state.GetBranchStates()[branchID]
var (
stepOutputs map[string]any
pendingWait *WaitState
priorStart time.Time
pauseRequested bool
pauseReason string
activityHistory map[string]any
activityHistoryStep string
)
if existing != nil {
stepOutputs = existing.StepOutputs
pendingWait = existing.Wait
priorStart = existing.StartTime
pauseRequested = existing.PauseRequested
pauseReason = existing.PauseReason
activityHistory = existing.ActivityHistory
activityHistoryStep = existing.ActivityHistoryStep
}
// Normalise missing values: step outputs are never stored as nil, and
// a branch without a recorded start time starts now.
if stepOutputs == nil {
stepOutputs = map[string]any{}
}
if priorStart.IsZero() {
priorStart = startTime
}
// The stored state replaces any previous entry for this branch ID and
// always has Status Running.
e.state.SetBranchState(branchID, &BranchState{
ID: branchID,
Status: ExecutionStatusRunning,
CurrentStep: br.CurrentStep().Name,
StartTime: priorStart,
StepOutputs: stepOutputs,
Variables: br.Variables(), // Store branch's current variables
Wait: pendingWait,
PauseRequested: pauseRequested,
PauseReason: pauseReason,
ActivityHistory: activityHistory,
ActivityHistoryStep: activityHistoryStep,
})
// Trigger branch start callback. The event carries this call's
// startTime and a fresh empty StepOutputs map, not the values
// restored into the BranchState above.
e.executionCallbacks.BeforeBranchExecution(ctx, &BranchExecutionEvent{
ExecutionID: e.state.ID(),
WorkflowName: e.workflow.Name(),
BranchID: branchID,
Status: ExecutionStatusRunning,
StartTime: startTime,
CurrentStep: br.CurrentStep().Name,
StepOutputs: map[string]any{},
})
// doneWg is incremented before the goroutine starts so run's
// doneWg.Wait cannot miss it.
e.doneWg.Add(1)
go func(p *branch) {
defer e.doneWg.Done()
p.Run(ctx)
}(br)
}
}
func (e *Execution) processBranchSnapshot(ctx context.Context, snapshot branchSnapshot) error {
if snapshot.Error != nil {
e.state.UpdateBranchState(snapshot.BranchID, func(state *BranchState) {
state.Status = ExecutionStatusFailed
state.ErrorMessage = snapshot.Error.Error()
state.EndTime = snapshot.EndTime
})
// Trigger branch failure callback
duration := snapshot.EndTime.Sub(snapshot.StartTime)
branchState := e.state.GetBranchStates()[snapshot.BranchID]
e.executionCallbacks.AfterBranchExecution(ctx, &BranchExecutionEvent{
ExecutionID: e.state.ID(),
WorkflowName: e.workflow.Name(),
BranchID: snapshot.BranchID,
Status: ExecutionStatusFailed,
StartTime: snapshot.StartTime,
EndTime: snapshot.EndTime,
Duration: duration,
CurrentStep: snapshot.StepName,
StepOutputs: copyMap(branchState.StepOutputs),
Error: snapshot.Error,
})
return snapshot.Error
}
// Handle join requests
if snapshot.joinRequest != nil {
return e.processJoinRequest(ctx, snapshot)
}
// Handle wait requests: branch parking on a durable wait (signal/sleep).
if snapshot.waitRequest != nil {
e.state.UpdateBranchState(snapshot.BranchID, func(state *BranchState) {
state.Status = ExecutionStatusSuspended
state.CurrentStep = snapshot.waitRequest.StepName
state.Wait = snapshot.waitRequest.Wait
state.EndTime = snapshot.EndTime
if activeBranch, exists := e.getActiveBranch(snapshot.BranchID); exists {
state.Variables = activeBranch.Variables()
}
})
// Hard-suspend: remove from active branches so the run loop exits
// once no running branches remain.
e.removeActiveBranch(snapshot.BranchID)
// Checkpoint the parked state synchronously so resume can find it.
if err := e.saveCheckpoint(ctx); err != nil {
e.logger.Error("failed to save wait checkpoint", "error", err)
return err
}
return nil
}
// Handle pause requests: branch parking due to a pause trigger
// (external PauseBranch or declarative Pause step). The branch's
// pause flag stays set across the checkpoint so a subsequent
// Resume re-parks the branch until UnpauseBranch clears it.
if snapshot.pauseRequest != nil {
e.state.UpdateBranchState(snapshot.BranchID, func(state *BranchState) {
state.Status = ExecutionStatusPaused
state.CurrentStep = snapshot.pauseRequest.StepName
state.PauseRequested = true
state.PauseReason = snapshot.pauseRequest.Reason
state.EndTime = snapshot.EndTime
if activeBranch, exists := e.getActiveBranch(snapshot.BranchID); exists {
state.Variables = activeBranch.Variables()
}
})
e.removeActiveBranch(snapshot.BranchID)
if err := e.saveCheckpoint(ctx); err != nil {
e.logger.Error("failed to save pause checkpoint", "error", err)
return err
}
return nil
}
// Store step output and update status
e.state.UpdateBranchState(snapshot.BranchID, func(state *BranchState) {
state.StepOutputs[snapshot.StepName] = snapshot.StepOutput
state.Status = snapshot.Status
if snapshot.Status == ExecutionStatusCompleted {
state.EndTime = snapshot.EndTime
}
// Advancing past a wait clears any pending wait state on the branch.
state.Wait = nil
// Advancing past a step clears the activity history — no
// cross-step leakage per FR-16. The step-name scope check in
// executeActivity is the primary correctness guarantee; this
// clear keeps checkpoints from accumulating stale history.
state.ActivityHistory = nil
state.ActivityHistoryStep = ""
// Update branch variables from the active branch (if it still exists)
if activeBranch, exists := e.getActiveBranch(snapshot.BranchID); exists {
state.Variables = activeBranch.Variables()
}
})