Skip to content

Commit 926ea9b

Browse files
Michal Tichákjustonedev1
authored andcommitted
[core] refactor kafka writing loop
1 parent b1df1a8 commit 926ea9b

File tree

1 file changed

+45
-75
lines changed

1 file changed

+45
-75
lines changed

common/event/writer.go

Lines changed: 45 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ func (w *KafkaWriter) writingLoop() {
9191
for message := range w.toWriteChan {
9292
err := w.WriteMessages(context.Background(), message)
9393
if err != nil {
94-
log.Errorf("failed to write async kafka message: %w", err)
94+
log.WithField("level", infologger.IL_Support).
95+
Errorf("failed to write async kafka message: %w", err)
9596
}
9697
}
9798
}
@@ -108,104 +109,51 @@ func extractAndConvertEnvID[T HasEnvID](object T) []byte {
108109
return nil
109110
}
110111

111-
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
112-
if w == nil {
113-
return
112+
func internalEventToKafkaEvent(internalEvent interface{}, timestamp time.Time) (kafkaEvent *pb.Event, key []byte, err error) {
113+
kafkaEvent = &pb.Event{
114+
Timestamp: timestamp.UnixMilli(),
115+
TimestampNano: timestamp.UnixNano(),
114116
}
115117

116-
var (
117-
err error
118-
wrappedEvent *pb.Event
119-
key []byte = nil
120-
)
121-
122-
switch e := e.(type) {
118+
switch e := internalEvent.(type) {
123119
case *pb.Ev_MetaEvent_CoreStart:
124-
wrappedEvent = &pb.Event{
125-
Timestamp: timestamp.UnixMilli(),
126-
TimestampNano: timestamp.UnixNano(),
127-
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
128-
}
120+
kafkaEvent.Payload = &pb.Event_CoreStartEvent{CoreStartEvent: e}
129121
case *pb.Ev_MetaEvent_MesosHeartbeat:
130-
wrappedEvent = &pb.Event{
131-
Timestamp: timestamp.UnixMilli(),
132-
TimestampNano: timestamp.UnixNano(),
133-
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
134-
}
122+
kafkaEvent.Payload = &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e}
135123
case *pb.Ev_MetaEvent_FrameworkEvent:
136-
wrappedEvent = &pb.Event{
137-
Timestamp: timestamp.UnixMilli(),
138-
TimestampNano: timestamp.UnixNano(),
139-
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
140-
}
124+
kafkaEvent.Payload = &pb.Event_FrameworkEvent{FrameworkEvent: e}
141125
case *pb.Ev_TaskEvent:
142126
key = []byte(e.Taskid)
143127
if len(key) == 0 {
144128
key = nil
145129
}
146-
wrappedEvent = &pb.Event{
147-
Timestamp: timestamp.UnixMilli(),
148-
TimestampNano: timestamp.UnixNano(),
149-
Payload: &pb.Event_TaskEvent{TaskEvent: e},
150-
}
130+
kafkaEvent.Payload = &pb.Event_TaskEvent{TaskEvent: e}
151131
case *pb.Ev_RoleEvent:
152132
key = extractAndConvertEnvID(e)
153-
wrappedEvent = &pb.Event{
154-
Timestamp: timestamp.UnixMilli(),
155-
TimestampNano: timestamp.UnixNano(),
156-
Payload: &pb.Event_RoleEvent{RoleEvent: e},
157-
}
133+
kafkaEvent.Payload = &pb.Event_RoleEvent{RoleEvent: e}
158134
case *pb.Ev_EnvironmentEvent:
159135
key = extractAndConvertEnvID(e)
160-
wrappedEvent = &pb.Event{
161-
Timestamp: timestamp.UnixMilli(),
162-
TimestampNano: timestamp.UnixNano(),
163-
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
164-
}
136+
kafkaEvent.Payload = &pb.Event_EnvironmentEvent{EnvironmentEvent: e}
165137
case *pb.Ev_CallEvent:
166138
key = extractAndConvertEnvID(e)
167-
wrappedEvent = &pb.Event{
168-
Timestamp: timestamp.UnixMilli(),
169-
TimestampNano: timestamp.UnixNano(),
170-
Payload: &pb.Event_CallEvent{CallEvent: e},
171-
}
139+
kafkaEvent.Payload = &pb.Event_CallEvent{CallEvent: e}
172140
case *pb.Ev_IntegratedServiceEvent:
173141
key = extractAndConvertEnvID(e)
174-
wrappedEvent = &pb.Event{
175-
Timestamp: timestamp.UnixMilli(),
176-
TimestampNano: timestamp.UnixNano(),
177-
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
178-
}
142+
kafkaEvent.Payload = &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}
179143
case *pb.Ev_RunEvent:
180144
key = extractAndConvertEnvID(e)
181-
wrappedEvent = &pb.Event{
182-
Timestamp: timestamp.UnixMilli(),
183-
TimestampNano: timestamp.UnixNano(),
184-
Payload: &pb.Event_RunEvent{RunEvent: e},
185-
}
186-
}
187-
188-
if wrappedEvent == nil {
145+
kafkaEvent.Payload = &pb.Event_RunEvent{RunEvent: e}
146+
default:
189147
err = fmt.Errorf("unsupported event type")
190-
} else {
191-
err = w.doWriteEvent(key, wrappedEvent)
192148
}
193149

194-
if err != nil {
195-
log.WithField("event", e).
196-
WithField("level", infologger.IL_Support).
197-
Error(err.Error())
198-
}
150+
return
199151
}
200152

201-
func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
202-
if w == nil {
203-
return nil
204-
}
205-
206-
data, err := proto.Marshal(e)
153+
func kafkaEventToKafkaMessage(kafkaEvent *pb.Event, key []byte) (kafka.Message, error) {
154+
data, err := proto.Marshal(kafkaEvent)
207155
if err != nil {
208-
return fmt.Errorf("failed to marshal event: %w", err)
156+
return kafka.Message{}, fmt.Errorf("failed to marshal event: %w", err)
209157
}
210158

211159
message := kafka.Message{
@@ -216,11 +164,33 @@ func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
216164
message.Key = key
217165
}
218166

167+
return message, nil
168+
}
169+
170+
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
171+
if w == nil {
172+
return
173+
}
174+
175+
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
176+
if err != nil {
177+
log.WithField("event", e).
178+
WithField("level", infologger.IL_Support).
179+
Errorf("Failed to convert event to kafka event: %s", err.Error())
180+
return
181+
}
182+
183+
message, err := kafkaEventToKafkaMessage(wrappedEvent, key)
184+
if err != nil {
185+
log.WithField("event", e).
186+
WithField("level", infologger.IL_Support).
187+
Errorf("Failed to convert kafka event to message: %s", err.Error())
188+
return
189+
}
190+
219191
select {
220192
case w.toWriteChan <- message:
221193
default:
222194
log.Warnf("Writer of kafka topic [%s] cannot write because channel is full, discarding a message", w.Writer.Topic)
223195
}
224-
225-
return nil
226196
}

0 commit comments

Comments
 (0)