-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue_impl.go
More file actions
165 lines (149 loc) · 3.26 KB
/
queue_impl.go
File metadata and controls
165 lines (149 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package queue
import "sync"
// queue is the linked-list type that NewQueue returns behind the Queue
// interface.
type queue[T any] struct {
	// head is the front node (the next one Pop removes) and tail is the back
	// node (the one Enqueue links new nodes after). Both are nil while the
	// queue holds no items.
	head, tail *queueItem[T]
	// cond is what Pop waits on while the queue is empty; Enqueue signals it
	// and StartUnBlocking broadcasts on it.
	cond *sync.Cond
	// lock is taken by every method before it touches the fields above or
	// the flags below.
	lock *sync.RWMutex
	// deBlock makes Pop return nil instead of waiting on an empty queue.
	// ignoreEnqueue makes Enqueue return without storing the value.
	deBlock, ignoreEnqueue bool
}
// NewQueue returns a new, empty linked-list implementation of the Queue
// interface.
//
// The condition variable is created on the same mutex that guards the list,
// and both the unblocking and the enqueue-blocking flags start out false.
func NewQueue[T any]() Queue[T] {
	mu := new(sync.RWMutex)

	q := new(queue[T])
	q.lock = mu
	q.cond = sync.NewCond(mu)
	return q
}
// Enqueue appends value to the back of the queue and signals one waiter on
// the condition variable.
//
// While enqueueing is blocked (see IsBlockingEnqueue) the value is dropped
// and nothing is signalled.
func (q *queue[T]) Enqueue(value T) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.ignoreEnqueue {
		return
	}

	item := &queueItem[T]{value: value}
	if q.head != nil {
		// Non-empty: link the new node after the current tail.
		q.tail.next = item
	} else {
		// Empty: the new node is also the head.
		q.head = item
	}
	q.tail = item

	q.cond.Signal()
}
// Pop removes the value at the front of the queue and returns a pointer to a
// copy of it.
//
// While the queue is empty Pop waits on the condition variable. It returns
// nil only when the queue is empty and unblocking is active (IsUnBlocking is
// true); an item that is present is returned regardless of that flag.
func (q *queue[T]) Pop() *T {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Re-check after every wake-up: the item may be gone again, or the wake-up
	// may have come from StartUnBlocking.
	for q.head == nil {
		if q.deBlock {
			return nil
		}
		q.cond.Wait()
	}

	front := q.head
	out := front.value

	if front == q.tail {
		// Removing the only node leaves the queue empty.
		q.tail = nil
	}
	q.head = front.next

	// Reset the detached node so it no longer references the value or the
	// rest of the list.
	var zero T
	front.value = zero
	front.next = nil

	return &out
}
// Dequeue removes and returns the value at the front of the queue by calling
// Pop, so it waits exactly as Pop does.
//
// When Pop returns nil (empty queue while IsUnBlocking is true) Dequeue
// returns the zero value of T.
func (q *queue[T]) Dequeue() T {
	if popped := q.Pop(); popped != nil {
		return *popped
	}

	var zero T
	return zero
}
// IsEmpty reports whether the queue currently holds no items. The check is
// made under the read lock.
func (q *queue[T]) IsEmpty() bool {
	q.lock.RLock()
	empty := q.head == nil
	q.lock.RUnlock()

	return empty
}
// Peek returns a pointer to a copy of the value at the front of the queue
// without removing it, or nil when the queue is empty. Writing through the
// returned pointer does not change the queued value.
func (q *queue[T]) Peek() *T {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if front := q.head; front != nil {
		out := front.value
		return &out
	}
	return nil
}
// PeekLast returns a pointer to a copy of the value at the back of the queue
// without removing it, or nil when the queue has no tail node. Writing
// through the returned pointer does not change the queued value.
func (q *queue[T]) PeekLast() *T {
	q.lock.RLock()
	defer q.lock.RUnlock()

	if back := q.tail; back != nil {
		out := back.value
		return &out
	}
	return nil
}
// StartUnBlocking switches the queue into unblocking mode: Pop and Dequeue
// return immediately when no items are present, and enqueueing is blocked
// (IsBlockingEnqueue becomes true). Every goroutine currently waiting on the
// condition variable is woken so it can observe the new mode.
func (q *queue[T]) StartUnBlocking() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.deBlock, q.ignoreEnqueue = true, true
	q.cond.Broadcast()
}
// EndUnBlocking restores normal operation: Pop and Dequeue wait again while
// the queue is empty, and enqueueing is accepted again (IsBlockingEnqueue
// becomes false).
func (q *queue[T]) EndUnBlocking() {
	q.lock.Lock()
	defer q.lock.Unlock()

	q.deBlock, q.ignoreEnqueue = false, false
}
// IsUnBlocking reports whether the queue is in unblocking mode, i.e. whether
// Pop and Dequeue return instead of waiting when no items are present.
func (q *queue[T]) IsUnBlocking() bool {
	q.lock.RLock()
	unblocking := q.deBlock
	q.lock.RUnlock()

	return unblocking
}
// BlockEnqueue makes subsequent Enqueue calls drop their value until
// enqueueing is unblocked again. Items already in the queue are unaffected.
func (q *queue[T]) BlockEnqueue() {
	q.lock.Lock()
	q.ignoreEnqueue = true
	q.lock.Unlock()
}
// UnBlockEnqueue lets Enqueue store values again after enqueueing was
// blocked. The unblocking mode reported by IsUnBlocking is not changed.
func (q *queue[T]) UnBlockEnqueue() {
	q.lock.Lock()
	q.ignoreEnqueue = false
	q.lock.Unlock()
}
// IsBlockingEnqueue reports whether Enqueue calls are currently being
// dropped, either through BlockEnqueue or through StartUnBlocking.
func (q *queue[T]) IsBlockingEnqueue() bool {
	q.lock.RLock()
	blocked := q.ignoreEnqueue
	q.lock.RUnlock()

	return blocked
}
// Clear removes every item from the queue.
//
// Each node is reset to the zero value and unlinked from its successor
// before the list is dropped, so a detached node no longer references the
// value or the rest of the list. The unblocking and enqueue-blocking flags
// are left unchanged.
func (q *queue[T]) Clear() {
	q.lock.Lock()
	defer q.lock.Unlock()

	var zero T
	for node := q.head; node != nil; {
		// Remember the successor before the link is cut.
		following := node.next
		node.value, node.next = zero, nil
		node = following
	}

	q.head, q.tail = nil, nil
}