Skip to content

Commit bad6744

Browse files
author
Michal Tichák
committed
[core] changes to distribution of kafka messages to partitions
- added key to messages containing environment id or run number - changed Balancer from LeastBytes to Hash which makes kafka-go puts messages with same key to the same partition, fixing ordering problems
1 parent c71657a commit bad6744

File tree

1 file changed

+32
-5
lines changed

1 file changed

+32
-5
lines changed

common/event/writer.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package event
2626

2727
import (
2828
"context"
29+
"encoding/binary"
2930
"fmt"
3031
"time"
3132

@@ -62,7 +63,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
6263
Writer: &kafka.Writer{
6364
Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...),
6465
Topic: string(topic),
65-
Balancer: &kafka.LeastBytes{},
66+
Balancer: &kafka.Hash{},
6667
AllowAutoTopicCreation: true,
6768
},
6869
}
@@ -80,6 +81,18 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
8081
}
8182
}
8283

84+
type HasEnvID interface {
85+
GetEnvironmentId() string
86+
}
87+
88+
func extractAndConvertEnvID[T HasEnvID](object T) []byte {
89+
envID := []byte(object.GetEnvironmentId())
90+
if len(envID) > 0 {
91+
return envID
92+
}
93+
return nil
94+
}
95+
8396
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
8497
if w == nil {
8598
return
@@ -89,6 +102,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
89102
var (
90103
err error
91104
wrappedEvent *pb.Event
105+
key []byte = nil
92106
)
93107

94108
switch e := e.(type) {
@@ -111,36 +125,44 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
111125
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
112126
}
113127
case *pb.Ev_TaskEvent:
128+
key = extractAndConvertEnvID(e)
114129
wrappedEvent = &pb.Event{
115130
Timestamp: timestamp.UnixMilli(),
116131
TimestampNano: timestamp.UnixNano(),
117132
Payload: &pb.Event_TaskEvent{TaskEvent: e},
118133
}
119134
case *pb.Ev_RoleEvent:
135+
key = extractAndConvertEnvID(e)
120136
wrappedEvent = &pb.Event{
121137
Timestamp: timestamp.UnixMilli(),
122138
TimestampNano: timestamp.UnixNano(),
123139
Payload: &pb.Event_RoleEvent{RoleEvent: e},
124140
}
125141
case *pb.Ev_EnvironmentEvent:
142+
key = extractAndConvertEnvID(e)
126143
wrappedEvent = &pb.Event{
127144
Timestamp: timestamp.UnixMilli(),
128145
TimestampNano: timestamp.UnixNano(),
129146
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
130147
}
131148
case *pb.Ev_CallEvent:
149+
key = extractAndConvertEnvID(e)
132150
wrappedEvent = &pb.Event{
133151
Timestamp: timestamp.UnixMilli(),
134152
TimestampNano: timestamp.UnixNano(),
135153
Payload: &pb.Event_CallEvent{CallEvent: e},
136154
}
137155
case *pb.Ev_IntegratedServiceEvent:
156+
key = extractAndConvertEnvID(e)
138157
wrappedEvent = &pb.Event{
139158
Timestamp: timestamp.UnixMilli(),
140159
TimestampNano: timestamp.UnixNano(),
141160
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
142161
}
143162
case *pb.Ev_RunEvent:
163+
// we are using RunNumber as a key here, requested by M.Boulais
164+
key = make([]byte, 4)
165+
binary.BigEndian.PutUint32(key, e.RunNumber)
144166
wrappedEvent = &pb.Event{
145167
Timestamp: timestamp.UnixMilli(),
146168
TimestampNano: timestamp.UnixNano(),
@@ -151,7 +173,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
151173
if wrappedEvent == nil {
152174
err = fmt.Errorf("unsupported event type")
153175
} else {
154-
err = w.doWriteEvent(wrappedEvent)
176+
err = w.doWriteEvent(key, wrappedEvent)
155177
}
156178

157179
if err != nil {
@@ -162,7 +184,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
162184
}()
163185
}
164186

165-
func (w *KafkaWriter) doWriteEvent(e *pb.Event) error {
187+
func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
166188
if w == nil {
167189
return nil
168190
}
@@ -172,10 +194,15 @@ func (w *KafkaWriter) doWriteEvent(e *pb.Event) error {
172194
return fmt.Errorf("failed to marshal event: %w", err)
173195
}
174196

175-
err = w.WriteMessages(context.Background(), kafka.Message{
197+
message := kafka.Message{
176198
Value: data,
177-
})
199+
}
200+
201+
if key != nil {
202+
message.Key = key
203+
}
178204

205+
err = w.WriteMessages(context.Background(), message)
179206
if err != nil {
180207
return fmt.Errorf("failed to write event: %w", err)
181208
}

0 commit comments

Comments
 (0)