forked from deepnoodle-ai/workflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcheckpointer_file.go
More file actions
196 lines (168 loc) · 6.2 KB
/
checkpointer_file.go
File metadata and controls
196 lines (168 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package workflow
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// FileCheckpointer is a file-based implementation that persists checkpoints to disk
type FileCheckpointer struct {
dataDir string
}
// NewFileCheckpointer creates a new file-based checkpointer
func NewFileCheckpointer(dataDir string) (*FileCheckpointer, error) {
if dataDir == "" {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("failed to get user home directory: %w", err)
}
dataDir = filepath.Join(homeDir, ".deepnoodle", "workflows", "executions")
}
// Ensure the data directory exists
if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create data directory %s: %w", dataDir, err)
}
return &FileCheckpointer{dataDir: dataDir}, nil
}
// SaveCheckpoint saves the execution checkpoint to disk
func (c *FileCheckpointer) SaveCheckpoint(ctx context.Context, checkpoint *Checkpoint) error {
if err := validateExecutionID(checkpoint.ExecutionID); err != nil {
return fmt.Errorf("FileCheckpointer.SaveCheckpoint: %w", err)
}
executionDir := filepath.Join(c.dataDir, checkpoint.ExecutionID)
if err := os.MkdirAll(executionDir, 0755); err != nil {
return fmt.Errorf("failed to create execution directory: %w", err)
}
// Save the checkpoint as JSON
checkpointPath := filepath.Join(executionDir, fmt.Sprintf("checkpoint-%s.json", checkpoint.ID))
data, err := json.MarshalIndent(checkpoint, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal checkpoint: %w", err)
}
if err := os.WriteFile(checkpointPath, data, 0644); err != nil {
return fmt.Errorf("failed to write checkpoint file: %w", err)
}
// Update the latest checkpoint symlink
latestPath := filepath.Join(executionDir, "latest.json")
if err := c.updateLatestSymlink(checkpointPath, latestPath); err != nil {
return fmt.Errorf("failed to update latest symlink: %w", err)
}
return nil
}
// LoadCheckpoint loads the latest checkpoint for an execution
func (c *FileCheckpointer) LoadCheckpoint(ctx context.Context, executionID string) (*Checkpoint, error) {
if err := validateExecutionID(executionID); err != nil {
return nil, fmt.Errorf("FileCheckpointer.LoadCheckpoint: %w", err)
}
latestPath := filepath.Join(c.dataDir, executionID, "latest.json")
// Check if latest checkpoint exists
if _, err := os.Stat(latestPath); os.IsNotExist(err) {
return nil, nil // No checkpoint found
}
// Read the checkpoint file
data, err := os.ReadFile(latestPath)
if err != nil {
return nil, fmt.Errorf("failed to read checkpoint file: %w", err)
}
var checkpoint Checkpoint
if err := json.Unmarshal(data, &checkpoint); err != nil {
return nil, fmt.Errorf("failed to unmarshal checkpoint: %w", err)
}
if checkpoint.SchemaVersion < 1 || checkpoint.SchemaVersion > CheckpointSchemaVersion {
return nil, fmt.Errorf("checkpoint schema version %d is not supported (supported: 1..%d)",
checkpoint.SchemaVersion, CheckpointSchemaVersion)
}
return &checkpoint, nil
}
// DeleteCheckpoint removes all checkpoint data for an execution
func (c *FileCheckpointer) DeleteCheckpoint(ctx context.Context, executionID string) error {
if err := validateExecutionID(executionID); err != nil {
return fmt.Errorf("FileCheckpointer.DeleteCheckpoint: %w", err)
}
executionDir := filepath.Join(c.dataDir, executionID)
if err := os.RemoveAll(executionDir); err != nil {
return fmt.Errorf("failed to delete execution directory: %w", err)
}
return nil
}
// ListExecutions returns a list of all executions with their latest checkpoint info
func (c *FileCheckpointer) ListExecutions(ctx context.Context) ([]*ExecutionSummary, error) {
entries, err := os.ReadDir(c.dataDir)
if err != nil {
if os.IsNotExist(err) {
return []*ExecutionSummary{}, nil // No executions directory yet
}
return nil, fmt.Errorf("failed to read executions directory: %w", err)
}
var summaries []*ExecutionSummary
for _, entry := range entries {
if !entry.IsDir() {
continue
}
executionID := entry.Name()
summary, err := c.getExecutionSummary(executionID)
if err != nil {
// Skip executions we can't read
continue
}
if summary != nil {
summaries = append(summaries, summary)
}
}
// Sort by start time (newest first)
sort.Slice(summaries, func(i, j int) bool {
return summaries[i].StartTime.After(summaries[j].StartTime)
})
return summaries, nil
}
// getExecutionSummary reads the latest checkpoint and creates a summary
func (c *FileCheckpointer) getExecutionSummary(executionID string) (*ExecutionSummary, error) {
checkpoint, err := c.LoadCheckpoint(context.Background(), executionID)
if err != nil || checkpoint == nil {
return nil, err
}
return &ExecutionSummary{
ExecutionID: checkpoint.ExecutionID,
WorkflowName: checkpoint.WorkflowName,
Status: string(checkpoint.Status),
StartTime: checkpoint.StartTime,
EndTime: checkpoint.EndTime,
Duration: c.calculateDuration(checkpoint),
Error: checkpoint.Error,
}, nil
}
// calculateDuration calculates the execution duration
func (c *FileCheckpointer) calculateDuration(checkpoint *Checkpoint) time.Duration {
if !checkpoint.EndTime.IsZero() {
return checkpoint.EndTime.Sub(checkpoint.StartTime)
}
// If still running, calculate duration from start to checkpoint time
return checkpoint.CheckpointAt.Sub(checkpoint.StartTime)
}
// updateLatestSymlink updates the symlink to point to the latest checkpoint
func (c *FileCheckpointer) updateLatestSymlink(checkpointPath, latestPath string) error {
// Remove existing symlink if it exists
if _, err := os.Lstat(latestPath); err == nil {
if err := os.Remove(latestPath); err != nil {
return fmt.Errorf("failed to remove existing latest symlink: %w", err)
}
}
// On Windows, copy the file instead of creating a symlink
if strings.Contains(os.Getenv("OS"), "Windows") {
data, err := os.ReadFile(checkpointPath)
if err != nil {
return fmt.Errorf("failed to read checkpoint for copy: %w", err)
}
return os.WriteFile(latestPath, data, 0644)
}
// Create relative symlink
rel, err := filepath.Rel(filepath.Dir(latestPath), checkpointPath)
if err != nil {
return fmt.Errorf("failed to create relative path: %w", err)
}
return os.Symlink(rel, latestPath)
}