Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 103 additions & 87 deletions common/event/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,28 @@ func (*DummyWriter) Close() {}

type KafkaWriter struct {
*kafka.Writer
toWriteChan chan kafka.Message
}

func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
return &KafkaWriter{
writer := &KafkaWriter{
Writer: &kafka.Writer{
Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...),
Topic: string(topic),
Balancer: &kafka.Hash{},
AllowAutoTopicCreation: true,
},
toWriteChan: make(chan kafka.Message, 1000),
}

go writer.writingLoop()
return writer
}

func (w *KafkaWriter) Close() {
if w != nil {
w.Close()
close(w.toWriteChan)
w.Writer.Close()
}
}

Expand All @@ -80,6 +86,16 @@ func (w *KafkaWriter) WriteEvent(e interface{}) {
}
}

// TODO: we can optimise this to write multiple message at once
func (w *KafkaWriter) writingLoop() {
for message := range w.toWriteChan {
err := w.WriteMessages(context.Background(), message)
if err != nil {
log.Errorf("failed to write async kafka message: %w", err)
}
}
}

type HasEnvID interface {
GetEnvironmentId() string
}
Expand All @@ -97,91 +113,89 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
return
}

go func() {
var (
err error
wrappedEvent *pb.Event
key []byte = nil
)

switch e := e.(type) {
case *pb.Ev_MetaEvent_CoreStart:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
}
case *pb.Ev_MetaEvent_MesosHeartbeat:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
}
case *pb.Ev_MetaEvent_FrameworkEvent:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
}
case *pb.Ev_TaskEvent:
key = []byte(e.Taskid)
if len(key) == 0 {
key = nil
}
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_TaskEvent{TaskEvent: e},
}
case *pb.Ev_RoleEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RoleEvent{RoleEvent: e},
}
case *pb.Ev_EnvironmentEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
}
case *pb.Ev_CallEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CallEvent{CallEvent: e},
}
case *pb.Ev_IntegratedServiceEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
}
case *pb.Ev_RunEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RunEvent{RunEvent: e},
}
var (
err error
wrappedEvent *pb.Event
key []byte = nil
)

switch e := e.(type) {
case *pb.Ev_MetaEvent_CoreStart:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
}

if wrappedEvent == nil {
err = fmt.Errorf("unsupported event type")
} else {
err = w.doWriteEvent(key, wrappedEvent)
case *pb.Ev_MetaEvent_MesosHeartbeat:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
}

if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Error(err.Error())
case *pb.Ev_MetaEvent_FrameworkEvent:
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
}
case *pb.Ev_TaskEvent:
key = []byte(e.Taskid)
if len(key) == 0 {
key = nil
}
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_TaskEvent{TaskEvent: e},
}
case *pb.Ev_RoleEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RoleEvent{RoleEvent: e},
}
}()
case *pb.Ev_EnvironmentEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
}
case *pb.Ev_CallEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_CallEvent{CallEvent: e},
}
case *pb.Ev_IntegratedServiceEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
}
case *pb.Ev_RunEvent:
key = extractAndConvertEnvID(e)
wrappedEvent = &pb.Event{
Timestamp: timestamp.UnixMilli(),
TimestampNano: timestamp.UnixNano(),
Payload: &pb.Event_RunEvent{RunEvent: e},
}
}

if wrappedEvent == nil {
err = fmt.Errorf("unsupported event type")
} else {
err = w.doWriteEvent(key, wrappedEvent)
}

if err != nil {
log.WithField("event", e).
WithField("level", infologger.IL_Support).
Error(err.Error())
}
}

func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
Expand All @@ -202,9 +216,11 @@ func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error {
message.Key = key
}

err = w.WriteMessages(context.Background(), message)
if err != nil {
return fmt.Errorf("failed to write event: %w", err)
select {
case w.toWriteChan <- message:
default:
log.Warnf("Writer of kafka topic [%s] cannot write because channel is full, discarding a message", w.Writer.Topic)
}

return nil
}