Skip to content

Commit 31b8710

Browse files
committed
Add a Kafka reader
A necessary building block for OCTRL-1049
1 parent f03795d commit 31b8710

File tree

2 files changed

+180
-0
lines changed

2 files changed

+180
-0
lines changed

common/event/reader.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Piotr Konopka <pkonopka@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
"context"
29+
"fmt"
30+
"github.com/AliceO2Group/Control/common/event/topic"
31+
"github.com/AliceO2Group/Control/common/logger/infologger"
32+
pb "github.com/AliceO2Group/Control/common/protos"
33+
"github.com/segmentio/kafka-go"
34+
"github.com/spf13/viper"
35+
"google.golang.org/protobuf/proto"
36+
"sync"
37+
)
38+
39+
// Reader interface provides methods to read events.
40+
type Reader interface {
41+
Next(ctx context.Context) (*pb.Event, error)
42+
Close() error
43+
}
44+
45+
// DummyReader is an implementation of Reader that returns no events.
46+
type DummyReader struct{}
47+
48+
// Next returns the next event or nil if there are no more events.
49+
func (*DummyReader) Next(context.Context) (*pb.Event, error) { return nil, nil }
50+
51+
// Close closes the DummyReader.
52+
func (*DummyReader) Close() error { return nil }
53+
54+
// KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
55+
// Consumption mode is chosen at creation time:
56+
// - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
57+
// - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
58+
type KafkaReader struct {
59+
*kafka.Reader
60+
mu sync.Mutex
61+
topic string
62+
}
63+
64+
// NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
65+
// If latestOnly is true the reader attempts to seek to the latest offsets on start so that
66+
// only new messages (produced after creation) are consumed.
67+
func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
68+
cfg := kafka.ReaderConfig{
69+
Brokers: viper.GetStringSlice("kafkaEndpoints"),
70+
Topic: string(topic),
71+
GroupID: groupID,
72+
MinBytes: 1,
73+
MaxBytes: 10e7,
74+
}
75+
76+
rk := &KafkaReader{
77+
Reader: kafka.NewReader(cfg),
78+
topic: string(topic),
79+
}
80+
81+
if latestOnly {
82+
// best-effort: set offset to last so we don't replay older messages
83+
if err := rk.SetOffset(kafka.LastOffset); err != nil {
84+
log.WithField(infologger.Level, infologger.IL_Devel).
85+
Warnf("failed to set offset to last offset: %v", err)
86+
}
87+
}
88+
89+
return rk
90+
}
91+
92+
// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
93+
// (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx.
94+
func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) {
95+
if r == nil {
96+
return nil, fmt.Errorf("nil reader")
97+
}
98+
99+
msg, err := r.ReadMessage(ctx)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
event, err := kafkaMessageToEvent(msg)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
return event, nil
110+
}
111+
112+
// Close stops the reader.
113+
func (r *KafkaReader) Close() error {
114+
if r == nil {
115+
return nil
116+
}
117+
// Close the underlying kafka reader which will cause ReadMessage to return an error
118+
err := r.Reader.Close()
119+
if err != nil {
120+
log.WithField(infologger.Level, infologger.IL_Devel).
121+
Errorf("failed to close kafka reader: %v", err)
122+
}
123+
return err
124+
}
125+
126+
func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
127+
var evt pb.Event
128+
if err := proto.Unmarshal(m.Value, &evt); err != nil {
129+
return nil, fmt.Errorf("failed to unmarshal kafka message: %w", err)
130+
}
131+
return &evt, nil
132+
}

common/event/reader_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Piotr Konopka <pkonopka@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
pb "github.com/AliceO2Group/Control/common/protos"
29+
. "github.com/onsi/ginkgo/v2"
30+
. "github.com/onsi/gomega"
31+
"github.com/segmentio/kafka-go"
32+
"google.golang.org/protobuf/proto"
33+
)
34+
35+
var _ = Describe("Reader", func() {
36+
When("converting kafka message to event", func() {
37+
It("unmarshals protobuf payload correctly", func() {
38+
e := &pb.Event{Payload: &pb.Event_CoreStartEvent{CoreStartEvent: &pb.Ev_MetaEvent_CoreStart{FrameworkId: "z"}}}
39+
b, err := proto.Marshal(e)
40+
Expect(err).To(BeNil())
41+
42+
m := kafka.Message{Value: b}
43+
evt, err := kafkaMessageToEvent(m)
44+
Expect(err).To(BeNil())
45+
Expect(evt.GetCoreStartEvent().FrameworkId).To(Equal("z"))
46+
})
47+
})
48+
})

0 commit comments

Comments
 (0)