Skip to content

Commit 9a1ce0e

Browse files
committed
DPL: Add option to raw-proxy to print input sizes per spec
1 parent b752def commit 9a1ce0e

File tree

3 files changed

+62
-16
lines changed

3 files changed

+62
-16
lines changed

Framework/Core/include/Framework/ExternalFairMQDeviceProxy.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* label,
106106
InjectorFunction converter,
107107
uint64_t minSHM = 0,
108108
bool sendTFcounter = false,
109-
bool doInjectMissingData = false
110-
);
109+
bool doInjectMissingData = false,
110+
unsigned int doPrintSizes = 0);
111111

112112
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* label,
113113
Inputs const& inputSpecs,

Framework/Core/src/ExternalFairMQDeviceProxy.cxx

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -233,12 +233,19 @@ auto getFinalIndex(DataHeader const& dh, size_t msgidx) -> size_t
233233
return finalBlockIndex;
234234
};
235235

236-
void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::vector<OutputRoute> const& routes)
236+
void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::vector<OutputRoute> const& routes, bool doInjectMissingData, unsigned int doPrintSizes)
237237
{
238238
// Check for missing data.
239239
static std::vector<bool> present;
240+
static std::vector<size_t> dataSizes;
241+
static std::vector<bool> showSize;
240242
present.clear();
241243
present.resize(routes.size(), false);
244+
dataSizes.clear();
245+
dataSizes.resize(routes.size(), 0);
246+
showSize.clear();
247+
showSize.resize(routes.size(), false);
248+
242249
static std::vector<size_t> unmatchedDescriptions;
243250
unmatchedDescriptions.clear();
244251
DataProcessingHeader const* dph = nullptr;
@@ -261,6 +268,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
261268
size_t foundDataSpecs = 0;
262269
for (int msgidx = 0; msgidx < parts.Size(); msgidx += 2) {
263270
bool allFound = true;
271+
int addToSize = -1;
264272
const auto dh = o2::header::get<DataHeader*>(parts.At(msgidx)->GetData());
265273
auto const sih = o2::header::get<SourceInfoHeader*>(parts.At(msgidx)->GetData());
266274
if (sih != nullptr) {
@@ -284,9 +292,12 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
284292
}
285293
if (firstDH == nullptr) {
286294
firstDH = dh;
295+
if (doPrintSizes && firstDH->tfCounter % doPrintSizes != 0) {
296+
doPrintSizes = 0;
297+
}
287298
}
288299
for (size_t pi = 0; pi < present.size(); ++pi) {
289-
if (present[pi]) {
300+
if (present[pi] && !doPrintSizes) {
290301
continue;
291302
}
292303
// Consider uninvolved pipelines as present.
@@ -298,24 +309,53 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
298309
auto& spec = routes[pi].matcher;
299310
OutputSpec query{dh->dataOrigin, dh->dataDescription, dh->subSpecification};
300311
if (DataSpecUtils::match(spec, query)) {
301-
present[pi] = true;
302-
++foundDataSpecs;
312+
if (!present[pi]) {
313+
++foundDataSpecs;
314+
present[pi] = true;
315+
showSize[pi] = true;
316+
}
317+
addToSize = pi;
303318
break;
304319
}
305320
}
306-
// Skip the rest of the block of messages. We subtract 2 because above
307-
// we increment by 2.
308-
msgidx = getFinalIndex(*dh, msgidx) - 2;
309-
if (allFound) {
321+
int msgidxLast = getFinalIndex(*dh, msgidx);
322+
if (addToSize >= 0) {
323+
int increment = (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) ? 1 : 2;
324+
for (int msgidx2 = msgidx + 1; msgidx2 < msgidxLast; msgidx2 += increment) {
325+
dataSizes[addToSize] += parts.At(msgidx2)->GetSize();
326+
}
327+
}
328+
// Skip the rest of the block of messages. We subtract 2 because above we increment by 2.
329+
msgidx = msgidxLast - 2;
330+
if (allFound && !doPrintSizes) {
310331
return;
311332
}
312333
}
334+
313335
for (size_t pi = 0; pi < present.size(); ++pi) {
314336
if (!present[pi]) {
337+
showSize[pi] = true;
315338
unmatchedDescriptions.push_back(pi);
316339
}
317340
}
318341

342+
if (firstDH && doPrintSizes) {
343+
std::string sizes = "";
344+
size_t totalSize = 0;
345+
for (size_t pi = 0; pi < present.size(); ++pi) {
346+
if (showSize[pi]) {
347+
totalSize += dataSizes[pi];
348+
auto& spec = routes[pi].matcher;
349+
sizes += DataSpecUtils::describe(spec) + fmt::format(":{} ", fmt::group_digits(dataSizes[pi]));
350+
}
351+
}
352+
LOGP(important, "RAW {} size report:{}- Total:{}", firstDH->tfCounter, sizes, fmt::group_digits(totalSize));
353+
}
354+
355+
if (!doInjectMissingData) {
356+
return;
357+
}
358+
319359
if (unmatchedDescriptions.size() > 0) {
320360
if (hassih) {
321361
if (firstDH) {
@@ -363,7 +403,7 @@ void injectMissingData(fair::mq::Device& device, fair::mq::Parts& parts, std::ve
363403
static int maxWarn = 10; // Correct would be o2::conf::VerbosityConfig::Instance().maxWarnDeadBeef, but Framework does not depend on CommonUtils..., but not so critical since receives will send correct number of DEADBEEF messages
364404
static int contDeadBeef = 0;
365405
if (++contDeadBeef <= maxWarn) {
366-
LOGP(alarm, "Found {}/{} data specs, missing data specs: {}, injecting 0xDEADBEEF", foundDataSpecs, expectedDataSpecs, missing);
406+
LOGP(alarm, "Found {}/{} data specs, missing data specs: {}, injecting 0xDEADBEEF{}", foundDataSpecs, expectedDataSpecs, missing, contDeadBeef == maxWarn ? " - disabling alarm now to stop flooding the log" : "");
367407
}
368408
}
369409
}
@@ -573,7 +613,8 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
573613
InjectorFunction converter,
574614
uint64_t minSHM,
575615
bool sendTFcounter,
576-
bool doInjectMissingData)
616+
bool doInjectMissingData,
617+
unsigned int doPrintSizes)
577618
{
578619
DataProcessorSpec spec;
579620
spec.name = strdup(name);
@@ -585,7 +626,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
585626
// The Init method will register a new "Out of band" channel and
586627
// attach an OnData to it which is responsible for converting incoming
587628
// messages into DPL messages.
588-
spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name, sendTFcounter, doInjectMissingData](InitContext& ctx) {
629+
spec.algorithm = AlgorithmSpec{[converter, minSHM, deviceName = spec.name, sendTFcounter, doInjectMissingData, doPrintSizes](InitContext& ctx) {
589630
auto* device = ctx.services().get<RawDeviceService>().device();
590631
// make a copy of the output routes and pass to the lambda by move
591632
auto outputRoutes = ctx.services().get<RawDeviceService>().spec().outputs;
@@ -709,7 +750,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
709750
return count;
710751
};
711752

712-
auto dataHandler = [device, converter, doInjectMissingData,
753+
auto dataHandler = [device, converter, doInjectMissingData, doPrintSizes,
713754
outputRoutes = std::move(outputRoutes),
714755
control = &ctx.services().get<ControlService>(),
715756
deviceState = &ctx.services().get<DeviceState>(),
@@ -740,7 +781,7 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
740781
// For reference, the oldest possible timeframe passed as newTimesliceId here comes from LifetimeHelpers::enumDrivenCreation()
741782
bool shouldstop = false;
742783
if (doInjectMissingData) {
743-
injectMissingData(*device, inputs, outputRoutes);
784+
injectMissingData(*device, inputs, outputRoutes, doInjectMissingData, doPrintSizes);
744785
}
745786
converter(timingInfo, *device, inputs, channelRetriever, timesliceIndex->getOldestPossibleOutput().timeslice.value, shouldstop);
746787

Framework/Utils/src/raw-proxy.cxx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
3535
ConfigParamSpec{
3636
"inject-missing-data", VariantType::Bool, false, {"inject missing data according to dataspec if not found in the input"}});
3737

38+
workflowOptions.push_back(
39+
ConfigParamSpec{
40+
"print-input-sizes", VariantType::Int, 0, {"print statistics about sizes per input spec every n TFs"}});
41+
3842
workflowOptions.push_back(
3943
ConfigParamSpec{
4044
"throwOnUnmatched", VariantType::Bool, false, {"throw if unmatched input data is found"}});
@@ -51,6 +55,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
5155
std::string processorName = config.options().get<std::string>("proxy-name");
5256
std::string outputconfig = config.options().get<std::string>("dataspec");
5357
bool injectMissingData = config.options().get<bool>("inject-missing-data");
58+
unsigned int printSizes = config.options().get<unsigned int>("print-input-sizes");
5459
bool throwOnUnmatched = config.options().get<bool>("throwOnUnmatched");
5560
uint64_t minSHM = std::stoul(config.options().get<std::string>("timeframes-shm-limit"));
5661
std::vector<InputSpec> matchers = select(outputconfig.c_str());
@@ -65,7 +70,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
6570
processorName.c_str(),
6671
std::move(readoutProxyOutput),
6772
"type=pair,method=connect,address=ipc:///tmp/readout-pipe-0,rateLogging=1,transport=shmem",
68-
dplModelAdaptor(filterSpecs, throwOnUnmatched), minSHM, false, injectMissingData);
73+
dplModelAdaptor(filterSpecs, throwOnUnmatched), minSHM, false, injectMissingData, printSizes);
6974

7075
WorkflowSpec workflow;
7176
workflow.emplace_back(readoutProxy);

0 commit comments

Comments
 (0)