Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
}
std::string missing = "";
bool showAlarm = false;
uint32_t runNumber = 0;
try {
runNumber = strtoul(device.fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
} catch (...) {
}
for (auto mi : unmatchedDescriptions) {
auto& spec = routes[mi].matcher;
missing += " " + DataSpecUtils::describe(spec);
Expand All @@ -412,6 +417,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
dh.dataDescription = concrete.description;
dh.subSpecification = *subSpec;
dh.payloadSize = 0;
dh.runNumber = runNumber;
dh.splitPayloadParts = 0;
dh.splitPayloadIndex = 0;
dh.payloadSerializationMethod = header::gSerializationMethodNone;
Expand Down Expand Up @@ -504,15 +510,16 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
LOG(error) << "unexpected nullptr found. Skipping message pair.";
continue;
}
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
auto* header = parts.At(msgidx)->GetData();
const auto dh = o2::header::get<DataHeader*>(header);
if (!dh) {
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
if (msgidx > 0) {
--msgidx;
}
continue;
}
auto dph = o2::header::get<DataProcessingHeader*>(parts.At(msgidx)->GetData());
auto dph = o2::header::get<DataProcessingHeader*>(header);
if (!dph) {
LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
continue;
Expand All @@ -527,7 +534,7 @@ InjectorFunction dplModelAdaptor(std::vector<OutputSpec> const& filterSpecs, DPL
timingInfo.runNumber = dh->runNumber;
timingInfo.tfCounter = dh->tfCounter;
LOG(debug) << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
if (dh->runNumber == 0 || dh->tfCounter == 0 || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) {
if (dh->runNumber == 0 || (dh->tfCounter == 0 && o2::header::get<SourceInfoHeader*>(header) == nullptr) || (fmqRunNumber > 0 && fmqRunNumber != dh->runNumber)) {
LOG(error) << "INVALID runNumber / tfCounter: runNumber " << dh->runNumber
<< ", tfCounter " << dh->tfCounter << ", FMQ runNumber " << fmqRunNumber
<< " for msgidx " << msgidx << ": " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification}) << " part " << dh->splitPayloadIndex << " of " << dh->splitPayloadParts << " payload " << parts.At(msgidx + 1)->GetSize();
Expand Down Expand Up @@ -623,6 +630,11 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial
auto timesliceId = std::make_shared<size_t>(startTime);
return [timesliceId, spec, step, method](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) {
auto* device = services.get<RawDeviceService>().device();
uint32_t runNumber = 0;
try {
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
} catch (...) {
}
// We iterate on all the parts and we send them two by two,
// adding the appropriate O2 header.
for (int i = 0; i < parts.Size(); ++i) {
Expand All @@ -635,6 +647,7 @@ InjectorFunction incrementalConverter(OutputSpec const& spec, o2::header::Serial
dh.dataDescription = matcher.description;
dh.subSpecification = matcher.subSpec;
dh.payloadSize = parts.At(i)->GetSize();
dh.runNumber = runNumber;

DataProcessingHeader dph{newTimesliceId, 0};
if (*timesliceId != newTimesliceId) {
Expand Down Expand Up @@ -977,11 +990,18 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
if (channelName != outputChannelName) {
continue;
}

uint32_t runNumber = 0;
try {
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
} catch (...) {
}
DataHeader dh;
dh.dataOrigin = "DPL";
dh.dataDescription = "EOS";
dh.subSpecification = 0;
dh.payloadSize = 0;
dh.runNumber = runNumber;
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
dh.tfCounter = 0;
dh.firstTForbit = 0;
Expand Down Expand Up @@ -1091,12 +1111,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name,
if (!checkChannel(channelName)) {
continue;
}
uint32_t runNumber = 0;
try {
runNumber = strtoul(device->fConfig->GetProperty<std::string>("runNumber", "").c_str(), nullptr, 10);
} catch (...) {
}
DataHeader dh;
dh.dataOrigin = "DPL";
dh.dataDescription = "EOS";
dh.subSpecification = 0;
dh.payloadSize = 0;
dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
dh.runNumber = runNumber;
dh.tfCounter = 0;
dh.firstTForbit = 0;
SourceInfoHeader sih;
Expand Down