Skip to content

Commit e73a809

Browse files
author
Michal Tichák
committed
[core] async kafka writing with error reporting
1 parent 5f84cef commit e73a809

File tree

1 file changed

+20
-6
lines changed

1 file changed

+20
-6
lines changed

common/event/writer.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,28 @@ func (*DummyWriter) Close() {}
5555

5656
type KafkaWriter struct {
5757
*kafka.Writer
58+
toWriteChan chan kafka.Message
5859
}
5960

6061
func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
61-
return &KafkaWriter{
62+
writer := &KafkaWriter{
6263
Writer: &kafka.Writer{
6364
Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...),
6465
Topic: string(topic),
6566
Balancer: &kafka.Hash{},
6667
AllowAutoTopicCreation: true,
6768
},
69+
toWriteChan: make(chan kafka.Message, 1000),
6870
}
71+
72+
go writer.writingLoop()
73+
return writer
6974
}
7075

7176
func (w *KafkaWriter) Close() {
7277
if w != nil {
73-
w.Close()
78+
close(w.toWriteChan)
79+
w.Writer.Close()
7480
}
7581
}
7682

@@ -80,6 +86,16 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
8086
}
8187
}
8288

89+
// TODO: we can optimise this to write multiple message at once
90+
func (w *KafkaWriter) writingLoop() {
91+
for message := range w.toWriteChan {
92+
err := w.WriteMessages(context.Background(), message)
93+
if err != nil {
94+
log.Errorf("failed to write async kafka message: %w", err)
95+
}
96+
}
97+
}
98+
8399
type HasEnvID interface {
84100
GetEnvironmentId() string
85101
}
@@ -200,9 +216,7 @@ func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
200216
message.Key = key
201217
}
202218

203-
err = w.WriteMessages(context.Background(), message)
204-
if err != nil {
205-
return fmt.Errorf("failed to write event: %w", err)
206-
}
219+
w.toWriteChan <- message
220+
207221
return nil
208222
}

0 commit comments

Comments
 (0)