Skip to content

Commit e34c82a

Browse files
author
Michal Tichák
committed
[core] kafka writer does not discard messages
See OCTRL-1001 for more details
1 parent fdc6fd4 commit e34c82a

File tree

5 files changed

+357
-14
lines changed

5 files changed

+357
-14
lines changed

common/event/event_suite_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package event_test
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestEvent(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "Event Suite")
13+
}

common/event/fifobuffer.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
"math"
29+
"sync"
30+
)
31+
32+
// This structure is meant to be used as a threadsafe FIFO with builtin waiting for new data
33+
// in its Pop and PopMultiple functions. It is meant to be used with multiple goroutines, it is a
34+
// waste of synchronization mechanisms if used synchronously.
35+
type FifoBuffer[T any] struct {
36+
lock sync.Mutex
37+
cond sync.Cond
38+
39+
buffer []T
40+
}
41+
42+
func NewFifoBuffer[T any]() (result FifoBuffer[T]) {
43+
result = FifoBuffer[T]{
44+
lock: sync.Mutex{},
45+
}
46+
result.cond = *sync.NewCond(&result.lock)
47+
return
48+
}
49+
50+
func (this *FifoBuffer[T]) Push(value T) {
51+
this.cond.L.Lock()
52+
this.buffer = append(this.buffer, value)
53+
this.cond.Signal()
54+
this.cond.L.Unlock()
55+
}
56+
57+
// Blocks until it has some value in internal buffer
58+
func (this *FifoBuffer[T]) PopMultiple(numberToPop uint) (result []T) {
59+
this.cond.L.Lock()
60+
defer this.cond.L.Unlock()
61+
62+
for len(this.buffer) == 0 {
63+
this.cond.Wait()
64+
// this check is used when ReleaseGoroutines is called on waiting goroutine
65+
if len(this.buffer) == 0 {
66+
return
67+
}
68+
}
69+
70+
result = make([]T, int(math.Min(float64(numberToPop), float64(len(this.buffer)))))
71+
copy(result, this.buffer[0:len(result)])
72+
this.buffer = this.buffer[len(result):]
73+
74+
return
75+
}
76+
77+
func (this *FifoBuffer[T]) Length() int {
78+
this.cond.L.Lock()
79+
defer this.cond.L.Unlock()
80+
return len(this.buffer)
81+
}
82+
83+
func (this *FifoBuffer[T]) ReleaseGoroutines() {
84+
this.cond.L.Lock()
85+
this.cond.Broadcast()
86+
this.cond.L.Unlock()
87+
}

common/event/fifobuffer_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
"sync"
29+
"time"
30+
31+
. "github.com/onsi/ginkgo/v2"
32+
. "github.com/onsi/gomega"
33+
)
34+
35+
var _ = Describe("FifoBuffer", func() {
36+
When("Poping lower amount of items than inside of a buffer", func() {
37+
It("returns requested items", func() {
38+
buffer := NewFifoBuffer[int]()
39+
buffer.Push(1)
40+
buffer.Push(2)
41+
buffer.Push(3)
42+
43+
Expect(buffer.Length()).To(Equal(3))
44+
45+
results := buffer.PopMultiple(2)
46+
Expect(results).To(Equal([]int{1, 2}))
47+
})
48+
})
49+
50+
When("Poping higher amount of items than inside of a buffer", func() {
51+
It("returns only available items", func() {
52+
buffer := NewFifoBuffer[int]()
53+
buffer.Push(1)
54+
55+
results := buffer.PopMultiple(2)
56+
Expect(results).To(Equal([]int{1}))
57+
})
58+
})
59+
60+
When("We use buffer with multiple goroutines pushing first (PopMultiple)", func() {
61+
It("is synchronised properly", func() {
62+
buffer := NewFifoBuffer[int]()
63+
channel := make(chan struct{})
64+
65+
wg := sync.WaitGroup{}
66+
wg.Add(2)
67+
68+
go func() {
69+
buffer.Push(1)
70+
channel <- struct{}{}
71+
wg.Done()
72+
}()
73+
74+
go func() {
75+
<-channel
76+
result := buffer.PopMultiple(42)
77+
Expect(result, 1)
78+
wg.Done()
79+
}()
80+
81+
wg.Wait()
82+
})
83+
})
84+
85+
When("We use buffer with multiple goroutines popping first", func() {
86+
It("is synchronised properly", func() {
87+
buffer := NewFifoBuffer[int]()
88+
channel := make(chan struct{})
89+
90+
wg := sync.WaitGroup{}
91+
wg.Add(2)
92+
93+
go func() {
94+
// Pop is blocking is we have empty buffer, so we notify before
95+
channel <- struct{}{}
96+
result := buffer.PopMultiple(42)
97+
Expect(result, 1)
98+
wg.Done()
99+
}()
100+
101+
go func() {
102+
<-channel
103+
buffer.Push(1)
104+
wg.Done()
105+
}()
106+
107+
wg.Wait()
108+
})
109+
})
110+
111+
When("We block FifoBuffer without data and call Release", func() {
112+
It("releases goroutines properly", func() {
113+
buffer := NewFifoBuffer[int]()
114+
everythingDone := sync.WaitGroup{}
115+
channel := make(chan struct{})
116+
117+
everythingDone.Add(1)
118+
go func() {
119+
channel <- struct{}{}
120+
buffer.PopMultiple(42)
121+
everythingDone.Done()
122+
}()
123+
<-channel
124+
time.Sleep(100 * time.Millisecond)
125+
buffer.ReleaseGoroutines()
126+
everythingDone.Wait()
127+
})
128+
})
129+
})

common/event/writer.go

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ package event
2727
import (
2828
"context"
2929
"fmt"
30+
"sync"
3031
"time"
3132

33+
"github.com/AliceO2Group/Control/common/ecsmetrics"
3234
"github.com/AliceO2Group/Control/common/event/topic"
3335
"github.com/AliceO2Group/Control/common/logger"
3436
"github.com/AliceO2Group/Control/common/logger/infologger"
37+
"github.com/AliceO2Group/Control/common/monitoring"
3538
pb "github.com/AliceO2Group/Control/common/protos"
3639
"github.com/segmentio/kafka-go"
3740
"github.com/sirupsen/logrus"
@@ -53,9 +56,26 @@ func (*DummyWriter) WriteEvent(interface{}) {}
5356
func (*DummyWriter) WriteEventWithTimestamp(interface{}, time.Time) {}
5457
func (*DummyWriter) Close() {}
5558

59+
// Kafka writer is used to convert events from events.proto into kafka messages and to write them.
60+
// it is built with 2 workers:
61+
//
62+
// #1 is gathering kafka.Message from any goroutine which sends message into buffered channel and puts them into FifoBuffer.
63+
// #2 is poping any messages from FifoBuffer and sends them to Kafka
64+
//
65+
// The reason for this setup over setting Async: true in kafka.Writer is the ability to have some error handling
66+
// of failed messages. Moreover if we used only one worker that gathers messages from channel and then sends them directly to Kafka,
67+
// we would block whole core if we receive lot of messages at once. So we split functionality into two workers: one is
68+
// putting all messages into the buffer, so if we have a lot of messages buffer just grows without blocking whole core and the
69+
// second does all the sending. This setup allows us to gather messages from any amount of goroutines without blocking/losing messages.
70+
// Another benefit is batching messages instead of writing them one by one.
5671
type KafkaWriter struct {
5772
*kafka.Writer
58-
toWriteChan chan kafka.Message
73+
toBatchMessagesChan chan kafka.Message
74+
messageBuffer FifoBuffer[kafka.Message]
75+
// NOTE: existence of this is to be able to test the writer without actually setting up kafka
76+
writeFunction func([]kafka.Message)
77+
runningWorkers sync.WaitGroup
78+
batchingDoneCh chan struct{}
5979
}
6080

6181
func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
@@ -66,16 +86,27 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
6686
Balancer: &kafka.Hash{},
6787
AllowAutoTopicCreation: true,
6888
},
69-
toWriteChan: make(chan kafka.Message, 1000),
89+
toBatchMessagesChan: make(chan kafka.Message, 100),
90+
messageBuffer: NewFifoBuffer[kafka.Message](),
91+
runningWorkers: sync.WaitGroup{},
92+
batchingDoneCh: make(chan struct{}, 1),
93+
}
94+
95+
writer.writeFunction = func(messages []kafka.Message) {
96+
writer.WriteMessages(context.Background(), messages...)
7097
}
7198

7299
go writer.writingLoop()
100+
go writer.batchingLoop()
101+
73102
return writer
74103
}
75104

76105
func (w *KafkaWriter) Close() {
77106
if w != nil {
78-
close(w.toWriteChan)
107+
w.runningWorkers.Add(2)
108+
close(w.toBatchMessagesChan)
109+
w.runningWorkers.Wait()
79110
w.Writer.Close()
80111
}
81112
}
@@ -86,17 +117,36 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
86117
}
87118
}
88119

89-
// TODO: we can optimise this to write multiple message at once
90120
func (w *KafkaWriter) writingLoop() {
91-
for message := range w.toWriteChan {
92-
err := w.WriteMessages(context.Background(), message)
93-
if err != nil {
94-
log.WithField("level", infologger.IL_Support).
95-
Errorf("failed to write async kafka message: %w", err)
121+
for {
122+
select {
123+
case <-w.batchingDoneCh:
124+
w.runningWorkers.Done()
125+
return
126+
default:
127+
messagesToSend := w.messageBuffer.PopMultiple(100)
128+
if len(messagesToSend) == 0 {
129+
continue
130+
}
131+
w.writeFunction(messagesToSend)
132+
133+
metric := ecsmetrics.NewMetric("kafka")
134+
metric.AddTag("topic", w.Topic)
135+
metric.AddValue("sentmessages", len(messagesToSend))
136+
monitoring.Send(metric)
96137
}
97138
}
98139
}
99140

141+
func (w *KafkaWriter) batchingLoop() {
142+
for message := range w.toBatchMessagesChan {
143+
w.messageBuffer.Push(message)
144+
}
145+
w.batchingDoneCh <- struct{}{}
146+
w.messageBuffer.ReleaseGoroutines()
147+
w.runningWorkers.Done()
148+
}
149+
100150
type HasEnvID interface {
101151
GetEnvironmentId() string
102152
}
@@ -109,6 +159,7 @@ func extractAndConvertEnvID[T HasEnvID](object T) []byte {
109159
return nil
110160
}
111161

162+
// TODO: there should be written test to convert all of these messages
112163
func internalEventToKafkaEvent(internalEvent interface{}, timestamp time.Time) (kafkaEvent *pb.Event, key []byte, err error) {
113164
kafkaEvent = &pb.Event{
114165
Timestamp: timestamp.UnixMilli(),
@@ -188,9 +239,5 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
188239
return
189240
}
190241

191-
select {
192-
case w.toWriteChan <- message:
193-
default:
194-
log.Warnf("Writer of kafka topic [%s] cannot write because channel is full, discarding a message", w.Writer.Topic)
195-
}
242+
w.toBatchMessagesChan <- message
196243
}

0 commit comments

Comments
 (0)