-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfast_queue_impl.go
More file actions
141 lines (125 loc) · 2.97 KB
/
fast_queue_impl.go
File metadata and controls
141 lines (125 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package queue
import "sync"
// fastQueue is a singly linked FIFO list. Every method that modifies the
// queue holds lock while doing so; the read-only accessors read the fields
// without locking.
type fastQueue[T any] struct {
	// head is the next item Pop will return and tail is the most recently
	// enqueued item. Both are nil while the queue is empty.
	head, tail *queueItem[T]
	// cond is signalled by Enqueue and broadcast by StartUnBlocking so that
	// goroutines waiting in Pop re-check the queue state.
	cond *sync.Cond
	// lock is the mutex taken by the mutating methods.
	lock *sync.Mutex
	// deBlock, when true, lets Pop return nil on an empty queue instead of
	// waiting. ignoreEnqueue, when true, makes Enqueue return without adding
	// its value.
	deBlock, ignoreEnqueue bool
}
// NewFastQueue returns an empty linked-list implementation of the Queue
// interface. Only the operations that modify the queue are synchronised, and
// data that becomes unreachable is not cleared directly.
func NewFastQueue[T any]() Queue[T] {
	mu := new(sync.Mutex)
	q := &fastQueue[T]{lock: mu}
	// The condition variable is built on the queue's own mutex, which is the
	// one Pop holds while it waits.
	q.cond = sync.NewCond(mu)
	return q
}
// Enqueue appends value to the end of the queue and signals one goroutine
// waiting on the queue's condition variable.
//
// While enqueueing is blocked (IsBlockingEnqueue reports true) the value is
// dropped and the queue is left untouched.
func (q *fastQueue[T]) Enqueue(value T) {
	q.lock.Lock()
	defer q.lock.Unlock()
	// The flag is checked while holding the lock: it is written under the
	// same lock, so an unlocked read would be a data race and could let a
	// value slip in after enqueueing has been blocked.
	if q.ignoreEnqueue {
		return
	}
	item := &queueItem[T]{value: value}
	if q.head == nil {
		q.head = item
	} else {
		q.tail.next = item
	}
	q.tail = item
	q.cond.Signal()
}
// Pop removes the first item and returns a pointer to a copy of its value.
// While the queue is empty Pop waits on the condition variable; it returns
// nil only when the queue is empty and unblocking is active (IsUnBlocking
// reports true).
func (q *fastQueue[T]) Pop() *T {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Wait until there is an item to take or waiting has been switched off.
	for !q.deBlock && q.head == nil {
		q.cond.Wait()
	}

	first := q.head
	if first == nil {
		return nil
	}
	// Removing the only item leaves the queue without a tail as well.
	if first == q.tail {
		q.tail = nil
	}
	q.head = first.next

	out := first.value
	return &out
}
// Dequeue removes the first item via Pop and returns its value, so it blocks
// in the same way Pop does. When Pop returns nil (empty queue while
// IsUnBlocking reports true) the zero value of T is returned.
func (q *fastQueue[T]) Dequeue() T {
	if popped := q.Pop(); popped != nil {
		return *popped
	}
	var zero T
	return zero
}
// IsEmpty reports whether the queue currently has no first item. The head
// pointer is read without taking the lock.
func (q *fastQueue[T]) IsEmpty() bool {
	first := q.head
	return first == nil
}
// Peek returns a pointer to the value of the first item without removing it,
// or nil when the queue is empty. The pointer refers to the value stored in
// the queue item itself, not to a copy.
//
// Peek does not take the lock. The head pointer is read exactly once so that
// a concurrent Pop or Clear setting it to nil between the nil check and the
// dereference cannot cause a nil-pointer panic.
func (q *fastQueue[T]) Peek() *T {
	first := q.head
	if first == nil {
		return nil
	}
	return &first.value
}
// PeekLast returns a pointer to the value of the last item without removing
// it, or nil when the queue has no tail. The pointer refers to the value
// stored in the queue item itself, not to a copy.
//
// PeekLast does not take the lock. The tail pointer is read exactly once so
// that a concurrent Pop or Clear setting it to nil between the nil check and
// the dereference cannot cause a nil-pointer panic.
func (q *fastQueue[T]) PeekLast() *T {
	last := q.tail
	if last == nil {
		return nil
	}
	return &last.value
}
// StartUnBlocking switches the queue into unblocking mode: Pop (and therefore
// Dequeue) returns instead of waiting when no items are present, and Enqueue
// starts dropping values (IsBlockingEnqueue becomes true). All goroutines
// waiting on the condition variable are woken so they can observe the change.
func (q *fastQueue[T]) StartUnBlocking() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.deBlock, q.ignoreEnqueue = true, true
	q.cond.Broadcast()
}
// EndUnBlocking leaves unblocking mode: Pop waits again when the queue is
// empty and Enqueue accepts values again (IsBlockingEnqueue becomes false).
func (q *fastQueue[T]) EndUnBlocking() {
	q.lock.Lock()
	q.deBlock, q.ignoreEnqueue = false, false
	q.lock.Unlock()
}
// IsUnBlocking reports whether Pop and Dequeue return instead of waiting when
// no items are present. The flag is read without taking the lock.
func (q *fastQueue[T]) IsUnBlocking() bool {
	unblocking := q.deBlock
	return unblocking
}
// BlockEnqueue makes subsequent Enqueue calls drop their value instead of
// adding it to the queue. The flag is set while holding the lock.
func (q *fastQueue[T]) BlockEnqueue() {
	q.lock.Lock()
	q.ignoreEnqueue = true
	q.lock.Unlock()
}
// UnBlockEnqueue makes Enqueue accept values again after BlockEnqueue or
// StartUnBlocking. The flag is cleared while holding the lock.
func (q *fastQueue[T]) UnBlockEnqueue() {
	q.lock.Lock()
	q.ignoreEnqueue = false
	q.lock.Unlock()
}
// IsBlockingEnqueue reports whether Enqueue currently drops values instead of
// adding them. The flag is read without taking the lock.
func (q *fastQueue[T]) IsBlockingEnqueue() bool {
	blocked := q.ignoreEnqueue
	return blocked
}
// Clear empties the queue by dropping the references to its first and last
// items while holding the lock. The blocking flags are left unchanged.
func (q *fastQueue[T]) Clear() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.head, q.tail = nil, nil
}