Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
Expand Down Expand Up @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
Expand Down Expand Up @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
Expand Down
184 changes: 120 additions & 64 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "DataInputDirector.h"
#include "Framework/DataDescriptorQueryBuilder.h"
#include "Framework/Logger.h"
#include "Framework/PluginManager.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AnalysisDataModelHelpers.h"
#include "Framework/Output.h"
#include "Headers/DataHeader.h"
Expand All @@ -26,8 +28,12 @@
#include "TGrid.h"
#include "TObjString.h"
#include "TMap.h"
#include "TFile.h"

#include <arrow/dataset/file_base.h>
#include <arrow/dataset/dataset.h>
#include <uv.h>
#include <memory>

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
Expand All @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName)
return fileNameHolder;
}

DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport),
mMonitoring(monitoring),
mAllowedParentLevel(allowedParentLevel),
mParentFileReplacement(std::move(parentFileReplacement)),
mLevel(level)
DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
: mAlienSupport(alienSupport),
mMonitoring(monitoring),
mAllowedParentLevel(allowedParentLevel),
mParentFileReplacement(std::move(parentFileReplacement)),
mLevel(level)
{
std::vector<char const*> capabilitiesSpecs = {
"O2Framework:RNTupleObjectReadingCapability",
"O2Framework:TTreeObjectReadingCapability",
};

std::vector<LoadablePlugin> plugins;
for (auto spec : capabilitiesSpecs) {
auto morePlugins = PluginManager::parsePluginSpecString(spec);
for (auto& extra : morePlugins) {
plugins.push_back(extra);
}
}

PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
}

void DataInputDescriptor::printOut()
Expand Down Expand Up @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter)

// open file
auto filename = mfilenames[counter]->fileName;
if (mcurrentFile) {
if (mcurrentFile->GetName() == filename) {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (rootFS.get()) {
if (rootFS->GetFile()->GetName() == filename) {
return true;
}
closeInputFile();
}
mcurrentFile = TFile::Open(filename.c_str());
if (!mcurrentFile) {

mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
if (!mCurrentFilesystem.get()) {
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
}
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);

// get the parent file map if exists
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
if (mParentFileMap && !mParentFileReplacement.empty()) {
auto pos = mParentFileReplacement.find(';');
if (pos == std::string::npos) {
Expand All @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter)

// get the directory names
if (mfilenames[counter]->numberOfTimeFrames <= 0) {
std::regex TFRegex = std::regex("DF_[0-9]+");
TList* keyList = mcurrentFile->GetListOfKeys();
const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
TList* keyList = rootFS->GetFile()->GetListOfKeys();
std::vector<std::string> finalList;

// extract TF numbers and sort accordingly
// We use an extra seen set to make sure we preserve the order in which
// we insert things in the final list and to make sure we do not have duplicates.
// Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
std::unordered_set<size_t> seen;
for (auto key : *keyList) {
if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) {
auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3));
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
std::smatch matchResult;
std::string keyName = ((TObjString*)key)->GetString().Data();
bool match = std::regex_match(keyName, matchResult, TFRegex);
if (match) {
auto folderNumber = std::stoul(matchResult[1].str());
if (seen.find(folderNumber) == seen.end()) {
seen.insert(folderNumber);
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
}
}
}

if (mParentFileMap != nullptr) {
// If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
Expand All @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter)
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
}

for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) {
auto folderName = "DF_" + std::to_string(folderNumber);
mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName);
mfilenames[counter]->alreadyRead.emplace_back(false);
}
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size();
mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
}

mCurrentFileID = counter;
Expand All @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF)
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
}

FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
{
FileAndFolder fileAndFolder;

// open file
if (!setFile(counter)) {
return fileAndFolder;
return {};
}

// no TF left
if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
return fileAndFolder;
return {};
}

fileAndFolder.file = mcurrentFile;
fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];

mfilenames[counter]->alreadyRead[numTF] = true;

return fileAndFolder;
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
}

DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename)
Expand All @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
// This file has no parent map
return nullptr;
}
auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
// The current DF is not found in the parent map (this should not happen and is a fatal error)
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (!parentFileName) {
// The current DF is not found in the parent map (this should not happen and is a fatal error)
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
return nullptr;
}

if (mParentFile) {
// Does the cached parent still correspond to the correct file?
if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
return mParentFile;
} else {
mParentFile->closeInputFile();
Expand All @@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
}

if (mLevel == mAllowedParentLevel) {
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
rootFS->GetFile()->GetName()));
}

LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
Expand Down Expand Up @@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics()
if (wait_time < 0) {
wait_time = 0;
}
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
auto f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
#if __has_include(<TJAlienFile.h>)
auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
auto alienFile = dynamic_cast<TJAlienFile*>(f);
if (alienFile) {
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
Expand All @@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics()

void DataInputDescriptor::closeInputFile()
{
if (mcurrentFile) {
if (mCurrentFilesystem.get()) {
if (mParentFile) {
mParentFile->closeInputFile();
delete mParentFile;
Expand All @@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile()
mParentFileMap = nullptr;

printFileStatistics();
mcurrentFile->Close();
delete mcurrentFile;
mcurrentFile = nullptr;
mCurrentFilesystem.reset();
}
}

Expand Down Expand Up @@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles()

int DataInputDescriptor::findDFNumber(int file, std::string dfName)
{
auto dfList = mfilenames[file]->listOfTimeFrameKeys;
auto it = std::find(dfList.begin(), dfList.end(), dfName);
auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
if (it == dfList.end()) {
return -1;
}
Expand All @@ -358,40 +387,67 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
{
auto ioStart = uv_hrtime();

auto fileAndFolder = getFileFolder(counter, numTF);
if (!fileAndFolder.file) {
auto folder = getFileFolder(counter, numTF);
if (!folder.filesystem()) {
return false;
}

auto fullpath = fileAndFolder.folderName + "/" + treename;
auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());

if (!rootFS) {
throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
}
// FIXME: Ugly. We should detect the format from the treename, good enough for now.
std::shared_ptr<arrow::dataset::FileFormat> format;

auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};

for (auto& capability : mFactory.capabilities) {
auto objectPath = capability.lfn2objectPath(fullpath.path());
void* handle = capability.getHandle(rootFS, objectPath);
if (handle) {
format = capability.factory().format();
break;
}
}

if (!tree) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
if (!format) {
throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path()));
}

auto schemaOpt = format->Inspect(fullpath);
auto schema = *schemaOpt;

auto fragment = format->MakeFragment(fullpath, {}, schema);

if (!fragment.ok()) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
auto parentFile = getParentFile(counter, numTF, treename);
if (parentFile != nullptr) {
int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
int parentNumTF = parentFile->findDFNumber(0, folder.path());
if (parentNumTF == -1) {
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
}
// first argument is 0 as the parent file object contains only 1 file
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
}
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
}

// create table output
auto o = Output(dh);
auto t2t = outputs.make<TreeToTable>(o);

// add branches to read
// fill the table
t2t->setLabel(tree->GetName());
totalSizeCompressed += tree->GetZipBytes();
totalSizeUncompressed += tree->GetTotBytes();
t2t->addAllColumns(tree);
t2t->fill(tree);
delete tree;
// FIXME: This should allow me to create a memory pool
// which I can then use to scan the dataset.
//
auto f2b = outputs.make<FragmentToBatch>(o);

//// add branches to read
//// fill the table
f2b->setLabel(treename.c_str());
f2b->fill(*fragment, schema, format);

mIOTime += (uv_hrtime() - ioStart);

Expand Down Expand Up @@ -693,7 +749,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
return result;
}

FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
Expand Down
Loading