Skip to content

Commit ffe9981

Browse files
authored
DPL Analysis: fix for DYN propagation (#8705)
1 parent f0b95fe commit ffe9981

File tree

3 files changed

+48
-27
lines changed

3 files changed

+48
-27
lines changed

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,27 @@ struct WritingCursor<soa::Table<PC...>> {
9292
int64_t mCount = -1;
9393
};
9494

95-
template <typename T>
96-
struct Produces {
97-
static_assert(always_static_assert_v<T>, "Type must be a o2::soa::Table");
98-
};
99-
10095
/// This helper class allows you to declare things which will be created by a
10196
/// given analysis task. Notice how the actual cursor is implemented by the
10297
/// means of the WritingCursor helper class, from which produces actually
10398
/// derives.
99+
template <typename T>
100+
struct Produces : WritingCursor<typename soa::PackToTable<typename T::table_t::persistent_columns_t>::table> {
101+
using table_t = T;
102+
using metadata = typename aod::MetadataTrait<T>::metadata;
103+
104+
// @return the associated OutputSpec
105+
OutputSpec const spec()
106+
{
107+
return OutputSpec{OutputLabel{metadata::tableLabel()}, metadata::origin(), metadata::description()};
108+
}
109+
110+
OutputRef ref()
111+
{
112+
return OutputRef{metadata::tableLabel(), 0};
113+
}
114+
};
115+
104116
template <template <typename...> class T, typename... C>
105117
struct Produces<T<C...>> : WritingCursor<typename soa::PackToTable<typename T<C...>::table_t::persistent_columns_t>::table> {
106118
using table_t = soa::Table<C...>;

Framework/Core/src/ArrowSupport.cxx

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
419419
std::vector<InputSpec> requestedDYNs;
420420
std::vector<OutputSpec> providedDYNs;
421421

422+
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
423+
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
424+
422425
if (builder != workflow.end()) {
423426
// collect currently requested IDXs
424427
std::vector<InputSpec> requestedIDXs;
@@ -460,13 +463,21 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
460463
}
461464
}
462465
}
466+
std::sort(requestedDYNs.begin(), requestedDYNs.end(), inputSpecLessThan);
467+
std::sort(providedDYNs.begin(), providedDYNs.end(), outputSpecLessThan);
468+
std::vector<InputSpec> spawnerInputs;
469+
for (auto& input : requestedDYNs) {
470+
if (std::none_of(providedDYNs.begin(), providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
471+
spawnerInputs.emplace_back(input);
472+
}
473+
}
463474
// recreate inputs and outputs
464475
spawner->outputs.clear();
465476
spawner->inputs.clear();
466477
// replace AlgorithmSpec
467478
// FIXME: it should be made more generic, so it does not need replacement...
468-
spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(requestedDYNs);
469-
WorkflowHelpers::addMissingOutputsToSpawner(providedDYNs, requestedDYNs, requestedAODs, *spawner);
479+
spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(spawnerInputs);
480+
WorkflowHelpers::addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, *spawner);
470481
}
471482

472483
if (writer != workflow.end()) {
@@ -502,7 +513,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
502513
// ATTENTION: if there are dangling outputs the getGlobalAODSink
503514
// has to be created in any case!
504515
std::vector<InputSpec> outputsInputsAOD;
505-
auto isAOD = [](InputSpec const& spec) { return DataSpecUtils::partialMatch(spec, header::DataOrigin("AOD")); };
516+
auto isAOD = [](InputSpec const& spec) { return (DataSpecUtils::partialMatch(spec, header::DataOrigin("AOD")) || DataSpecUtils::partialMatch(spec, header::DataOrigin("DYN"))); };
506517

507518
for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
508519
if (isAOD(outputsInputs[ii])) {

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -390,24 +390,19 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
390390
break;
391391
}
392392
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"AOD"})) {
393-
requestedAODs.emplace_back(input);
393+
DataSpecUtils::updateInputList(requestedAODs, InputSpec{input});
394394
}
395395
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"DYN"})) {
396-
if (std::find_if(requestedDYNs.begin(), requestedDYNs.end(), [&](InputSpec const& spec) { return input.binding == spec.binding; }) == requestedDYNs.end()) {
397-
requestedDYNs.emplace_back(input);
398-
}
396+
DataSpecUtils::updateInputList(requestedDYNs, InputSpec{input});
399397
}
400398
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"IDX"})) {
401-
if (std::find_if(requestedIDXs.begin(), requestedIDXs.end(), [&](InputSpec const& spec) { return input.binding == spec.binding; }) == requestedIDXs.end()) {
402-
requestedIDXs.emplace_back(input);
403-
}
399+
DataSpecUtils::updateInputList(requestedIDXs, InputSpec{input});
404400
}
405401
}
406402

407403
std::stable_sort(timer.outputs.begin(), timer.outputs.end(), [](OutputSpec const& a, OutputSpec const& b) { return *DataSpecUtils::getOptionalSubSpec(a) < *DataSpecUtils::getOptionalSubSpec(b); });
408404

409-
for (size_t oi = 0; oi < processor.outputs.size(); ++oi) {
410-
auto& output = processor.outputs[oi];
405+
for (auto& output : processor.outputs) {
411406
if (DataSpecUtils::partialMatch(output, header::DataOrigin{"AOD"})) {
412407
providedAODs.emplace_back(output);
413408
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"DYN"})) {
@@ -426,20 +421,23 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
426421
}
427422
}
428423
}
429-
auto sortingEquals = [](InputSpec const& a, InputSpec const& b) { return DataSpecUtils::describe(a) == DataSpecUtils::describe(b); };
430-
std::sort(requestedDYNs.begin(), requestedDYNs.end(), sortingEquals);
431-
auto last = std::unique(requestedDYNs.begin(), requestedDYNs.end());
432-
requestedDYNs.erase(last, requestedDYNs.end());
433424

434-
std::sort(requestedIDXs.begin(), requestedIDXs.end(), sortingEquals);
435-
last = std::unique(requestedIDXs.begin(), requestedIDXs.end());
436-
requestedIDXs.erase(last, requestedIDXs.end());
425+
auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
426+
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
427+
std::sort(requestedDYNs.begin(), requestedDYNs.end(), inputSpecLessThan);
428+
std::sort(providedDYNs.begin(), providedDYNs.end(), outputSpecLessThan);
429+
std::vector<InputSpec> spawnerInputs;
430+
for (auto& input : requestedDYNs) {
431+
if (std::none_of(providedDYNs.begin(), providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
432+
spawnerInputs.emplace_back(input);
433+
}
434+
}
437435

438436
DataProcessorSpec aodSpawner{
439437
"internal-dpl-aod-spawner",
440438
{},
441439
{},
442-
readers::AODReaderHelpers::aodSpawnerCallback(requestedDYNs),
440+
readers::AODReaderHelpers::aodSpawnerCallback(spawnerInputs),
443441
{}};
444442

445443
DataProcessorSpec indexBuilder{
@@ -450,7 +448,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
450448
{}};
451449

452450
addMissingOutputsToBuilder(requestedIDXs, requestedAODs, requestedDYNs, indexBuilder);
453-
addMissingOutputsToSpawner(providedDYNs, requestedDYNs, requestedAODs, aodSpawner);
451+
addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, aodSpawner);
454452

455453
addMissingOutputsToReader(providedAODs, requestedAODs, aodReader);
456454
addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend);
@@ -605,7 +603,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
605603
// ATTENTION: if there are dangling outputs the getGlobalAODSink
606604
// has to be created in any case!
607605
std::vector<InputSpec> outputsInputsAOD;
608-
auto isAOD = [](InputSpec const& spec) { return DataSpecUtils::partialMatch(spec, header::DataOrigin("AOD")); };
606+
auto isAOD = [](InputSpec const& spec) { return (DataSpecUtils::partialMatch(spec, header::DataOrigin("AOD")) || DataSpecUtils::partialMatch(spec, header::DataOrigin("DYN"))); };
609607
for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
610608
if (isAOD(outputsInputs[ii])) {
611609
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);

0 commit comments

Comments
 (0)