@@ -62,7 +62,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
6262 Writer : & kafka.Writer {
6363 Addr : kafka .TCP (viper .GetStringSlice ("kafkaEndpoints" )... ),
6464 Topic : string (topic ),
65- Balancer : & kafka.LeastBytes {},
65+ Balancer : & kafka.Hash {},
6666 AllowAutoTopicCreation : true ,
6767 },
6868 }
@@ -80,6 +80,18 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
8080 }
8181}
8282
83+ type HasEnvID interface {
84+ GetEnvironmentId () string
85+ }
86+
87+ func extractAndConvertEnvID [T HasEnvID ](object T ) []byte {
88+ envID := []byte (object .GetEnvironmentId ())
89+ if len (envID ) > 0 {
90+ return envID
91+ }
92+ return nil
93+ }
94+
8395func (w * KafkaWriter ) WriteEventWithTimestamp (e interface {}, timestamp time.Time ) {
8496 if w == nil {
8597 return
@@ -89,6 +101,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
89101 var (
90102 err error
91103 wrappedEvent * pb.Event
104+ key []byte = nil
92105 )
93106
94107 switch e := e .(type ) {
@@ -111,36 +124,42 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
111124 Payload : & pb.Event_FrameworkEvent {FrameworkEvent : e },
112125 }
113126 case * pb.Ev_TaskEvent :
127+ key = extractAndConvertEnvID (e )
114128 wrappedEvent = & pb.Event {
115129 Timestamp : timestamp .UnixMilli (),
116130 TimestampNano : timestamp .UnixNano (),
117131 Payload : & pb.Event_TaskEvent {TaskEvent : e },
118132 }
119133 case * pb.Ev_RoleEvent :
134+ key = extractAndConvertEnvID (e )
120135 wrappedEvent = & pb.Event {
121136 Timestamp : timestamp .UnixMilli (),
122137 TimestampNano : timestamp .UnixNano (),
123138 Payload : & pb.Event_RoleEvent {RoleEvent : e },
124139 }
125140 case * pb.Ev_EnvironmentEvent :
141+ key = extractAndConvertEnvID (e )
126142 wrappedEvent = & pb.Event {
127143 Timestamp : timestamp .UnixMilli (),
128144 TimestampNano : timestamp .UnixNano (),
129145 Payload : & pb.Event_EnvironmentEvent {EnvironmentEvent : e },
130146 }
131147 case * pb.Ev_CallEvent :
148+ key = extractAndConvertEnvID (e )
132149 wrappedEvent = & pb.Event {
133150 Timestamp : timestamp .UnixMilli (),
134151 TimestampNano : timestamp .UnixNano (),
135152 Payload : & pb.Event_CallEvent {CallEvent : e },
136153 }
137154 case * pb.Ev_IntegratedServiceEvent :
155+ key = extractAndConvertEnvID (e )
138156 wrappedEvent = & pb.Event {
139157 Timestamp : timestamp .UnixMilli (),
140158 TimestampNano : timestamp .UnixNano (),
141159 Payload : & pb.Event_IntegratedServiceEvent {IntegratedServiceEvent : e },
142160 }
143161 case * pb.Ev_RunEvent :
162+ key = extractAndConvertEnvID (e )
144163 wrappedEvent = & pb.Event {
145164 Timestamp : timestamp .UnixMilli (),
146165 TimestampNano : timestamp .UnixNano (),
@@ -151,7 +170,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
151170 if wrappedEvent == nil {
152171 err = fmt .Errorf ("unsupported event type" )
153172 } else {
154- err = w .doWriteEvent (wrappedEvent )
173+ err = w .doWriteEvent (key , wrappedEvent )
155174 }
156175
157176 if err != nil {
@@ -162,7 +181,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
162181 }()
163182}
164183
165- func (w * KafkaWriter ) doWriteEvent (e * pb.Event ) error {
184+ func (w * KafkaWriter ) doWriteEvent (key [] byte , e * pb.Event ) error {
166185 if w == nil {
167186 return nil
168187 }
@@ -172,10 +191,15 @@ func (w *KafkaWriter) doWriteEvent(e *pb.Event) error {
172191 return fmt .Errorf ("failed to marshal event: %w" , err )
173192 }
174193
175- err = w . WriteMessages ( context . Background (), kafka.Message {
194+ message := kafka.Message {
176195 Value : data ,
177- })
196+ }
197+
198+ if key != nil {
199+ message .Key = key
200+ }
178201
202+ err = w .WriteMessages (context .Background (), message )
179203 if err != nil {
180204 return fmt .Errorf ("failed to write event: %w" , err )
181205 }
0 commit comments