Skip to content

Commit bb8f679

Browse files
author
Michal Tichák
committed
[core] changes to distribution of kafka messages to partitions
- added key to messages containing environment id or run number - changed Balancer from LeastBytes to Hash which makes kafka-go puts messages with same key to the same partition, fixing ordering problems
1 parent c71657a commit bb8f679

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

common/event/writer.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
6262
Writer: &kafka.Writer{
6363
Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...),
6464
Topic: string(topic),
65-
Balancer: &kafka.LeastBytes{},
65+
Balancer: &kafka.Hash{},
6666
AllowAutoTopicCreation: true,
6767
},
6868
}
@@ -80,6 +80,18 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
8080
}
8181
}
8282

83+
type HasEnvID interface {
84+
GetEnvironmentId() string
85+
}
86+
87+
func extractAndConvertEnvID[T HasEnvID](object T) []byte {
88+
envID := []byte(object.GetEnvironmentId())
89+
if len(envID) > 0 {
90+
return envID
91+
}
92+
return nil
93+
}
94+
8395
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
8496
if w == nil {
8597
return
@@ -89,6 +101,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
89101
var (
90102
err error
91103
wrappedEvent *pb.Event
104+
key []byte = nil
92105
)
93106

94107
switch e := e.(type) {
@@ -111,36 +124,42 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
111124
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
112125
}
113126
case *pb.Ev_TaskEvent:
127+
key = extractAndConvertEnvID(e)
114128
wrappedEvent = &pb.Event{
115129
Timestamp: timestamp.UnixMilli(),
116130
TimestampNano: timestamp.UnixNano(),
117131
Payload: &pb.Event_TaskEvent{TaskEvent: e},
118132
}
119133
case *pb.Ev_RoleEvent:
134+
key = extractAndConvertEnvID(e)
120135
wrappedEvent = &pb.Event{
121136
Timestamp: timestamp.UnixMilli(),
122137
TimestampNano: timestamp.UnixNano(),
123138
Payload: &pb.Event_RoleEvent{RoleEvent: e},
124139
}
125140
case *pb.Ev_EnvironmentEvent:
141+
key = extractAndConvertEnvID(e)
126142
wrappedEvent = &pb.Event{
127143
Timestamp: timestamp.UnixMilli(),
128144
TimestampNano: timestamp.UnixNano(),
129145
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
130146
}
131147
case *pb.Ev_CallEvent:
148+
key = extractAndConvertEnvID(e)
132149
wrappedEvent = &pb.Event{
133150
Timestamp: timestamp.UnixMilli(),
134151
TimestampNano: timestamp.UnixNano(),
135152
Payload: &pb.Event_CallEvent{CallEvent: e},
136153
}
137154
case *pb.Ev_IntegratedServiceEvent:
155+
key = extractAndConvertEnvID(e)
138156
wrappedEvent = &pb.Event{
139157
Timestamp: timestamp.UnixMilli(),
140158
TimestampNano: timestamp.UnixNano(),
141159
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
142160
}
143161
case *pb.Ev_RunEvent:
162+
key = extractAndConvertEnvID(e)
144163
wrappedEvent = &pb.Event{
145164
Timestamp: timestamp.UnixMilli(),
146165
TimestampNano: timestamp.UnixNano(),
@@ -151,7 +170,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
151170
if wrappedEvent == nil {
152171
err = fmt.Errorf("unsupported event type")
153172
} else {
154-
err = w.doWriteEvent(wrappedEvent)
173+
err = w.doWriteEvent(key, wrappedEvent)
155174
}
156175

157176
if err != nil {
@@ -162,7 +181,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
162181
}()
163182
}
164183

165-
func (w *KafkaWriter) doWriteEvent(e *pb.Event) error {
184+
func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
166185
if w == nil {
167186
return nil
168187
}
@@ -172,10 +191,15 @@ func (w *KafkaWriter) doWriteEvent(e *pb.Event) error {
172191
return fmt.Errorf("failed to marshal event: %w", err)
173192
}
174193

175-
err = w.WriteMessages(context.Background(), kafka.Message{
194+
message := kafka.Message{
176195
Value: data,
177-
})
196+
}
197+
198+
if key != nil {
199+
message.Key = key
200+
}
178201

202+
err = w.WriteMessages(context.Background(), message)
179203
if err != nil {
180204
return fmt.Errorf("failed to write event: %w", err)
181205
}

0 commit comments

Comments
 (0)