@@ -42,7 +42,11 @@ import (
4242 "google.golang.org/protobuf/proto"
4343)
4444
45- var log = logger .New (logrus .StandardLogger (), "event" )
45+ var (
46+ log = logger .New (logrus .StandardLogger (), "event" )
47+ KAFKAWRITER = "kafka_writer"
48+ KAFKAPREPARE = "kafka_prepare"
49+ )
4650
4751type Writer interface {
4852 WriteEvent (e interface {})
@@ -73,9 +77,15 @@ type KafkaWriter struct {
7377 toBatchMessagesChan chan kafka.Message
7478 messageBuffer FifoBuffer [kafka.Message ]
7579 // NOTE: we use settable callback in order to be able to test writing of messages via KafkaWriter, without necessity of setting up cluster
76- writeFunction func ([]kafka.Message )
77- runningWorkers sync.WaitGroup
78- batchingDoneCh chan struct {}
80+ writeFunction func ([]kafka.Message , * monitoring.Metric )
81+ runningWorkers sync.WaitGroup
82+ batchingLoopDoneCh chan struct {}
83+ }
84+
85+ func (w * KafkaWriter ) newMetric (name string ) monitoring.Metric {
86+ metric := ecsmetrics .NewMetric (name )
87+ metric .AddTag ("topic" , w .Topic )
88+ return metric
7989}
8090
8191func NewWriterWithTopic (topic topic.Topic ) * KafkaWriter {
@@ -86,18 +96,16 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
8696 Balancer : & kafka.Hash {},
8797 AllowAutoTopicCreation : true ,
8898 },
89- toBatchMessagesChan : make (chan kafka.Message , 100 ),
99+ toBatchMessagesChan : make (chan kafka.Message , 10000 ),
90100 messageBuffer : NewFifoBuffer [kafka.Message ](),
91101 runningWorkers : sync.WaitGroup {},
92- batchingDoneCh : make (chan struct {}, 1 ),
102+ batchingLoopDoneCh : make (chan struct {}, 1 ),
93103 }
94104
95- writer .writeFunction = func (messages []kafka.Message ) {
105+ writer .writeFunction = func (messages []kafka.Message , metric * monitoring.Metric ) {
106+ defer ecsmetrics .TimerNS (metric )()
96107 if err := writer .WriteMessages (context .Background (), messages ... ); err != nil {
97- metric := ecsmetrics .NewMetric ("kafka" )
98- metric .AddTag ("topic" , writer .Topic )
99- metric .AddValue ("failedsentmessages" , len (messages ))
100- monitoring .Send (metric )
108+ metric .AddValue ("messages_failed" , len (messages ))
101109 log .Errorf ("failed to write %d messages to kafka with error: %v" , len (messages ), err )
102110 }
103111 }
@@ -110,6 +118,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
110118
111119func (w * KafkaWriter ) Close () {
112120 if w != nil {
121+ // We are waiting until both loops (batching and writing) are done
113122 w .runningWorkers .Add (2 )
114123 close (w .toBatchMessagesChan )
115124 w .runningWorkers .Wait ()
@@ -126,7 +135,7 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
126135func (w * KafkaWriter ) writingLoop () {
127136 for {
128137 select {
129- case <- w .batchingDoneCh :
138+ case <- w .batchingLoopDoneCh :
130139 w .runningWorkers .Done ()
131140 return
132141 default :
@@ -135,11 +144,12 @@ func (w *KafkaWriter) writingLoop() {
135144 continue
136145 }
137146
138- w .writeFunction (messagesToSend )
147+ metric := w .newMetric (KAFKAWRITER )
148+ metric .AddValue ("messages_sent" , len (messagesToSend ))
149+ metric .AddValue ("messages_failed" , 0 )
150+
151+ w .writeFunction (messagesToSend , & metric )
139152
140- metric := ecsmetrics .NewMetric ("kafka" )
141- metric .AddTag ("topic" , w .Topic )
142- metric .AddValue ("sentmessages" , len (messagesToSend ))
143153 monitoring .Send (metric )
144154 }
145155 }
@@ -149,7 +159,7 @@ func (w *KafkaWriter) batchingLoop() {
149159 for message := range w .toBatchMessagesChan {
150160 w .messageBuffer .Push (message )
151161 }
152- w .batchingDoneCh <- struct {}{}
162+ w .batchingLoopDoneCh <- struct {}{}
153163 w .messageBuffer .ReleaseGoroutines ()
154164 w .runningWorkers .Done ()
155165}
@@ -230,21 +240,27 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
230240 return
231241 }
232242
233- wrappedEvent , key , err := internalEventToKafkaEvent (e , timestamp )
234- if err != nil {
235- log .WithField ("event" , e ).
236- WithField ("level" , infologger .IL_Support ).
237- Errorf ("Failed to convert event to kafka event: %s" , err .Error ())
238- return
239- }
243+ metric := w .newMetric (KAFKAPREPARE )
240244
241- message , err := kafkaEventToKafkaMessage (wrappedEvent , key )
242- if err != nil {
243- log .WithField ("event" , e ).
244- WithField ("level" , infologger .IL_Support ).
245- Errorf ("Failed to convert kafka event to message: %s" , err .Error ())
246- return
247- }
245+ func () {
246+ defer ecsmetrics .TimerNS (& metric )()
247+ wrappedEvent , key , err := internalEventToKafkaEvent (e , timestamp )
248+ if err != nil {
249+ log .WithField ("event" , e ).
250+ WithField ("level" , infologger .IL_Support ).
251+ Errorf ("Failed to convert event to kafka event: %s" , err .Error ())
252+ return
253+ }
254+
255+ message , err := kafkaEventToKafkaMessage (wrappedEvent , key )
256+ if err != nil {
257+ log .WithField ("event" , e ).
258+ WithField ("level" , infologger .IL_Support ).
259+ Errorf ("Failed to convert kafka event to message: %s" , err .Error ())
260+ return
261+ }
262+ w .toBatchMessagesChan <- message
263+ }()
248264
249- w . toBatchMessagesChan <- message
265+ monitoring . Send ( metric )
250266}
0 commit comments