Skip to content

Commit 98d49cf

Browse files
committed
New LHC plugin to follow LHC updates available in Kafka
The updates come from ALICE LHC DIP client. Work towards OCTRL-1049. Eventually, this plugin will also allow to get rid of BKP.RetrieveFillInfo call and consume Fill information without going through BKP.
1 parent 31b8710 commit 98d49cf

File tree

5 files changed

+267
-0
lines changed

5 files changed

+267
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ There are two ways of interacting with AliECS:
113113
* [ECS2DCS2ECS mock server](/core/integration/README.md#ecs2dcs2ecs-mock-server)
114114
* [DD Scheduler](/core/integration/README.md#dd-scheduler)
115115
* [Kafka (legacy)](/core/integration/README.md#kafka-legacy)
116+
* [LHC](/core/integration/README.md)
116117
* [ODC](/core/integration/README.md#odc)
117118
* [Test plugin](/core/integration/README.md#test-plugin)
118119
* [Trigger](/core/integration/README.md#trigger)

cmd/o2-aliecs-core/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/AliceO2Group/Control/core/integration/dcs"
3636
"github.com/AliceO2Group/Control/core/integration/ddsched"
3737
"github.com/AliceO2Group/Control/core/integration/kafka"
38+
"github.com/AliceO2Group/Control/core/integration/lhc"
3839
"github.com/AliceO2Group/Control/core/integration/odc"
3940
"github.com/AliceO2Group/Control/core/integration/testplugin"
4041
"github.com/AliceO2Group/Control/core/integration/trg"
@@ -64,6 +65,10 @@ func init() {
6465
"kafka",
6566
"kafkaEndpoint",
6667
kafka.NewPlugin)
68+
integration.RegisterPlugin(
69+
"lhc",
70+
"kafkaEndpoints",
71+
lhc.NewPlugin)
6772
integration.RegisterPlugin(
6873
"odc",
6974
"odcEndpoint",

core/integration/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ DD scheduler plugin informs the Data Distribution software about the pool of FLP
177177

178178
See [Legacy events: Kafka plugin](/docs/kafka.md#legacy-events-kafka-plugin)
179179

180+
# LHC plugin
181+
182+
This plugin listens to Kafka messages coming from the LHC DIP Client and pushes any relevant internal notifications to the AliECS core.
183+
Its main purpose is to provide basic information about ongoing LHC activity (e.g. fill information) to affected parties and allow AliECS to react upon them (e.g. by automatically stopping a physics run when stable beams are over).
184+
180185
## ODC
181186

182187
ODC plugin communicates with the [Online Device Control (ODC)](https://github.com/FairRootGroup/ODC) instance of the ALICE experiment, which controls the event processing farm used in data taking and offline processing.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Piotr Konopka <pkonopka@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package event
26+
27+
import (
28+
"github.com/AliceO2Group/Control/common/event"
29+
commonpb "github.com/AliceO2Group/Control/common/protos"
30+
)
31+
32+
// BeamInfo mirrors (a subset of) the information described in the proto draft.
33+
type BeamInfo struct {
34+
StableBeamsStart int64 `json:"stableBeamsStart,omitempty"`
35+
StableBeamsEnd int64 `json:"stableBeamsEnd,omitempty"`
36+
FillNumber int32 `json:"fillNumber,omitempty"`
37+
FillingSchemeName string `json:"fillingSchemeName,omitempty"`
38+
BeamType string `json:"beamType,omitempty"`
39+
BeamMode commonpb.BeamMode `json:"beamMode,omitempty"`
40+
}
41+
42+
type LhcStateChangeEvent struct {
43+
event.IntegratedServiceEventBase
44+
BeamInfo BeamInfo
45+
}
46+
47+
func (e *LhcStateChangeEvent) GetName() string {
48+
return "LHC_STATE_CHANGE_EVENT"
49+
}
50+
51+
func (e *LhcStateChangeEvent) GetBeamInfo() BeamInfo {
52+
if e == nil {
53+
return BeamInfo{}
54+
}
55+
return e.BeamInfo
56+
}

core/integration/lhc/plugin.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Piotr Konopka <pkonopka@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package lhc
26+
27+
import (
28+
"context"
29+
"encoding/json"
30+
"errors"
31+
"github.com/AliceO2Group/Control/common/event/topic"
32+
"github.com/AliceO2Group/Control/common/logger/infologger"
33+
pb "github.com/AliceO2Group/Control/common/protos"
34+
"io"
35+
"strings"
36+
"sync"
37+
"time"
38+
39+
cmnevent "github.com/AliceO2Group/Control/common/event"
40+
"github.com/AliceO2Group/Control/common/logger"
41+
"github.com/AliceO2Group/Control/common/utils/uid"
42+
"github.com/AliceO2Group/Control/core/environment"
43+
"github.com/AliceO2Group/Control/core/integration"
44+
lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event"
45+
"github.com/sirupsen/logrus"
46+
"github.com/spf13/viper"
47+
)
48+
49+
var log = logger.New(logrus.StandardLogger(), "lhcclient")
50+
var dipClientTopic topic.Topic = "dip.lhc.beam_mode"
51+
52+
// Plugin implements integration.Plugin and listens for LHC updates.
53+
type Plugin struct {
54+
endpoint string
55+
ctx context.Context
56+
//cancel context.CancelFunc
57+
//wg sync.WaitGroup
58+
mu sync.Mutex
59+
currentState *pb.BeamInfo
60+
reader cmnevent.Reader
61+
}
62+
63+
func NewPlugin(endpoint string) integration.Plugin {
64+
65+
return &Plugin{endpoint: endpoint, mu: sync.Mutex{}, currentState: &pb.BeamInfo{BeamMode: pb.BeamMode_UNKNOWN}}
66+
}
67+
68+
func (p *Plugin) Init(_ string) error {
69+
70+
// use a background context for reader loop; Destroy will Close the reader
71+
p.ctx = context.Background()
72+
73+
p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "", true)
74+
75+
if p.reader == nil {
76+
return errors.New("could not create a kafka reader for LHC plugin")
77+
}
78+
go p.readAndInjectLhcUpdates()
79+
80+
log.Debug("LHC plugin initialized (client started)")
81+
return nil
82+
}
83+
84+
func (p *Plugin) GetName() string { return "lhc" }
85+
func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" }
86+
func (p *Plugin) GetEndpoint() string {
87+
return strings.Join(viper.GetStringSlice("kafkaEndpoints"), ",")
88+
}
89+
90+
func (p *Plugin) GetConnectionState() string {
91+
if p == nil || p.reader == nil {
92+
return "UNKNOWN"
93+
}
94+
return "READY" // Unfortunately, kafka.Reader does not provide any GetStatus method
95+
}
96+
97+
func (p *Plugin) GetData(_ []any) string {
98+
p.mu.Lock()
99+
defer p.mu.Unlock()
100+
if p.currentState == nil {
101+
return ""
102+
}
103+
104+
outMap := make(map[string]interface{})
105+
outMap["BeamMode"] = p.currentState.BeamMode.String()
106+
outMap["BeamType"] = p.currentState.BeamType
107+
outMap["FillingSchemeName"] = p.currentState.FillingSchemeName
108+
outMap["FillNumber"] = p.currentState.FillNumber
109+
outMap["StableBeamsEnd"] = p.currentState.StableBeamsEnd
110+
outMap["StableBeamsStart"] = p.currentState.StableBeamsStart
111+
112+
b, _ := json.Marshal(outMap)
113+
return string(b)
114+
}
115+
116+
func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
117+
// there is nothing sensible we could provide here, LHC client is not environment-specific
118+
return nil
119+
}
120+
121+
func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string {
122+
return p.GetEnvironmentsData(envIds)
123+
}
124+
125+
func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) {
126+
return make(map[string]interface{})
127+
}
128+
func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) {
129+
return make(map[string]interface{})
130+
}
131+
132+
func (p *Plugin) Destroy() error {
133+
if p == nil {
134+
return nil
135+
}
136+
p.mu.Lock()
137+
defer p.mu.Unlock()
138+
139+
if p.reader != nil {
140+
err := p.reader.Close()
141+
if err != nil {
142+
return err
143+
}
144+
}
145+
return nil
146+
}
147+
148+
func (p *Plugin) readAndInjectLhcUpdates() {
149+
for {
150+
msg, err := p.reader.Next(p.ctx)
151+
if errors.Is(err, io.EOF) {
152+
log.WithField(infologger.Level, infologger.IL_Support).
153+
Debug("received an EOF from Kafka reader, likely cancellation was requested, breaking")
154+
break
155+
}
156+
if err != nil {
157+
log.WithField(infologger.Level, infologger.IL_Support).
158+
WithError(err).
159+
Error("error while reading from Kafka")
160+
// in case of errors, we throttle the loop to mitigate the risk a log spam if error persists
161+
time.Sleep(time.Second * 1)
162+
continue
163+
}
164+
if msg == nil {
165+
log.WithField(infologger.Level, infologger.IL_Devel).
166+
Warn("received an empty message with no error. it's unexpected, but continuing")
167+
continue
168+
}
169+
170+
if bmEvt := msg.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil {
171+
beamInfo := bmEvt.GetBeamInfo()
172+
log.WithField(infologger.Level, infologger.IL_Devel).
173+
Debugf("new LHC update received: BeamMode=%s, FillNumber=%d, FillingScheme=%s, StableBeamsStart=%d, StableBeamsEnd=%d, BeamType=%s",
174+
beamInfo.GetBeamMode().String(), beamInfo.GetFillNumber(), beamInfo.GetFillingSchemeName(),
175+
beamInfo.GetStableBeamsStart(), beamInfo.GetStableBeamsEnd(), beamInfo.GetBeamType())
176+
// update plugin state
177+
p.mu.Lock()
178+
p.currentState = beamInfo
179+
p.mu.Unlock()
180+
181+
// convert to internal LHC event and notify environment manager
182+
go func(beamInfo *pb.BeamInfo) {
183+
envMan := environment.ManagerInstance()
184+
185+
ev := &lhcevent.LhcStateChangeEvent{
186+
IntegratedServiceEventBase: cmnevent.IntegratedServiceEventBase{ServiceName: "LHC"},
187+
BeamInfo: lhcevent.BeamInfo{
188+
BeamMode: beamInfo.GetBeamMode(),
189+
StableBeamsStart: beamInfo.GetStableBeamsStart(),
190+
StableBeamsEnd: beamInfo.GetStableBeamsEnd(),
191+
FillNumber: beamInfo.GetFillNumber(),
192+
FillingSchemeName: beamInfo.GetFillingSchemeName(),
193+
BeamType: beamInfo.GetBeamType(),
194+
},
195+
}
196+
envMan.NotifyIntegratedServiceEvent(ev)
197+
}(beamInfo)
198+
}
199+
}
200+
}

0 commit comments

Comments
 (0)