forked from deepnoodle-ai/workflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecution_callbacks.go
More file actions
148 lines (124 loc) · 4.37 KB
/
execution_callbacks.go
File metadata and controls
148 lines (124 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package workflow
import (
"context"
"time"
)
// ExecutionCallbacks is the set of hooks for observing workflow execution at
// three levels: the whole workflow, a single branch, and a single activity.
// Every hook takes a context and a pointer to the event struct for its level
// and returns nothing, so an implementation cannot report an error back to
// its caller through this interface.
//
// The Before*/After* naming suggests each pair brackets the corresponding unit
// of work; the code that actually invokes the hooks is not in this file.
// BaseExecutionCallbacks supplies no-op versions of all six methods, and
// CallbackChain forwards each call to a list of implementations.
type ExecutionCallbacks interface {
// Workflow-level callbacks.
BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent)
// Branch-level callbacks.
BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent)
AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent)
// Activity-level callbacks.
BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent)
}
// WorkflowExecutionEvent is the event type passed to the workflow-level hooks
// of ExecutionCallbacks (BeforeWorkflowExecution and AfterWorkflowExecution).
// This file only declares the struct; the code that fills it in lives
// elsewhere, so which fields are populated for a "before" versus an "after"
// call cannot be determined here.
type WorkflowExecutionEvent struct {
// Identifiers for the execution and the workflow it runs.
ExecutionID string
WorkflowName string
// Status uses the package's ExecutionStatus type, which is not declared in
// this file.
Status ExecutionStatus
// Timing fields. NOTE(review): Duration is presumably EndTime minus
// StartTime, and EndTime/Duration are presumably zero before the workflow
// finishes — confirm against the code that builds the event.
StartTime time.Time
EndTime time.Time
Duration time.Duration
// Inputs and Outputs are maps, so the event shares them with whoever else
// holds the same map values. NOTE(review): verify whether the producer
// passes copies before mutating them inside a callback.
Inputs map[string]any
Outputs map[string]any
// NOTE(review): the hooks in this file say "branch" rather than "path";
// presumably PathCount counts branches — confirm and consider renaming.
PathCount int
// Error is presumably nil unless the workflow failed — TODO confirm.
Error error
}
// BranchExecutionEvent is the event type passed to the branch-level hooks of
// ExecutionCallbacks (BeforeBranchExecution and AfterBranchExecution). This
// file only declares the struct; the code that fills it in lives elsewhere, so
// which fields are populated for a "before" versus an "after" call cannot be
// determined here.
type BranchExecutionEvent struct {
// Identifiers for the execution, the workflow, and the branch within it.
ExecutionID string
WorkflowName string
BranchID string
// Status uses the package's ExecutionStatus type, which is not declared in
// this file.
Status ExecutionStatus
// Timing fields. NOTE(review): Duration is presumably EndTime minus
// StartTime — confirm against the code that builds the event.
StartTime time.Time
EndTime time.Time
Duration time.Duration
// CurrentStep is presumably the name of the step the branch is on when the
// event is created — TODO confirm.
CurrentStep string
// StepOutputs is a map, so the event shares it with whoever else holds the
// same map value. NOTE(review): the key scheme (step name? output name?) is
// not visible here — verify against the producer.
StepOutputs map[string]any
// Error is presumably nil unless the branch failed — TODO confirm.
Error error
}
// ActivityExecutionEvent is the event type passed to the activity-level hooks
// of ExecutionCallbacks (BeforeActivityExecution and AfterActivityExecution).
// This file only declares the struct; the code that fills it in lives
// elsewhere, so which fields are populated for a "before" versus an "after"
// call cannot be determined here.
type ActivityExecutionEvent struct {
// Identifiers locating the activity: execution, workflow, branch, step, and
// the activity itself.
ExecutionID string
WorkflowName string
BranchID string
StepName string
ActivityName string
// Parameters is a map, so the event shares it with whoever else holds the
// same map value. NOTE(review): verify whether the producer passes a copy.
Parameters map[string]any
// Result is untyped (any). Presumably it is only meaningful on the "after"
// event of a successful activity — TODO confirm.
Result any
// Timing fields. NOTE(review): Duration is presumably EndTime minus
// StartTime — confirm against the code that builds the event.
StartTime time.Time
EndTime time.Time
Duration time.Duration
// Error is presumably nil unless the activity failed — TODO confirm.
Error error
}
// BaseExecutionCallbacks is a field-less type whose six ExecutionCallbacks
// methods in this file all have empty bodies. Embedding it in another struct
// promotes those no-op methods, so the outer type only has to define the
// hooks it cares about. The methods use pointer receivers, so with a by-value
// embed it is the pointer to the outer struct that satisfies
// ExecutionCallbacks.
type BaseExecutionCallbacks struct{}
// BeforeWorkflowExecution is the no-op default for the workflow "before"
// hook: it ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent) {}
// AfterWorkflowExecution is the no-op default for the workflow "after" hook:
// it ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent) {}
// BeforeBranchExecution is the no-op default for the branch "before" hook: it
// ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent) {}
// AfterBranchExecution is the no-op default for the branch "after" hook: it
// ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent) {}
// BeforeActivityExecution is the no-op default for the activity "before"
// hook: it ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent) {}
// AfterActivityExecution is the no-op default for the activity "after" hook:
// it ignores ctx and event and returns immediately.
func (*BaseExecutionCallbacks) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent) {}
// NewBaseExecutionCallbacks returns a *BaseExecutionCallbacks as an
// ExecutionCallbacks value; every hook on the result is a no-op.
//
// To override only some hooks, embed the BaseExecutionCallbacks type in your
// own callbacks struct and define just the methods you need.
func NewBaseExecutionCallbacks() ExecutionCallbacks {
	return new(BaseExecutionCallbacks)
}
// CallbackChain is an ExecutionCallbacks implementation that forwards every
// hook to a list of other implementations. Its methods in this file walk the
// callbacks slice from first to last and pass the same ctx and event pointer
// to each entry. The type has no lock, so nothing here guards the slice if Add
// runs concurrently with a hook.
type CallbackChain struct {
// callbacks holds the registered implementations in invocation order;
// NewCallbackChain sets it and Add appends to it.
callbacks []ExecutionCallbacks
}
// NewCallbackChain returns a CallbackChain that dispatches to the given
// callbacks in the order supplied.
//
// The arguments are copied into a slice owned by the chain. A caller that
// passes an existing slice (NewCallbackChain(cbs...)) can therefore keep
// appending to or modifying that slice without affecting the chain, and a
// later Add cannot write into the caller's backing array.
//
// Nil entries are dropped, because calling a hook on a nil ExecutionCallbacks
// would panic when the chain dispatches. A typed nil pointer stored in the
// interface is not detected.
func NewCallbackChain(callbacks ...ExecutionCallbacks) *CallbackChain {
	owned := make([]ExecutionCallbacks, 0, len(callbacks))
	for _, cb := range callbacks {
		if cb != nil {
			owned = append(owned, cb)
		}
	}
	return &CallbackChain{callbacks: owned}
}
// Add appends callback to the end of the chain, so it is invoked after every
// callback already registered.
//
// A nil callback is ignored: storing it would make every later dispatch panic
// when the chain calls a hook on the nil interface value. A typed nil pointer
// wrapped in the interface is not detected. Add performs no locking.
func (c *CallbackChain) Add(callback ExecutionCallbacks) {
	if callback == nil {
		return
	}
	c.callbacks = append(c.callbacks, callback)
}
// BeforeWorkflowExecution forwards the workflow "before" hook to every
// callback in the chain, first to last, handing each one the same ctx and
// event pointer.
func (c *CallbackChain) BeforeWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].BeforeWorkflowExecution(ctx, event)
	}
}
// AfterWorkflowExecution forwards the workflow "after" hook to every callback
// in the chain, first to last, handing each one the same ctx and event
// pointer.
func (c *CallbackChain) AfterWorkflowExecution(ctx context.Context, event *WorkflowExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].AfterWorkflowExecution(ctx, event)
	}
}
// BeforeBranchExecution forwards the branch "before" hook to every callback
// in the chain, first to last, handing each one the same ctx and event
// pointer.
func (c *CallbackChain) BeforeBranchExecution(ctx context.Context, event *BranchExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].BeforeBranchExecution(ctx, event)
	}
}
// AfterBranchExecution forwards the branch "after" hook to every callback in
// the chain, first to last, handing each one the same ctx and event pointer.
func (c *CallbackChain) AfterBranchExecution(ctx context.Context, event *BranchExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].AfterBranchExecution(ctx, event)
	}
}
// BeforeActivityExecution forwards the activity "before" hook to every
// callback in the chain, first to last, handing each one the same ctx and
// event pointer.
func (c *CallbackChain) BeforeActivityExecution(ctx context.Context, event *ActivityExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].BeforeActivityExecution(ctx, event)
	}
}
// AfterActivityExecution forwards the activity "after" hook to every callback
// in the chain, first to last, handing each one the same ctx and event
// pointer.
func (c *CallbackChain) AfterActivityExecution(ctx context.Context, event *ActivityExecutionEvent) {
	registered := c.callbacks
	for i := range registered {
		registered[i].AfterActivityExecution(ctx, event)
	}
}