-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.go
More file actions
104 lines (89 loc) · 2.75 KB
/
executor.go
File metadata and controls
104 lines (89 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package async
import "sync"
// An Executor is a coroutine spawner, and a coroutine runner.
//
// When a coroutine is spawned or resumed, it is added into an internal queue.
// The Run method then pops and runs each of them from the queue until
// the queue is emptied.
// It is done in a single-threaded manner.
// If one coroutine blocks, no other coroutines can run.
// The best practice is not to block.
//
// The internal queue is a priority queue.
// Coroutines added to the queue are sorted by their weights.
// Coroutines with the same weight are sorted by their levels
// (child coroutines are one level higher than their parents).
// Coroutines with the same weight and level are sorted by their arrival
// order (FIFO).
// Popping the queue removes the first coroutine with the highest weight or
// the least level.
//
// Manually calling the Run method is usually not desired.
// One would instead use the Autorun method to set up an autorun function that
// calls the Run method automatically whenever a coroutine is spawned or
// resumed.
// An Executor never calls the autorun function twice at the same time.
type Executor struct {
// mu is held by Autorun, Run and SpawnWeighted while they touch the fields
// below.
mu sync.Mutex
// pq holds the coroutines waiting to run: SpawnWeighted pushes, Run pops.
pq priorityqueue[*Coroutine]
// ps is taken and reset by Run, which calls Repanic on the taken value after
// unlocking mu.
// NOTE(review): presumably it accumulates panics raised while coroutines
// run — confirm against runCoroutine.
ps panicstack
// running is set by Run, and by SpawnWeighted just before it invokes the
// autorun function; Run clears it once the queue has been drained.
running bool
// autorun is the function installed by Autorun (nil until then).
// SpawnWeighted invokes it only when running is false.
autorun func()
}
// Autorun installs f as the autorun function, which the Executor invokes to
// get the Run method called automatically whenever a coroutine is spawned or
// resumed.
//
// f must be a function that calls the Run method.
//
// f is invoked synchronously, so if f blocks, the Spawn method may block too.
// The best practice is not to block.
func (e *Executor) Autorun(f func()) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.autorun = f
}
// Run drains the queue: it keeps popping the next coroutine and running it
// until no coroutine is left.
//
// The running flag is raised for the whole drain and lowered right before
// the mutex is released. runCoroutine is entered with the mutex held.
//
// Run must not be called twice at the same time.
func (e *Executor) Run() {
	e.mu.Lock()
	e.running = true

	for {
		if e.pq.Empty() {
			break
		}
		e.runCoroutine(e.pq.Pop())
	}

	// Detach the panic stack and reset the executor state while still holding
	// the lock, so that Repanic below runs on a private copy with the mutex
	// already released.
	pending := e.ps
	e.ps, e.running = nil, false
	e.mu.Unlock()

	// NOTE(review): presumably re-raises whatever panics were collected while
	// the coroutines ran — confirm against panicstack.
	pending.Repanic()
}
// Spawn creates a coroutine to work on t, using the default weight (zero).
// It is shorthand for SpawnWeighted(0, t).
//
// The new coroutine is only queued. To have it run, either call the Run
// method, or set up an autorun function with the Autorun method beforehand.
//
// Spawn is safe for concurrent use.
func (e *Executor) Spawn(t Task) {
	defaultWeight := Weight(0)
	e.SpawnWeighted(defaultWeight, t)
}
// SpawnWeighted creates a coroutine with weight w to work on t.
//
// The new coroutine is only queued. To have it run, either call the Run
// method, or set up an autorun function with the Autorun method beforehand.
//
// When an autorun function is installed and the Executor is not already
// marked as running, SpawnWeighted marks it as running and then invokes the
// autorun function synchronously, after releasing the mutex.
//
// SpawnWeighted is safe for concurrent use.
func (e *Executor) SpawnWeighted(w Weight, t Task) {
	co := newCoroutine().init(e, t).withWeight(w)

	e.mu.Lock()
	kick := e.autorun
	if e.running || kick == nil {
		// Either a run is already in progress (or about to start), or there
		// is nothing to invoke: just enqueue.
		kick = nil
	} else {
		// Claim the running flag under the lock so that concurrent spawns do
		// not invoke the autorun function a second time.
		e.running = true
	}
	e.pq.Push(co)
	e.mu.Unlock()

	if kick == nil {
		return
	}
	// Invoked outside the lock.
	kick()
}