@@ -27,11 +27,14 @@ package event
2727import (
2828 "context"
2929 "fmt"
30+ "sync"
3031 "time"
3132
33+ "github.com/AliceO2Group/Control/common/ecsmetrics"
3234 "github.com/AliceO2Group/Control/common/event/topic"
3335 "github.com/AliceO2Group/Control/common/logger"
3436 "github.com/AliceO2Group/Control/common/logger/infologger"
37+ "github.com/AliceO2Group/Control/common/monitoring"
3538 pb "github.com/AliceO2Group/Control/common/protos"
3639 "github.com/segmentio/kafka-go"
3740 "github.com/sirupsen/logrus"
@@ -53,9 +56,26 @@ func (*DummyWriter) WriteEvent(interface{}) {}
5356func (* DummyWriter ) WriteEventWithTimestamp (interface {}, time.Time ) {}
5457func (* DummyWriter ) Close () {}
5558
59+ // Kafka writer is used to convert events from events.proto into kafka messages and to write them.
 60+ // It is built with 2 workers:
61+ //
62+ // #1 is gathering kafka.Message from any goroutine which sends message into buffered channel and puts them into FifoBuffer.
 63+ // #2 is popping any messages from FifoBuffer and sending them to Kafka
64+ //
65+ // The reason for this setup over setting Async: true in kafka.Writer is the ability to have some error handling
66+ // of failed messages. Moreover if we used only one worker that gathers messages from channel and then sends them directly to Kafka,
 67+ // we would block the whole core if we receive a lot of messages at once. So we split the functionality into two workers: one is
 68+ // putting all messages into the buffer, so if we have a lot of messages the buffer just grows without blocking the whole core, and the
69+ // second does all the sending. This setup allows us to gather messages from any amount of goroutines without blocking/losing messages.
70+ // Another benefit is batching messages instead of writing them one by one.
5671type KafkaWriter struct {
5772 * kafka.Writer
58- toWriteChan chan kafka.Message
73+ toBatchMessagesChan chan kafka.Message
74+ messageBuffer FifoBuffer [kafka.Message ]
 75+ // NOTE: we use a settable callback in order to be able to test writing of messages via KafkaWriter, without the necessity of setting up a cluster
76+ writeFunction func ([]kafka.Message )
77+ runningWorkers sync.WaitGroup
78+ batchingDoneCh chan struct {}
5979}
6080
6181func NewWriterWithTopic (topic topic.Topic ) * KafkaWriter {
@@ -66,16 +86,33 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
6686 Balancer : & kafka.Hash {},
6787 AllowAutoTopicCreation : true ,
6888 },
69- toWriteChan : make (chan kafka.Message , 1000 ),
89+ toBatchMessagesChan : make (chan kafka.Message , 100 ),
90+ messageBuffer : NewFifoBuffer [kafka.Message ](),
91+ runningWorkers : sync.WaitGroup {},
92+ batchingDoneCh : make (chan struct {}, 1 ),
93+ }
94+
95+ writer .writeFunction = func (messages []kafka.Message ) {
96+ if err := writer .WriteMessages (context .Background (), messages ... ); err != nil {
97+ metric := ecsmetrics .NewMetric ("kafka" )
98+ metric .AddTag ("topic" , writer .Topic )
99+ metric .AddValue ("failedsentmessages" , len (messages ))
100+ monitoring .Send (metric )
101+ log .Errorf ("failed to write %d messages to kafka with error: %v" , len (messages ), err )
102+ }
70103 }
71104
72105 go writer .writingLoop ()
106+ go writer .batchingLoop ()
107+
73108 return writer
74109}
75110
76111func (w * KafkaWriter ) Close () {
77112 if w != nil {
78- close (w .toWriteChan )
113+ w .runningWorkers .Add (2 )
114+ close (w .toBatchMessagesChan )
115+ w .runningWorkers .Wait ()
79116 w .Writer .Close ()
80117 }
81118}
@@ -86,17 +123,37 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
86123 }
87124}
88125
89- // TODO: we can optimise this to write multiple message at once
90126func (w * KafkaWriter ) writingLoop () {
91- for message := range w .toWriteChan {
92- err := w .WriteMessages (context .Background (), message )
93- if err != nil {
94- log .WithField ("level" , infologger .IL_Support ).
95- Errorf ("failed to write async kafka message: %w" , err )
127+ for {
128+ select {
129+ case <- w .batchingDoneCh :
130+ w .runningWorkers .Done ()
131+ return
132+ default :
133+ messagesToSend := w .messageBuffer .PopMultiple (100 )
134+ if len (messagesToSend ) == 0 {
135+ continue
136+ }
137+
138+ w .writeFunction (messagesToSend )
139+
140+ metric := ecsmetrics .NewMetric ("kafka" )
141+ metric .AddTag ("topic" , w .Topic )
142+ metric .AddValue ("sentmessages" , len (messagesToSend ))
143+ monitoring .Send (metric )
96144 }
97145 }
98146}
99147
148+ func (w * KafkaWriter ) batchingLoop () {
149+ for message := range w .toBatchMessagesChan {
150+ w .messageBuffer .Push (message )
151+ }
152+ w .batchingDoneCh <- struct {}{}
153+ w .messageBuffer .ReleaseGoroutines ()
154+ w .runningWorkers .Done ()
155+ }
156+
100157type HasEnvID interface {
101158 GetEnvironmentId () string
102159}
@@ -109,6 +166,7 @@ func extractAndConvertEnvID[T HasEnvID](object T) []byte {
109166 return nil
110167}
111168
 169+ // TODO: tests should be written to verify the conversion of all of these messages
112170func internalEventToKafkaEvent (internalEvent interface {}, timestamp time.Time ) (kafkaEvent * pb.Event , key []byte , err error ) {
113171 kafkaEvent = & pb.Event {
114172 Timestamp : timestamp .UnixMilli (),
@@ -188,9 +246,5 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
188246 return
189247 }
190248
191- select {
192- case w .toWriteChan <- message :
193- default :
194- log .Warnf ("Writer of kafka topic [%s] cannot write because channel is full, discarding a message" , w .Writer .Topic )
195- }
249+ w .toBatchMessagesChan <- message
196250}
0 commit comments