-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasync_request.go
More file actions
290 lines (270 loc) · 8.88 KB
/
async_request.go
File metadata and controls
290 lines (270 loc) · 8.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package middleware
import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"net/http"
"sync"
"time"
timap "github.com/tiny-go/timap"
)
const (
// StatusWaiting is initial status - job is not started yet.
StatusWaiting JobStatus = iota
// StatusInProgress indicates that job is started but not finished yet.
StatusInProgress
// StatusDone indicates that task was done.
StatusDone
)
// asyncKey is the unique private key which is used (internally) to store/retrieve
// async task from the request context.
type asyncKey struct{}
// JobStatus represents the status of asynchronous task.
type JobStatus int
const (
asyncHeader = "Async-Request"
asyncRequestID = "Async-Request-ID"
asyncRequestAccepted = "Async-Request-Started-At"
asyncRequestKeepUntil = "Async-Request-Keep-Until"
)
var (
// ErrNotCompleted - current job was not completed.
ErrNotCompleted = errors.New("task has not been completed")
// ErrNotStarted - current job was not started.
ErrNotStarted = errors.New("job has not been started")
// ErrAlreadyDone - current job has been already done.
ErrAlreadyDone = errors.New("job already completed")
)
// HandlerTask represents sync/async handler task.
type HandlerTask interface {
// Do should execute provided closure.
Do(context.Context, func(<-chan struct{}) error)
// Status should return the status of current job/task.
Status() JobStatus
// Complete is supposed to be called inside Do's closure when job is done in
// order to change job status and be able to return the result by calling
// Resolve() func.
Complete(interface{}, error) error
// Resolve returns the result (which should be returned from the clousere using
// Complete()) and error.
Resolve() (interface{}, error)
}
// base task for sync/async jobs.
type task struct {
sync.Mutex
status JobStatus
started time.Time
finished time.Time
asyncTimeout time.Duration
// returning params
data interface{}
error error
}
// Status returns status of the current task.
func (t *task) Status() JobStatus {
t.Lock()
defer t.Unlock()
return t.status
}
// Resolve returns the result of handler execution and an error.
func (t *task) Resolve() (interface{}, error) {
t.Lock()
defer t.Unlock()
if t.status != StatusDone {
return nil, ErrNotCompleted
}
return t.data, t.error
}
// Complete the task with some result and error, change status to "done".
func (t *task) Complete(data interface{}, err error) error {
t.Lock()
defer t.Unlock()
switch t.status {
case StatusWaiting:
return ErrNotStarted
case StatusDone:
return ErrAlreadyDone
default:
t.data, t.error, t.status, t.finished = data, err, StatusDone, time.Now()
return err
}
}
// syncTask represents synchronous handler job.
type syncTask struct {
*task
}
// newAsyncTask is a constructor func for synchronous job.
func newSyncTask(reqTimeout time.Duration) *syncTask {
return &syncTask{task: &task{}}
}
// Do executes handler (handler should be a closure - otherwise you will not be
// able to pass arguments to it or call Complete() in order to return some value).
func (st *syncTask) Do(ctx context.Context, handler func(stop <-chan struct{}) error) {
// memorize start time and change job status
st.Lock()
st.status, st.started = StatusInProgress, time.Now()
st.Unlock()
// error chan
errChan := make(chan error, 1)
// call handler in goroutine
go func() { errChan <- handler(ctx.Done()) }()
// wait until context deadline or job is done
select {
// job was done
case err := <-errChan:
st.Complete(nil, err)
// or timeout is reached
case <-ctx.Done():
// ignore - 408 response will be sent by AsyncRequest func
}
return
}
// asyncTask represents asynchronous handler job.
type asyncTask struct {
*task
// unique request ID
ID string
}
// newAsyncTask is a constructor func for asynchronous job.
func newAsyncTask(execTimeout time.Duration) *asyncTask {
id := md5.Sum([]byte(time.Now().String()))
return &asyncTask{
ID: hex.EncodeToString(id[:]),
task: &task{
asyncTimeout: execTimeout,
},
}
}
// Do handles asynchronous execution of the handler.
func (at *asyncTask) Do(ctx context.Context, handler func(stop <-chan struct{}) error) {
// memorize start time and change job status
at.Lock()
at.status, at.started = StatusInProgress, time.Now()
at.Unlock()
// error chan
errChan := make(chan error, 1)
// call handler in goroutine
go func() {
// call the handler with actual (execution) timeout channel
errChan <- handler(func() <-chan struct{} {
// context deadline channel
ch := make(chan struct{}, 1)
// run timer in a new goroutine
go func() {
<-time.NewTimer(at.asyncTimeout).C
// channel may be closed after job is done (in some time)
close(ch)
// complete the task with context deadline error
at.Complete(nil, context.DeadlineExceeded)
}()
return ch
}())
}()
// wait until context deadline or job is done
select {
// job was done
case err := <-errChan:
// task should be completed in case if Complete has not been called in the
// handler (for instance error was returned without wrapping with Complete)
at.Complete(nil, err)
// timeout
case <-ctx.Done():
// request timeout, this is not the error for async requests because handler
// probably is still running
}
return
}
// AsyncRequest func creates a middleware that provides a mechanism to run the
// handler in a background (if HTTP request timeout was reached) and keep result
// until it is demanded again or result expires. Function parameters:
//
// reqTimeout - time allotted for processing HTTP request, if request has not been
// processed completely - returns an ID of request (to retrieve result later).
//
// asyncTimeout - maximum time for async job to be done (actual context deadline),
// this logic should be implemented in asynchronous handler or skipped - in that case
// handler cannot be interrupted.
//
// keepResult - at the expiration of a given period of time the result will be
// unavailable (deleted).
//
// NOTE: Do not use defer statements to check the status of task, send error or
// any response when using PanicRecover middleware.
func AsyncRequest(reqTimeout, asyncTimeout, keepResult time.Duration) Middleware {
// no sense to use this middleware if the following condition is not satisfied
if !(reqTimeout < asyncTimeout && asyncTimeout < keepResult) {
panic("request timeout should be less than async timeout and keep result should be greater than async timeout")
}
// each handler has its own task list
var asyncJobs = timap.New(keepResult)
// create a new Middleware
return func(next http.Handler) http.Handler {
// set timeout with ContextDeadline middleware func
return ContextDeadline(reqTimeout)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check if async request (should contain async header)
if _, ok := r.Header[asyncHeader]; ok {
// current request
var async *asyncTask
// if contains ID - it is not a new request
if requestID := r.Header.Get(asyncRequestID); requestID != "" {
// find async job
val, ok := asyncJobs.Load(requestID)
if !ok {
// async request is expired or has invalid ID
http.Error(w, "invalid or expired request", http.StatusBadRequest)
// skip next middleware/handlers
return
}
async = val.(*asyncTask)
} else {
// create new async task
async = newAsyncTask(asyncTimeout)
// and store in the list
asyncJobs.Store(async.ID, async)
}
// get context from request
ctx := r.Context()
// put async task to the context
ctx = context.WithValue(ctx, asyncKey{}, async)
// replace request
r = r.WithContext(ctx)
// call next handler
next.ServeHTTP(w, r)
// check the status of async task
if async.Status() == StatusDone {
asyncJobs.Delete(async.ID)
} else {
// return request ID
w.Header().Set(asyncRequestID, async.ID)
w.Header().Set(asyncRequestAccepted, async.started.Format(DefaultTimeFormat))
w.Header().Set(asyncRequestKeepUntil, async.started.Add(keepResult).Format(DefaultTimeFormat))
// the status ot request is "accepted"
w.WriteHeader(http.StatusAccepted)
// provide a basic info message to the client
w.Write([]byte("request is in progress\n"))
}
} else {
// create synchronous job
sync := newSyncTask(reqTimeout)
// get context from request
ctx := r.Context()
// put async task to the context
ctx = context.WithValue(ctx, asyncKey{}, sync)
// replace request
r = r.WithContext(ctx)
// call next handler
next.ServeHTTP(w, r)
// send timeout code on exit if synchronous job was not done
if _, err := sync.Resolve(); err == ErrNotCompleted {
http.Error(w, context.DeadlineExceeded.Error(), http.StatusRequestTimeout)
}
}
}))
}
}
// GetHandlerTask extracts current job from context.
func GetHandlerTask(ctx context.Context) (HandlerTask, bool) {
async, ok := ctx.Value(asyncKey{}).(HandlerTask)
return async, ok
}