Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 56 additions & 25 deletions Framework/AnalysisSupport/src/AODWriterHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
#include "Framework/TableConsumer.h"
#include "Framework/DataOutputDirector.h"
#include "Framework/TableTreeHelpers.h"
#include "Framework/Signpost.h"

#include <TFile.h>
#include <TFile.h>
#include <TTree.h>
#include <TFile.h>
#include <TMap.h>
#include <TObjString.h>
#include <arrow/table.h>

O2_DECLARE_DYNAMIC_LOG(histograms);
O2_DECLARE_DYNAMIC_LOG(derived_data);

namespace o2::framework::writers
{

Expand Down Expand Up @@ -102,12 +106,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)

// this functor is called once per time frame
return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void {
LOGP(debug, "======== getGlobalAODSink::processing ==========");
LOGP(debug, " processing data set with {} entries", pc.inputs().size());
O2_SIGNPOST_ID_GENERATE(hid, histograms);
O2_SIGNPOST_START(derived_data, hid, "getGlobalAODSink", "Processing dataset with %zu entries.", pc.inputs().size());

// return immediately if pc.inputs() is empty. This should never happen!
if (pc.inputs().size() == 0) {
LOGP(info, "No inputs available!");
O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "Processing dataset with %zu entries.", pc.inputs().size());
O2_SIGNPOST_END(derived_data, hid, "getGlobalAODSink", "Done processing.");
return;
}

Expand Down Expand Up @@ -135,7 +140,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
// loop over the DataRefs which are contained in pc.inputs()
for (const auto& ref : pc.inputs()) {
if (!ref.spec) {
LOGP(debug, "Invalid input will be skipped!");
O2_SIGNPOST_EVENT_EMIT_ERROR(derived_data, hid, "getGlobalAODSink", "Invalid input will be skipped!");
continue;
}

Expand Down Expand Up @@ -178,17 +183,18 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
// get the TableConsumer and corresponding arrow table
auto msg = pc.inputs().get(ref.spec->binding);
if (msg.header == nullptr) {
LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
O2_SIGNPOST_EVENT_EMIT_ERROR(derived_data, hid, "getGlobalAODSink", "No header for message %{public}s:%{public}s",
ref.spec->binding.c_str(), DataSpecUtils::describe(*ref.spec).c_str());
continue;
}
auto s = pc.inputs().get<TableConsumer>(ref.spec->binding);
auto table = s->asArrowTable();
if (!table->Validate().ok()) {
LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
O2_SIGNPOST_EVENT_EMIT_WARN(derived_data, hid, "getGlobalAODSink", "The table \"%{public}s\" is not valid and will not be saved!", tableName.c_str());
continue;
}
if (table->schema()->fields().empty()) {
LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName);
O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "The table \"%{public}s\" is empty but will be saved anyway!", tableName.c_str());
}

// loop over all DataOutputDescriptors
Expand All @@ -203,7 +209,8 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)

// update metadata
if (fileAndFolder.file->FindObjectAny("metaData")) {
LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
O2_SIGNPOST_EVENT_EMIT(derived_data, hid, "getGlobalAODSink", "Metadata: target file %{public}s already has metadata, preserving it",
fileAndFolder.file->GetName());
} else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
TMap aodMetaDataMap;
for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
Expand All @@ -227,6 +234,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
ta2tr.process();
}
}
O2_SIGNPOST_END(derived_data, hid, "getGlobalAODSink", "Done processing.");
};
}

Expand All @@ -252,9 +260,11 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
static std::string currentFile = "";

auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
LOG(debug) << "Writing merged objects and histograms to file";
O2_SIGNPOST_ID_GENERATE(hid, histograms);
O2_SIGNPOST_START(histograms, hid, "getOutputObjHistWriter", "Writing merged objects and histograms to file");
if (inputObjects->empty()) {
LOG(error) << "Output object map is empty!";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Output object map is empty!");
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Writing completed with error.");
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
return;
}
Expand All @@ -263,30 +273,39 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
f[i]->Close();
}
}
LOG(debug) << "All outputs merged in their respective target files";
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Writing completed correctly.");
context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};

callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
O2_SIGNPOST_ID_GENERATE(hid, histograms);
O2_SIGNPOST_START(histograms, hid, "getOutputObjHistWriter", "Processing dataset with %zu entries.", pc.inputs().size());
auto const& ref = pc.inputs().get("x");
if (!ref.header) {
LOG(error) << "Header not found";
return;
}
if (!ref.payload) {
LOG(error) << "Payload not found";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Header not found.");
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}
auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
if (!datah) {
LOG(error) << "No data header in stack";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No data header in stack.");
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}

if (!ref.payload) {
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Payload not found for %{public}s.",
datah->dataDescription.as<std::string>().c_str());
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}

auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
if (!objh) {
LOG(error) << "No output object header in stack";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No output object in stack %{public}s.",
datah->dataDescription.as<std::string>().c_str());
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}

Expand All @@ -297,7 +316,8 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
tm.SetBufferOffset(0);
tm.ResetMap();
if (obj.kind == nullptr) {
LOG(error) << "Cannot read class info from buffer.";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Cannot read class info from buffer.");
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}

Expand All @@ -310,18 +330,23 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
obj.name = named->GetName();
auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
if (hpos == tskmap.end()) {
LOG(error) << "No task found for hash " << hash;
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No task found for hash %d", hash);
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}
auto taskname = hpos->name;
auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
if (opos == objmap.end()) {
LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No object list found for task %{public}s (hash=%d)",
taskname.c_str(), hash);
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}
auto objects = opos->bindings;
if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
LOG(error) << "No object " << obj.name << " in map for task " << taskname;
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "No object %{public}s in map for task %{public}s",
obj.name.c_str(), taskname.c_str());
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}
auto nameHash = runtime_hash(obj.name.c_str());
Expand All @@ -330,14 +355,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
// If it's the first one, we just add it to the list.
if (existing == inputObjects->end()) {
obj.count = objh->mPipelineSize;
inputObjects->push_back(std::make_pair(key, obj));
inputObjects->emplace_back(key, obj);
existing = inputObjects->end() - 1;
} else {
obj.count = existing->second.count;
// Otherwise, we merge it with the existing one.
auto merger = existing->second.kind->GetMerge();
if (!merger) {
LOG(error) << "Already one unmergeable object found for " << obj.name;
O2_SIGNPOST_EVENT_EMIT_ERROR(histograms, hid, "getOutputObjHistWriter", "Already one unmergeable object found for %{public}s",
obj.name.c_str());
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Processing completed with error.");
return;
}
TList coll;
Expand All @@ -349,13 +376,16 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
existing->second.count -= 1;

if (existing->second.count != 0) {
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Still waiting for %d histograms to arrive.", existing->second.count);
return;
}
O2_SIGNPOST_EVENT_EMIT(histograms, hid, "getOutputObjHistWriter", "All histogramsa are there. Writing to disk.");
// Write the object here.
auto route = existing->first;
auto entry = existing->second;
auto file = ROOTfileNames.find(route.policy);
if (file == ROOTfileNames.end()) {
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Could not find where to write object.");
return;
}
auto filename = file->second;
Expand Down Expand Up @@ -408,6 +438,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
delete (TObject*)entry.obj;
entry.obj = nullptr;
}
O2_SIGNPOST_END(histograms, hid, "getOutputObjHistWriter", "Done processing histogram.");
};
}};
}
Expand Down