Skip to content

Commit ab3a965

Browse files
committed
Stability improvement for o2-sim startup
Fixing a zeromq communication problem when the O2PrimaryServerDevice is too fast and quitting before O2HitMerge is even initialized. This can happen rarely, for instance when startup needs long due to CVMFS latency. This situation is now avoided by waiting for the permission to shutdown. The permission is given by O2HitMerger once the system is up and running (first actual hits have been received). Some minor cleanup (comment removal) in addition.
1 parent 0b946d9 commit ab3a965

File tree

4 files changed

+51
-15
lines changed

4 files changed

+51
-15
lines changed

run/O2HitMerger.h

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,29 @@ namespace o2
8888
namespace devices
8989
{
9090

91+
// Function communicating to primary particle server that it is now safe to shutdown.
92+
// From the perspective of o2-sim, this is the case when all configs have been propagated and the system
93+
// is running ok: For instance after the HitMerger is initialized and got it's first data from Geant workers.
94+
bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel)
95+
{
96+
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)o2::O2PrimaryServerInfoRequest::AllowShutdown));
97+
std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
98+
99+
int timeoutinMS = 100;
100+
if (channel.Send(request, timeoutinMS) > 0) {
101+
LOG(info) << "Sending Shutdown permission to particle server";
102+
if (channel.Receive(reply, timeoutinMS) > 0) {
103+
// the answer is a simple ack with a status code
104+
LOG(info) << "Shutdown permission was acknowledged";
105+
} else {
106+
LOG(error) << "No answer received within " << timeoutinMS << "ms\n";
107+
return false;
108+
}
109+
return true;
110+
}
111+
return false;
112+
}
113+
91114
class O2HitMerger : public fair::mq::Device
92115
{
93116

@@ -129,6 +152,9 @@ class O2HitMerger : public fair::mq::Device
129152
if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
130153
outfilename = o2::base::NameConf::getMCKinematicsFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str());
131154
mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
155+
} else {
156+
// we didn't manage to get a configuration --> better to fail
157+
LOG(fatal) << "No configuration received. Aborting";
132158
}
133159
mAsService = o2::conf::SimConfig::Instance().asService();
134160
mForwardKine = o2::conf::SimConfig::Instance().forwardKine();
@@ -354,6 +380,13 @@ class O2HitMerger : public fair::mq::Device
354380
// for the next batch
355381
return waitForControlInput();
356382
}
383+
384+
static bool initAcknowledged = false;
385+
if (!initAcknowledged) {
386+
primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0));
387+
initAcknowledged = true;
388+
}
389+
357390
return more;
358391
}
359392

@@ -413,10 +446,6 @@ class O2HitMerger : public fair::mq::Device
413446
};
414447
}
415448
}
416-
if (!expectmore) {
417-
// somehow FairMQ has difficulties shutting down; helping manually
418-
// raise(SIGINT);
419-
}
420449
return expectmore;
421450
}
422451

run/O2PrimaryServerDevice.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,10 @@ class O2PrimaryServerDevice final : public fair::mq::Device
247247
}
248248
}
249249

250-
// launches a thread that listens for status requests from outside asynchronously
250+
// launches a thread that listens for status/config/shutdown requests from outside asynchronously
251251
void launchInfoThread()
252252
{
253253
static std::vector<std::thread> threads;
254-
255254
auto sendErrorReply = [](fair::mq::Channel& channel) {
256255
LOG(error) << "UNKNOWN REQUEST";
257256
std::unique_ptr<fair::mq::Message> reply(channel.NewSimpleMessage((int)(404)));
@@ -260,7 +259,9 @@ class O2PrimaryServerDevice final : public fair::mq::Device
260259

261260
LOG(info) << "LAUNCHING STATUS THREAD";
262261
auto lambda = [this, sendErrorReply]() {
263-
while (mState != O2PrimaryServerState::Stopped) {
262+
bool canShutdown{false};
263+
// Exit only when both: serving stopped and allowed from outside.
264+
while (!(mState == O2PrimaryServerState::Stopped && canShutdown)) {
264265
auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
265266
if (!channel.IsValid()) {
266267
LOG(error) << "channel primserv-info not valid";
@@ -285,6 +286,11 @@ class O2PrimaryServerDevice final : public fair::mq::Device
285286
}
286287
} else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) {
287288
HandleConfigRequest(channel);
289+
} else if (request_payload == (int)O2PrimaryServerInfoRequest::AllowShutdown) {
290+
LOG(info) << "Got info that we may shutdown";
291+
std::unique_ptr<fair::mq::Message> ack(channel.NewSimpleMessage(200));
292+
channel.Send(ack);
293+
canShutdown = true;
288294
} else {
289295
sendErrorReply(channel);
290296
}
@@ -518,10 +524,13 @@ class O2PrimaryServerDevice final : public fair::mq::Device
518524

519525
void PostRun() override
520526
{
527+
// We shouldn't shut down immediately when all events have been served
528+
// Instead we also need to wait until the info thread running some communication server
529+
// with other processes is finished.
521530
while (!mInfoThreadStopped) {
522531
LOG(info) << "Waiting info thread";
523532
using namespace std::chrono_literals;
524-
std::this_thread::sleep_for(100ms);
533+
std::this_thread::sleep_for(1000ms);
525534
}
526535
}
527536

@@ -534,7 +543,7 @@ class O2PrimaryServerDevice final : public fair::mq::Device
534543
if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
535544
workavailable = false;
536545
}
537-
if (!(mState == O2PrimaryServerState::ReadyToServe || mState == O2PrimaryServerState::WaitingEvent)) {
546+
if (!(mState.load() == O2PrimaryServerState::ReadyToServe || mState.load() == O2PrimaryServerState::WaitingEvent)) {
538547
// send a zero answer
539548
workavailable = false;
540549
}

run/O2SimDevice.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,6 @@ class O2SimDevice final : public fair::mq::Device
9595
// returns true if successful / false if not
9696
static bool querySimConfig(fair::mq::Channel& channel)
9797
{
98-
// auto text = new std::string("configrequest");
99-
// std::unique_ptr<fair::mq::Message> request(channel.NewMessage(const_cast<char*>(text->c_str()),
100-
// text->length(), CustomCleanup, text));
10198
std::unique_ptr<fair::mq::Message> request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config));
10299
std::unique_ptr<fair::mq::Message> reply(channel.NewMessage());
103100

run/PrimaryServerState.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ enum class O2PrimaryServerState {
2525
};
2626
static const char* PrimStateToString[5] = {"INIT", "SERVING", "WAITEVENT", "IDLE", "STOPPED"};
2727

28-
/// enum class for type of info request
28+
/// enum class for request to o2sim-primserv-info channel of the O2PrimaryServerDevice
2929
enum class O2PrimaryServerInfoRequest {
30-
Status = 1,
31-
Config = 2
30+
Status = 1, // asks to retrieve current status of O2PrimaryServerDevice --> will send O2PrimaryServerState
31+
Config = 2, // asks for o2-sim config reply
32+
AllowShutdown = 3 // can be used to let particle server know that shutdown is now safe (once all components initialized)
3233
};
3334

3435
/// Struct to be used as payload when making a request

0 commit comments

Comments
 (0)