Skip to content

Commit eaac207

Browse files
authored
Merge pull request #17 from koykov/parallel_fifo
FIFO: implement parallel FIFO engine.
2 parents 030991a + a7efbe9 commit eaac207

6 files changed

Lines changed: 88 additions & 13 deletions

File tree

config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ type Config struct {
2727
// Queue capacity.
2828
// Mandatory param if QoS config omitted. QoS (if provided) summing capacity will overwrite this field.
2929
Capacity uint64
30+
// Streams allows to avoid mutex starvation by sharing items among Streams sub-channels instead of one singe
31+
// channel.
32+
Streams uint32
3033
// MaxRetries determines the maximum number of item processing retries.
3134
// If MaxRetries is exceeded, the item will send to DLQ (if possible).
3235
// The initial attempt is not counted as a retry.

pfifo.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package queue
2+
3+
import (
4+
"math"
5+
"sync/atomic"
6+
)
7+
8+
// Parallel FIFO engine implementation.
9+
type pfifo struct {
10+
pool []chan item
11+
c, o, m uint64
12+
}
13+
14+
func (e *pfifo) init(config *Config) error {
15+
var inst uint64
16+
if inst = uint64(config.Streams); inst == 0 {
17+
inst = 1
18+
}
19+
cap_ := config.Capacity / inst
20+
for i := uint64(0); i < inst; i++ {
21+
e.pool = append(e.pool, make(chan item, cap_))
22+
}
23+
e.c, e.o, e.m = math.MaxUint64, math.MaxUint64, inst
24+
return nil
25+
}
26+
27+
func (e *pfifo) enqueue(itm *item, block bool) bool {
28+
idx := atomic.AddUint64(&e.c, 1) % e.m
29+
if !block {
30+
select {
31+
case e.pool[idx] <- *itm:
32+
return true
33+
default:
34+
return false
35+
}
36+
}
37+
e.pool[idx] <- *itm
38+
return true
39+
}
40+
41+
func (e *pfifo) dequeue() (item, bool) {
42+
idx := atomic.AddUint64(&e.o, 1) % e.m
43+
itm, ok := <-e.pool[idx]
44+
return itm, ok
45+
}
46+
47+
func (e *pfifo) dequeueSQ(_ uint32) (item, bool) {
48+
return e.dequeue()
49+
}
50+
51+
func (e *pfifo) size() (r int) {
52+
for i := uint64(0); i < e.m; i++ {
53+
r += len(e.pool[i])
54+
}
55+
return
56+
}
57+
58+
func (e *pfifo) cap() (r int) {
59+
for i := uint64(0); i < e.m; i++ {
60+
r += cap(e.pool[i])
61+
}
62+
return
63+
}
64+
65+
func (e *pfifo) close(_ bool) error {
66+
for i := uint64(0); i < e.m; i++ {
67+
close(e.pool[i])
68+
}
69+
return nil
70+
}

pq.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,15 +336,15 @@ type egress struct {
336336
}
337337

338338
func (e *egress) init(conf *qos.EgressConfig) error {
339-
for i := uint32(0); i < conf.Instances; i++ {
339+
for i := uint32(0); i < conf.Streams; i++ {
340340
e.pool = append(e.pool, make(chan item, conf.Capacity))
341341
name := qos.Egress
342-
if conf.Instances > 1 {
342+
if conf.Streams > 1 {
343343
name = fmt.Sprintf("%s%d", qos.Egress, i)
344344
}
345345
e.name = append(e.name, name)
346346
}
347-
e.c, e.o, e.m = math.MaxUint64, math.MaxUint64, uint64(conf.Instances)
347+
e.c, e.o, e.m = math.MaxUint64, math.MaxUint64, uint64(conf.Streams)
348348
return nil
349349
}
350350

qos/config.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const (
2222
)
2323
const (
2424
defaultEgressCapacity = uint64(64)
25-
defaultEgressInstances = uint32(1)
25+
defaultEgressStreams = uint32(1)
2626
defaultEgressWorkers = uint32(1)
2727
defaultEgressIdleThreshold = uint32(1000)
2828
defaultEgressIdleTimeout = time.Millisecond
@@ -46,8 +46,8 @@ type EgressConfig struct {
4646
// If this param omit defaultEgressCapacity (64) will use instead.
4747
Capacity uint64
4848
// Amount of separate egress sub-queues.
49-
// If this param omit defaultEgressInstances (1) will use instead.
50-
Instances uint32
49+
// If this param omit defaultEgressStreams (1) will use instead.
50+
Streams uint32
5151
// Count of transit workers between sub-queues and egress sud-queue.
5252
// If this param omit defaultEgressWorkers (1) will use instead.
5353
// Use with caution!
@@ -65,8 +65,8 @@ func New(algo Algo, eval PriorityEvaluator) *Config {
6565
q := Config{
6666
Algo: algo,
6767
Egress: EgressConfig{
68-
Capacity: defaultEgressCapacity,
69-
Instances: defaultEgressInstances,
68+
Capacity: defaultEgressCapacity,
69+
Streams: defaultEgressStreams,
7070
},
7171
Evaluator: eval,
7272
}
@@ -88,8 +88,8 @@ func (q *Config) SetEgressCapacity(cap uint64) *Config {
8888
return q
8989
}
9090

91-
func (q *Config) SetEgressInstances(inst uint32) *Config {
92-
q.Egress.Instances = inst
91+
func (q *Config) SetEgressStreams(streams uint32) *Config {
92+
q.Egress.Streams = streams
9393
return q
9494
}
9595

@@ -127,8 +127,8 @@ func (q *Config) Validate() error {
127127
if q.Egress.Capacity == 0 {
128128
q.Egress.Capacity = defaultEgressCapacity
129129
}
130-
if q.Egress.Instances == 0 {
131-
q.Egress.Instances = defaultEgressInstances
130+
if q.Egress.Streams == 0 {
131+
q.Egress.Streams = defaultEgressStreams
132132
}
133133
if q.Egress.Workers == 0 {
134134
q.Egress.Workers = defaultEgressWorkers

qos/readme.ru.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ conf := Config{
3939
`egress` это специальная под-очередь, куда перемещаются элементы согласно алгоритму приоретизации. Настраивается
4040
посредством заполнения под-структуры `EgressConfig` с параметрами:
4141
* `Capacity` - ёмкость, обязательный параметр.
42-
* `Instances` - количество egress очередей для случаев, когда egress является бутылочным горлышком.
42+
* `Streams` - количество egress очередей для случаев, когда egress является бутылочным горлышком.
4343
* `Workers` - количество воркеров, которые перемещают элементы в egress из прочих под-очередей.
4444
* `IdleThreshold` - сколько попыток чтения допустимо предпринять из пустых под-очередей.
4545
* `IdleTimeout` - сколько ждать после превышения `IdleThreshold` прежде чем предпринять ещё одну попытку чтения.

queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ func (q *Queue) init() {
176176
}
177177
c.Capacity = c.QoS.SummingCapacity()
178178
q.engine = &pq{}
179+
case c.Streams > 0:
180+
q.engine = &pfifo{}
179181
default:
180182
q.engine = &fifo{}
181183
}

0 commit comments

Comments
 (0)