Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/maven-multi-os.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ jobs:
elif [[ "${{ runner.os }}" == "macOS" ]]; then
mvn validate -Dos.version=osx-x86_64
fi
- name: Run Tests with Maven
run: |
if [[ "${{ runner.os }}" == "Linux" ]]; then
mvn test -Dos.version=linux-x86_64
elif [[ "${{ runner.os }}" == "macOS" ]]; then
mvn test -Dos.version=osx-x86_64
fi
- name: Package with Maven (Fat JAR)
run: |
if [[ "${{ runner.os }}" == "Linux" ]]; then
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@

Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK

Projects consumes selected messages from the CERN DIP system (LHC & ALICE -DCS) and publishes them into the O2 systems. A detailed description for this project is provided by Roberto in this document:
The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either:
- published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS)
- updates the O2 Bookkeeping application via their HTTP endpoints.

A detailed description for this project is provided by Roberto in this document:
https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew

### Published Events
Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto)

### Requirements
- This program requires java 11 on a 64 bit system (this is a constrain from the DIP library)
- maven
-
### Configuration
The run configuration is defined in the `AliDip2BK.properties` file.

### Maven Commands for dev,tst,deployments
```bash
mvn <clean> compile -Dos.version={os_version}
mvn <clean> package -Dos.version={os_version}
mvn tst -Dos.version={os_version}
```

E.g. os_version `macosx-x86_64`
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<maven.compiler.target>16</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<os.version>linux-x86_64</os.version>
<cern.dip.version>5.7.0</cern.dip.version>
<cern.dip.version>2.7.0</cern.dip.version>
<protobuf.version>4.29.3</protobuf.version>
<kafka.version>3.1.0</kafka.version>
<slf4j-api.version>1.7.30</slf4j-api.version>
Expand Down Expand Up @@ -47,6 +47,13 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j-simple.version}</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.13.4</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
26 changes: 18 additions & 8 deletions src/main/java/alice/dip/AliDip2BK.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/*************
* cil
**************/

/*
* Main Class
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip;

import java.io.BufferedWriter;
Expand All @@ -18,8 +21,10 @@
import java.util.Date;
import java.util.Properties;

import alice.dip.beam.mode.BeamModeEventsKafkaProducer;

public class AliDip2BK implements Runnable {
public static String Version = "2.1.2 22-Jul-2025";
public static String Version = "3.0.0 13-Oct-2025";
public static String DNSnode = "dipnsdev.cern.ch";
public static String[] endFillCases = {"CUCU"};
public static boolean LIST_PARAM = false;
Expand Down Expand Up @@ -52,6 +57,7 @@ public class AliDip2BK implements Runnable {
BookkeepingClient bookkeepingClient;
StartOfRunKafkaConsumer kcs;
EndOfRunKafkaConsumer kce;
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;

public AliDip2BK() {
startDate = (new Date()).getTime();
Expand Down Expand Up @@ -82,6 +88,8 @@ public AliDip2BK() {
kcs = new StartOfRunKafkaConsumer(dipMessagesProcessor);

kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);
beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);

shutdownProc();

Expand Down Expand Up @@ -145,6 +153,8 @@ public void run() {
}
dipMessagesProcessor.saveState();
writeStat("AliDip2BK.stat", true);
beamModeEventsKafkaProducer.close();
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
}
});
}
Expand Down
97 changes: 50 additions & 47 deletions src/main/java/alice/dip/DipMessagesProcessor.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
/*************
* cil
**************/

/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/
package alice.dip;

import java.io.BufferedWriter;
Expand All @@ -23,6 +31,8 @@
import cern.dip.DipData;
import cern.dip.TypeMismatch;

import alice.dip.beam.mode.BeamModeEventsKafkaProducer;

/*
* Process dip messages received from the DipClient
* Receives DipData messages in a blocking Queue and then process them asynchronously
Expand All @@ -45,11 +55,13 @@ public class DipMessagesProcessor implements Runnable {
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<>(100);

private final LuminosityManager luminosityManager;
private volatile BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;

public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {

this.bookkeepingClient = bookkeepingClient;
this.luminosityManager = luminosityManager;
this.beamModeEventsKafkaProducer = null;

Thread t = new Thread(this);
t.start();
Expand All @@ -58,6 +70,14 @@ public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManag
loadState();
}

/**
* Setter of events producer
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
*/
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
}

/*
* This method is used for receiving DipData messages from the Dip Client
*/
Expand Down Expand Up @@ -299,25 +319,25 @@ private void handleSafeBeamMessage(DipData dipData) throws BadParameter, TypeMis
if (currentFill == null) return;

String bm = currentFill.getBeamMode();

if (bm.contentEquals("STABLE BEAMS")) {
AliDip2BK.log(
0,
"ProcData.newSafeBeams",
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
);

if (!isBeam1 || !isBeam2) {
AliDip2BK.log(
1,
"ProcData.newSafeBeams",
" VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams
);
if (bm != null) {
if ((bm.contentEquals("STABLE BEAMS") && (!isBeam1 || !isBeam2))) {
currentFill.setBeamMode(time, "LOST BEAMS");
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
}
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
} else if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
currentFill.setBeamMode(time, "STABLE BEAMS");
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
}
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
}

return;
}

if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) {
currentFill.setBeamMode(time, "STABLE BEAMS");
AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS ");
}
}

Expand Down Expand Up @@ -569,35 +589,18 @@ public void newFillNo(long date, String strFno, String par1, String par2, String
}

public void newBeamMode(long date, String BeamMode) {

if (currentFill != null) {
AliDip2BK.log(
2,
"ProcData.newBeamMode",
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
);
currentFill.setBeamMode(date, BeamMode);
bookkeepingClient.updateLhcFill(currentFill);
saveState();

int mc = -1;
for (int i = 0; i < AliDip2BK.endFillCases.length; i++) {
if (AliDip2BK.endFillCases[i].equalsIgnoreCase(BeamMode)) mc = i;
}
if (mc < 0) {

AliDip2BK.log(
2,
"ProcData.newBeamMode",
"New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo
);
bookkeepingClient.updateLhcFill(currentFill);
saveState();
} else {
currentFill.endedTime = date;
bookkeepingClient.updateLhcFill(currentFill);
if (AliDip2BK.KEEP_FILLS_HISTORY_DIRECTORY != null) {
writeFillHistFile(currentFill);
}
AliDip2BK.log(
3,
"ProcData.newBeamMode",
"CLOSE Fill_NO=" + currentFill.fillNo + " Based on new beam mode=" + BeamMode
);
currentFill = null;
if (this.beamModeEventsKafkaProducer != null) {
this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
}
} else {
AliDip2BK.log(4, "ProcData.newBeamMode", " ERROR new beam mode=" + BeamMode + " NO FILL NO for it");
Expand Down Expand Up @@ -753,7 +756,7 @@ private void handleBookkeepingCtpClockMessage(DipData dipData) throws BadParamet
var phaseShiftBeam2 = dipData.extractFloat("PhaseShift_Beam2");

AliDip2BK.log(
2,
0,
"ProcData.dispatch",
" Bookkeeping CTP Clock: PhaseShift_Beam1=" + phaseShiftBeam1 + " PhaseShift_Beam2=" + phaseShiftBeam2
);
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/alice/dip/adapters/BeamModeProtoAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.adapters;

import alice.dip.enums.BeamModeEnum;

/**
* Adapter class to convert between string representations of beam modes and the BeamModeEnum.
*/
public class BeamModeProtoAdapter {

/**
* Returns the enum constant matching the given string, or UNKNOWN if not found.
* Accepts both space and underscore separated names, case-insensitive.
* @param beamMode The beam mode string to convert.
* @return The corresponding BeamModeEnum constant, or UNKNOWN if not recognized.
*/
public static BeamModeEnum fromStringToEnum(String beamMode) {
if (beamMode == null || beamMode.trim().isEmpty()) {
return BeamModeEnum.UNKNOWN;
}
for (BeamModeEnum value : BeamModeEnum.values()) {
if (value.label.equalsIgnoreCase(beamMode)) {
return value;
}
}
return BeamModeEnum.UNKNOWN;
}
}
Loading