Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -420,24 +420,26 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
std::vector<InputSpec> requestedAODs;
std::vector<InputSpec> requestedDYNs;
std::vector<OutputSpec> providedDYNs;
auto &ac = ctx.services().get<AnalysisContext>();
ac.requestedAODs.clear();
ac.requestedDYNs.clear();
ac.providedDYNs.clear();


auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };

if (builder != workflow.end()) {
// collect currently requested IDXs
std::vector<InputSpec> requestedIDXs;
ac.requestedIDXs.clear();
for (auto& d : workflow) {
if (d.name == builder->name) {
continue;
}
for (auto& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) {
auto copy = i;
DataSpecUtils::updateInputList(requestedIDXs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
}
}
}
Expand All @@ -446,8 +448,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
builder->outputs.clear();
// replace AlgorithmSpec
// FIXME: it should be made more generic, so it does not need replacement...
builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(requestedIDXs);
AnalysisSupportHelpers::addMissingOutputsToBuilder(requestedIDXs, requestedAODs, requestedDYNs, *builder);
builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
}

if (spawner != workflow.end()) {
Expand All @@ -459,20 +461,20 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) {
auto copy = i;
DataSpecUtils::updateInputList(requestedDYNs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
}
}
for (auto const& o : d.outputs) {
if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) {
providedDYNs.emplace_back(o);
ac.providedDYNs.emplace_back(o);
}
}
}
std::sort(requestedDYNs.begin(), requestedDYNs.end(), inputSpecLessThan);
std::sort(providedDYNs.begin(), providedDYNs.end(), outputSpecLessThan);
std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
std::vector<InputSpec> spawnerInputs;
for (auto& input : requestedDYNs) {
if (std::none_of(providedDYNs.begin(), providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
for (auto& input : ac.requestedDYNs) {
if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
spawnerInputs.emplace_back(input);
}
}
Expand All @@ -482,7 +484,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// replace AlgorithmSpec
// FIXME: it should be made more generic, so it does not need replacement...
spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(spawnerInputs);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, requestedAODs, *spawner);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, spawnerInputs, ac.requestedAODs, *spawner);
}

if (writer != workflow.end()) {
Expand All @@ -496,14 +498,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
for (auto const& i : d.inputs) {
if (DataSpecUtils::partialMatch(i, AODOrigins)) {
auto copy = i;
DataSpecUtils::updateInputList(requestedAODs, std::move(copy));
DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
}
}
}

// remove unmatched outputs
auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(requestedAODs.begin(), requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
});
reader->outputs.erase(o_end, reader->outputs.end());
if (reader->outputs.empty()) {
Expand All @@ -521,22 +523,22 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// select outputs of type AOD which need to be saved
// ATTENTION: if there are dangling outputs the getGlobalAODSink
// has to be created in any case!
std::vector<InputSpec> outputsInputsAOD;
ac.outputsInputsAOD.clear();

for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
if (!ds.empty() || isDangling[ii]) {
outputsInputsAOD.emplace_back(outputsInputs[ii]);
ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
}
}
}

// file sink for any AOD output
if (!outputsInputsAOD.empty()) {
if (!ac.outputsInputsAOD.empty()) {
// add TFNumber and TFFilename as input to the writer
outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
}
// Move the dummy sink at the end, if needed
Expand Down
Loading