Skip to content

Commit 2640284

Browse files
authored
DPL: set run number also on EoS (#14158)
1 parent 24c97f2 commit 2640284

File tree

1 file changed

+29
-3
lines changed

1 file changed

+29
-3
lines changed

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
397397
}
398398
std::string missing = "";
399399
bool showAlarm = false;
400+
uint32_t runNumber = 0;
401+
try {
402+
runNumber = strtoul(device.fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
403+
} catch (...) {
404+
}
400405
for (auto mi : unmatchedDescriptions) {
401406
auto& spec = routes[mi].matcher;
402407
missing += " " + DataSpecUtils::describe(spec);
@@ -412,6 +417,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
412417
dh.dataDescription = concrete.description;
413418
dh.subSpecification = *subSpec;
414419
dh.payloadSize = 0;
420+
dh.runNumber = runNumber;
415421
dh.splitPayloadParts = 0;
416422
dh.splitPayloadIndex = 0;
417423
dh.payloadSerializationMethod = header::gSerializationMethodNone;
@@ -504,15 +510,16 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
504510
LOG(error) << "unexpected nullptr found. Skipping message pair.";
505511
continue;
506512
}
507-
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
513+
auto* header = parts.At(msgidx)->GetData();
514+
const auto dh = o2::header::get<DataHeader*>(header);
508515
if (!dh) {
509516
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
510517
if (msgidx > 0) {
511518
--msgidx;
512519
}
513520
continue;
514521
}
515-
auto dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
522+
auto dph = o2::header::get<DataProcessingHeader*>(header);
516523
if (!dph) {
517524
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
518525
continue;
@@ -527,7 +534,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
527534
timingInfo.runNumber = dh->runNumber;
528535
timingInfo.tfCounter = dh->tfCounter;
529536
LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
530-
if (dh->runNumber == 0 || dh->tfCounter == 0 || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) {
537+
if (dh->runNumber == 0 || (dh->tfCounter == 0 && o2::header::get<SourceInfoHeader*>(header) == nullptr) || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) {
531538
LOG(error) << "INVALID runNumber / tfCounter: runNumber " << dh->runNumber
532539
<< ", tfCounter " << dh->tfCounter << ", FMQ runNumber " << fmqRunNumber
533540
<< " for msgidx " << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
@@ -623,6 +630,11 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial
623630
auto timesliceId = std::make_shared<size_t>(startTime);
624631
return [timesliceId, spec, step, method](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) {
625632
auto* device = services.get<RawDeviceService>().device();
633+
uint32_t runNumber = 0;
634+
try {
635+
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
636+
} catch (...) {
637+
}
626638
// We iterate on all the parts and we send them two by two,
627639
// adding the appropriate O2 header.
628640
for (int i = 0; i < parts.Size(); ++i) {
@@ -635,6 +647,7 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial
635647
dh.dataDescription = matcher.description;
636648
dh.subSpecification = matcher.subSpec;
637649
dh.payloadSize = parts.At(i)->GetSize();
650+
dh.runNumber = runNumber;
638651

639652
DataProcessingHeader dph{newTimesliceId, 0};
640653
if (*timesliceId != newTimesliceId) {
@@ -977,11 +990,18 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
977990
if (channelName != outputChannelName) {
978991
continue;
979992
}
993+
994+
uint32_t runNumber = 0;
995+
try {
996+
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
997+
} catch (...) {
998+
}
980999
DataHeader dh;
9811000
dh.dataOrigin = "DPL";
9821001
dh.dataDescription = "EOS";
9831002
dh.subSpecification = 0;
9841003
dh.payloadSize = 0;
1004+
dh.runNumber = runNumber;
9851005
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
9861006
dh.tfCounter = 0;
9871007
dh.firstTForbit = 0;
@@ -1091,12 +1111,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name,
10911111
if (!checkChannel(channelName)) {
10921112
continue;
10931113
}
1114+
uint32_t runNumber = 0;
1115+
try {
1116+
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
1117+
} catch (...) {
1118+
}
10941119
DataHeader dh;
10951120
dh.dataOrigin = "DPL";
10961121
dh.dataDescription = "EOS";
10971122
dh.subSpecification = 0;
10981123
dh.payloadSize = 0;
10991124
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
1125+
dh.runNumber = runNumber;
11001126
dh.tfCounter = 0;
11011127
dh.firstTForbit = 0;
11021128
SourceInfoHeader sih;

0 commit comments

Comments
 (0)