-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_queue.go
More file actions
87 lines (74 loc) · 1.49 KB
/
task_queue.go
File metadata and controls
87 lines (74 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main
import (
"fmt"
"sync"
"time"
)
// Task is a single unit of work handed to a WorkerPool.
type Task struct {
	// ID identifies the task; workers copy it into the matching Result.TaskID.
	ID int
	// Payload is the text a worker embeds in the Result's Output.
	Payload string
}
// Result describes what happened to one processed Task.
type Result struct {
	// TaskID is the ID of the Task this result belongs to.
	TaskID int
	// Output is the text produced for the task.
	Output string
	// Elapsed is how long the worker spent on the task.
	Elapsed time.Duration
}
// WorkerPool runs a fixed number of worker goroutines that take Tasks from a
// shared queue and publish one Result per Task. Because it embeds a
// sync.WaitGroup it must not be copied after first use; share it by pointer.
type WorkerPool struct {
	// workers is the number of goroutines Start launches.
	workers int
	// jobs carries submitted tasks to the workers; Close closes it.
	jobs chan Task
	// results carries finished work back to the consumer; Close closes it
	// once every worker has returned.
	results chan Result
	// wg counts running workers so that Close can wait for them.
	wg sync.WaitGroup
}
// NewWorkerPool builds a pool that will run `workers` goroutines once Start is
// called. bufferSize is the capacity of both the job queue and the results
// channel.
//
// Out-of-range arguments are normalized rather than producing an unusable
// pool: a workers value below 1 is raised to 1, because a pool without workers
// would accept tasks and never process them, and a negative bufferSize is
// treated as 0 (unbuffered channels), because make panics on a negative
// channel capacity.
func NewWorkerPool(workers, bufferSize int) *WorkerPool {
	if workers < 1 {
		workers = 1
	}
	if bufferSize < 0 {
		bufferSize = 0
	}
	return &WorkerPool{
		workers: workers,
		jobs:    make(chan Task, bufferSize),
		results: make(chan Result, bufferSize),
	}
}
// Start spawns the pool's worker goroutines, one per configured worker. Each
// goroutine is registered with the pool's WaitGroup before it is launched so
// that Close can wait for all of them to finish.
func (wp *WorkerPool) Start() {
	for remaining := wp.workers; remaining > 0; remaining-- {
		wp.wg.Add(1)
		go wp.work()
	}
}
// work is the body of a single worker goroutine. It receives tasks from the
// jobs channel until that channel is closed and drained, publishes one Result
// per Task on the results channel, and marks itself done on the pool's
// WaitGroup when it returns.
func (wp *WorkerPool) work() {
	defer wp.wg.Done()
	for {
		task, open := <-wp.jobs
		if !open {
			// jobs is closed and empty: there is nothing left to process.
			return
		}

		began := time.Now()
		text := fmt.Sprintf("processed: %s", task.Payload)
		time.Sleep(10 * time.Millisecond) // simulate work

		res := Result{TaskID: task.ID, Output: text}
		res.Elapsed = time.Since(began)
		// Blocks while the results channel is full.
		wp.results <- res
	}
}
// Submit enqueues t for processing by the pool's workers. The call blocks
// while the job queue is full, and it panics if the pool has already been
// closed, because Close closes the underlying channel.
func (wp *WorkerPool) Submit(t Task) {
	queue := wp.jobs
	queue <- t
}
// Close shuts the pool down in three ordered steps: it closes the job queue so
// no further work is accepted, waits for every worker to finish, and only then
// closes the results channel so that consumers ranging over Results terminate.
//
// Workers block when the results channel is full, so something must keep
// receiving from Results while Close is waiting. Close must be called at most
// once and never before the last Submit: closing a closed channel, or sending
// on one, panics.
func (wp *WorkerPool) Close() {
	// Ends each worker's receive loop once the queue has been drained.
	close(wp.jobs)
	// After this returns no worker can still be sending on results.
	wp.wg.Wait()
	close(wp.results)
}
// Results exposes the pool's output as a receive-only channel. Close closes
// that channel after all workers have finished, so a range loop over the
// returned channel ends once the pool has been shut down.
func (wp *WorkerPool) Results() <-chan Result {
	var out <-chan Result = wp.results
	return out
}
// main demonstrates the pool: four workers with 20-slot buffers process ten
// tasks submitted from a background goroutine while the main goroutine prints
// each result as it arrives.
func main() {
	pool := NewWorkerPool(4, 20)
	pool.Start()

	// Submitting and closing happen on a separate goroutine so that this
	// goroutine is free to drain results; Close closing the results channel
	// is what ends the receive loop below.
	feed := func() {
		for id := 1; id <= 10; id++ {
			payload := fmt.Sprintf("task-%d", id)
			pool.Submit(Task{ID: id, Payload: payload})
		}
		pool.Close()
	}
	go feed()

	results := pool.Results()
	for {
		r, more := <-results
		if !more {
			break
		}
		fmt.Printf("[task %d] %s (took %s)\n", r.TaskID, r.Output, r.Elapsed)
	}
}