@@ -27,44 +27,42 @@ package event
2727import (
2828 "context"
2929 "fmt"
30+
3031 "github.com/AliceO2Group/Control/common/event/topic"
3132 "github.com/AliceO2Group/Control/common/logger/infologger"
3233 pb "github.com/AliceO2Group/Control/common/protos"
3334 "github.com/segmentio/kafka-go"
3435 "github.com/spf13/viper"
3536 "google.golang.org/protobuf/proto"
36- "sync"
3737)
3838
3939// Reader interface provides methods to read events.
4040type Reader interface {
41+ // Next should return the next event or cancel if the context is cancelled.
4142 Next (ctx context.Context ) (* pb.Event , error )
43+ // Last should return the last available event currently present on the topic (or nil if none)
44+ // or cancel if the context is cancelled.
45+ Last (ctx context.Context ) (* pb.Event , error )
4246 Close () error
4347}
4448
4549// DummyReader is an implementation of Reader that returns no events.
4650type DummyReader struct {}
4751
48- // Next returns the next event or nil if there are no more events.
4952func (* DummyReader ) Next (context.Context ) (* pb.Event , error ) { return nil , nil }
50-
51- // Close closes the DummyReader.
52- func (* DummyReader ) Close () error { return nil }
53+ func (* DummyReader ) Last (context.Context ) (* pb.Event , error ) { return nil , nil }
54+ func (* DummyReader ) Close () error { return nil }
5355
5456// KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
55- // Consumption mode is chosen at creation time:
56- // - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
57- // - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
5857type KafkaReader struct {
5958 * kafka.Reader
60- mu sync.Mutex
61- topic string
59+ topic string
60+ brokers []string
61+ groupID string
6262}
6363
6464// NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
65- // If latestOnly is true the reader attempts to seek to the latest offsets on start so that
66- // only new messages (produced after creation) are consumed.
67- func NewReaderWithTopic (topic topic.Topic , groupID string , latestOnly bool ) * KafkaReader {
65+ func NewReaderWithTopic (topic topic.Topic , groupID string ) * KafkaReader {
6866 cfg := kafka.ReaderConfig {
6967 Brokers : viper .GetStringSlice ("kafkaEndpoints" ),
7068 Topic : string (topic ),
@@ -74,53 +72,80 @@ func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *Kaf
7472 }
7573
7674 rk := & KafkaReader {
77- Reader : kafka .NewReader (cfg ),
78- topic : string (topic ),
79- }
80-
81- if latestOnly {
82- // best-effort: set offset to last so we don't replay older messages
83- if err := rk .SetOffset (kafka .LastOffset ); err != nil {
84- log .WithField (infologger .Level , infologger .IL_Devel ).
85- Warnf ("failed to set offset to last offset: %v" , err )
86- }
75+ Reader : kafka .NewReader (cfg ),
76+ topic : string (topic ),
77+ brokers : append ([]string {}, cfg .Brokers ... ),
78+ groupID : groupID ,
8779 }
88-
8980 return rk
9081}
9182
92- // Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
93- // (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx.
83+ // Next blocks until the next event is available or ctx is cancelled.
9484func (r * KafkaReader ) Next (ctx context.Context ) (* pb.Event , error ) {
9585 if r == nil {
9686 return nil , fmt .Errorf ("nil reader" )
9787 }
98-
9988 msg , err := r .ReadMessage (ctx )
10089 if err != nil {
10190 return nil , err
10291 }
92+ return kafkaMessageToEvent (msg )
93+ }
10394
104- event , err := kafkaMessageToEvent (msg )
95+ // Last fetches the last available message on the topic (considering all partitions).
96+ // If multiple partitions have data, the event with the greatest message timestamp is returned.
97+ func (r * KafkaReader ) Last (ctx context.Context ) (* pb.Event , error ) {
98+ if r == nil {
99+ return nil , fmt .Errorf ("nil reader" )
100+ }
101+ partitions , err := r .readPartitions ()
105102 if err != nil {
106103 return nil , err
107104 }
108-
109- return event , nil
105+ var latestEvt * pb.Event
106+ var latestEvtTimeNs int64
107+ for _ , p := range partitions {
108+ if p .Topic != r .topic {
109+ continue
110+ }
111+ first , last , err := r .readFirstAndLast (p .ID )
112+ if err != nil {
113+ log .WithField (infologger .Level , infologger .IL_Devel ).WithError (err ).
114+ Warnf ("failed to read offsets for %s[%d]" , r .topic , p .ID )
115+ continue
116+ }
117+ if last <= first {
118+ continue
119+ }
120+ msg , err := r .readAtOffset (ctx , p .ID , last - 1 )
121+ if err != nil {
122+ log .WithError (err ).
123+ WithField (infologger .Level , infologger .IL_Devel ).
124+ Warnf ("failed to read last message for %s[%d] at offset %d" , r .topic , p .ID , last - 1 )
125+ continue
126+ }
127+ evt , err := kafkaMessageToEvent (msg )
128+ if err != nil {
129+ log .WithError (err ).
130+ WithField (infologger .Level , infologger .IL_Devel ).
131+ Warnf ("failed to decode last message for %s[%d]" , r .topic , p .ID )
132+ continue
133+ }
134+ currentEvtTimeNs := msg .Time .UnixNano ()
135+ if latestEvt == nil || currentEvtTimeNs > latestEvtTimeNs {
136+ latestEvt = evt
137+ latestEvtTimeNs = currentEvtTimeNs
138+ }
139+ }
140+ return latestEvt , nil
110141}
111142
112143// Close stops the reader.
113144func (r * KafkaReader ) Close () error {
114145 if r == nil {
115146 return nil
116147 }
117- // Close the underlying kafka reader which will cause ReadMessage to return an error
118- err := r .Reader .Close ()
119- if err != nil {
120- log .WithField (infologger .Level , infologger .IL_Devel ).
121- Errorf ("failed to close kafka reader: %v" , err )
122- }
123- return err
148+ return r .Reader .Close ()
124149}
125150
126151func kafkaMessageToEvent (m kafka.Message ) (* pb.Event , error ) {
@@ -130,3 +155,55 @@ func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
130155 }
131156 return & evt , nil
132157}
158+
159+ func (r * KafkaReader ) brokerAddr () (string , error ) {
160+ if len (r .brokers ) == 0 {
161+ return "" , fmt .Errorf ("no kafka brokers configured" )
162+ }
163+ return r .brokers [0 ], nil
164+ }
165+
166+ func (r * KafkaReader ) readPartitions () ([]kafka.Partition , error ) {
167+ addr , err := r .brokerAddr ()
168+ if err != nil {
169+ return nil , err
170+ }
171+ conn , err := kafka .Dial ("tcp" , addr )
172+ if err != nil {
173+ return nil , err
174+ }
175+ defer conn .Close ()
176+ return conn .ReadPartitions (r .topic )
177+ }
178+
179+ func (r * KafkaReader ) readFirstAndLast (partition int ) (int64 , int64 , error ) {
180+ addr , err := r .brokerAddr ()
181+ if err != nil {
182+ return 0 , 0 , err
183+ }
184+ conn , err := kafka .DialLeader (context .Background (), "tcp" , addr , r .topic , partition )
185+ if err != nil {
186+ return 0 , 0 , err
187+ }
188+ defer conn .Close ()
189+ first , last , err := conn .ReadOffsets ()
190+ return first , last , err
191+ }
192+
193+ func (r * KafkaReader ) readAtOffset (ctx context.Context , partition int , offset int64 ) (kafka.Message , error ) {
194+ if offset < 0 {
195+ return kafka.Message {}, fmt .Errorf ("invalid offset %d" , offset )
196+ }
197+ kr := kafka .NewReader (kafka.ReaderConfig {
198+ Brokers : append ([]string {}, r .brokers ... ),
199+ Topic : r .topic ,
200+ Partition : partition ,
201+ MinBytes : 1 ,
202+ MaxBytes : 10e6 ,
203+ })
204+ defer kr .Close ()
205+ if err := kr .SetOffset (offset ); err != nil {
206+ return kafka.Message {}, err
207+ }
208+ return kr .ReadMessage (ctx )
209+ }
0 commit comments