Skip to content

Commit 8c306c3

Browse files
committed
DPL: Move DataInputDirector to arrow::Dataset API
1 parent 9a3eb3e commit 8c306c3

File tree

5 files changed

+155
-81
lines changed

5 files changed

+155
-81
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <memory>
1314
#include "Framework/TableTreeHelpers.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataProcessingStats.h"
1617
#include "Framework/RootTableBuilderHelpers.h"
18+
#include "Framework/RootArrowFilesystem.h"
1719
#include "Framework/AlgorithmSpec.h"
1820
#include "Framework/ConfigParamRegistry.h"
1921
#include "Framework/ControlService.h"
@@ -41,6 +43,8 @@
4143
#include <arrow/io/interfaces.h>
4244
#include <arrow/table.h>
4345
#include <arrow/util/key_value_metadata.h>
46+
#include <arrow/dataset/dataset.h>
47+
#include <arrow/dataset/file_base.h>
4448

4549
using namespace o2;
4650
using namespace o2::aod;
@@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
272276
// Origin file name for derived output map
273277
auto o2 = Output(TFFileNameHeader);
274278
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
275-
std::string currentFilename(fileAndFolder.file->GetName());
276-
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
279+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
280+
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
281+
std::string currentFilename(f->GetFile()->GetName());
282+
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
277283
// This is not an absolute local path. Make it absolute.
278284
static std::string pwd = gSystem->pwd() + std::string("/");
279-
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
285+
currentFilename = pwd + std::string(f->GetName());
280286
}
281287
outputs.make<std::string>(o2) = currentFilename;
282288
}
@@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
312318
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
313319
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
314320
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
315-
if (!fileAndFolder.file) {
321+
322+
// In case the filesource is empty, move to the next one.
323+
if (fileAndFolder.path().empty()) {
316324
fcnt += 1;
317325
ntf = 0;
318326
if (didir->atEnd(fcnt)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 128 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include "DataInputDirector.h"
1212
#include "Framework/DataDescriptorQueryBuilder.h"
1313
#include "Framework/Logger.h"
14+
#include "Framework/PluginManager.h"
15+
#include "Framework/RootArrowFilesystem.h"
1416
#include "Framework/AnalysisDataModelHelpers.h"
1517
#include "Framework/Output.h"
1618
#include "Headers/DataHeader.h"
@@ -26,8 +28,12 @@
2628
#include "TGrid.h"
2729
#include "TObjString.h"
2830
#include "TMap.h"
31+
#include "TFile.h"
2932

33+
#include <arrow/dataset/file_base.h>
34+
#include <arrow/dataset/dataset.h>
3035
#include <uv.h>
36+
#include <memory>
3137

3238
#if __has_include(<TJAlienFile.h>)
3339
#include <TJAlienFile.h>
@@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName)
4753
return fileNameHolder;
4854
}
4955

50-
DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport),
51-
mMonitoring(monitoring),
52-
mAllowedParentLevel(allowedParentLevel),
53-
mParentFileReplacement(std::move(parentFileReplacement)),
54-
mLevel(level)
56+
DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
57+
: mAlienSupport(alienSupport),
58+
mMonitoring(monitoring),
59+
mAllowedParentLevel(allowedParentLevel),
60+
mParentFileReplacement(std::move(parentFileReplacement)),
61+
mLevel(level)
5562
{
63+
std::vector<char const*> capabilitiesSpecs = {
64+
"O2Framework:RNTupleObjectReadingCapability",
65+
"O2Framework:TTreeObjectReadingCapability",
66+
};
67+
68+
std::vector<LoadablePlugin> plugins;
69+
for (auto spec : capabilitiesSpecs) {
70+
auto morePlugins = PluginManager::parsePluginSpecString(spec);
71+
for (auto& extra : morePlugins) {
72+
plugins.push_back(extra);
73+
}
74+
}
75+
76+
PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
5677
}
5778

5879
void DataInputDescriptor::printOut()
@@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter)
108129

109130
// open file
110131
auto filename = mfilenames[counter]->fileName;
111-
if (mcurrentFile) {
112-
if (mcurrentFile->GetName() == filename) {
132+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
133+
if (rootFS.get()) {
134+
if (rootFS->GetFile()->GetName() == filename) {
113135
return true;
114136
}
115137
closeInputFile();
116138
}
117-
mcurrentFile = TFile::Open(filename.c_str());
118-
if (!mcurrentFile) {
139+
140+
mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
141+
if (!mCurrentFilesystem.get()) {
119142
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
120143
}
121-
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
144+
rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
122145

123146
// get the parent file map if exists
124-
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
147+
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
125148
if (mParentFileMap && !mParentFileReplacement.empty()) {
126149
auto pos = mParentFileReplacement.find(';');
127150
if (pos == std::string::npos) {
@@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter)
140163

141164
// get the directory names
142165
if (mfilenames[counter]->numberOfTimeFrames <= 0) {
143-
std::regex TFRegex = std::regex("DF_[0-9]+");
144-
TList* keyList = mcurrentFile->GetListOfKeys();
166+
const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
167+
TList* keyList = rootFS->GetFile()->GetListOfKeys();
168+
std::vector<std::string> finalList;
145169

146170
// extract TF numbers and sort accordingly
171+
// We use an extra seen set to make sure we preserve the order in which
172+
// we insert things in the final list and to make sure we do not have duplicates.
173+
// Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
174+
std::unordered_set<size_t> seen;
147175
for (auto key : *keyList) {
148-
if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) {
149-
auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3));
150-
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
176+
std::smatch matchResult;
177+
std::string keyName = ((TObjString*)key)->GetString().Data();
178+
bool match = std::regex_match(keyName, matchResult, TFRegex);
179+
if (match) {
180+
auto folderNumber = std::stoul(matchResult[1].str());
181+
if (seen.find(folderNumber) == seen.end()) {
182+
seen.insert(folderNumber);
183+
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
184+
}
151185
}
152186
}
187+
153188
if (mParentFileMap != nullptr) {
154189
// If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
155190
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
@@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter)
162197
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
163198
}
164199

165-
for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) {
166-
auto folderName = "DF_" + std::to_string(folderNumber);
167-
mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName);
168-
mfilenames[counter]->alreadyRead.emplace_back(false);
169-
}
170-
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size();
200+
mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
201+
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
171202
}
172203

173204
mCurrentFileID = counter;
@@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF)
193224
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
194225
}
195226

196-
FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
227+
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
197228
{
198-
FileAndFolder fileAndFolder;
199-
200229
// open file
201230
if (!setFile(counter)) {
202-
return fileAndFolder;
231+
return {};
203232
}
204233

205234
// no TF left
206235
if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
207-
return fileAndFolder;
236+
return {};
208237
}
209238

210-
fileAndFolder.file = mcurrentFile;
211-
fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
212-
213239
mfilenames[counter]->alreadyRead[numTF] = true;
214240

215-
return fileAndFolder;
241+
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
216242
}
217243

218244
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename)
@@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
221247
// This file has no parent map
222248
return nullptr;
223249
}
224-
auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
250+
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
225251
auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
252+
// The current DF is not found in the parent map (this should not happen and is a fatal error)
253+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
226254
if (!parentFileName) {
227-
// The current DF is not found in the parent map (this should not happen and is a fatal error)
228-
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
255+
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
229256
return nullptr;
230257
}
231258

232259
if (mParentFile) {
233260
// Is this still the corresponding to the correct file?
234-
if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
261+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
262+
if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
235263
return mParentFile;
236264
} else {
237265
mParentFile->closeInputFile();
@@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
241269
}
242270

243271
if (mLevel == mAllowedParentLevel) {
244-
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
272+
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
273+
rootFS->GetFile()->GetName()));
245274
}
246275

247276
LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
@@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics()
270299
if (wait_time < 0) {
271300
wait_time = 0;
272301
}
273-
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
274-
mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
302+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
303+
auto f = dynamic_cast<TFile*>(rootFS->GetFile());
304+
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
305+
f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
275306
((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
276307
#if __has_include(<TJAlienFile.h>)
277-
auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
308+
auto alienFile = dynamic_cast<TJAlienFile*>(f);
278309
if (alienFile) {
279310
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
280311
}
@@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics()
285316

286317
void DataInputDescriptor::closeInputFile()
287318
{
288-
if (mcurrentFile) {
319+
if (mCurrentFilesystem.get()) {
289320
if (mParentFile) {
290321
mParentFile->closeInputFile();
291322
delete mParentFile;
@@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile()
296327
mParentFileMap = nullptr;
297328

298329
printFileStatistics();
299-
mcurrentFile->Close();
300-
delete mcurrentFile;
301-
mcurrentFile = nullptr;
330+
mCurrentFilesystem.reset();
302331
}
303332
}
304333

@@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles()
346375

347376
int DataInputDescriptor::findDFNumber(int file, std::string dfName)
348377
{
349-
auto dfList = mfilenames[file]->listOfTimeFrameKeys;
350-
auto it = std::find(dfList.begin(), dfList.end(), dfName);
378+
auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
379+
auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
351380
if (it == dfList.end()) {
352381
return -1;
353382
}
@@ -358,40 +387,75 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
358387
{
359388
auto ioStart = uv_hrtime();
360389

361-
auto fileAndFolder = getFileFolder(counter, numTF);
362-
if (!fileAndFolder.file) {
390+
auto folder = getFileFolder(counter, numTF);
391+
if (!folder.filesystem()) {
363392
return false;
364393
}
365394

366-
auto fullpath = fileAndFolder.folderName + "/" + treename;
367-
auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
395+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
396+
397+
if (!rootFS) {
398+
throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
399+
}
400+
// FIXME: Ugly. We should detect the format from the treename, good enough for now.
401+
std::shared_ptr<arrow::dataset::FileFormat> format;
402+
403+
auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
404+
405+
for (auto& capability : mFactory.capabilities) {
406+
auto objectPath = capability.lfn2objectPath(fullpath.path());
407+
void* handle = capability.getHandle(rootFS, objectPath);
408+
if (handle) {
409+
format = capability.factory().format();
410+
break;
411+
}
412+
}
413+
414+
if (!format) {
415+
throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path()));
416+
}
417+
418+
auto schemaOpt = format->Inspect(fullpath);
419+
auto physicalSchema = schemaOpt;
420+
std::vector<std::shared_ptr<arrow::Field>> fields;
421+
for (auto& original : (*schemaOpt)->fields()) {
422+
if (original->name().ends_with("_size")) {
423+
continue;
424+
}
425+
fields.push_back(original);
426+
}
427+
auto datasetSchema = std::make_shared<arrow::Schema>(fields);
428+
429+
auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
368430

369-
if (!tree) {
370-
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
431+
if (!fragment.ok()) {
432+
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
371433
auto parentFile = getParentFile(counter, numTF, treename);
372434
if (parentFile != nullptr) {
373-
int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
435+
int parentNumTF = parentFile->findDFNumber(0, folder.path());
374436
if (parentNumTF == -1) {
375-
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
437+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
438+
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
376439
}
377440
// first argument is 0 as the parent file object contains only 1 file
378441
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
379442
}
380-
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
443+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
444+
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
381445
}
382446

383447
// create table output
384448
auto o = Output(dh);
385-
auto t2t = outputs.make<TreeToTable>(o);
386449

387-
// add branches to read
388-
// fill the table
389-
t2t->setLabel(tree->GetName());
390-
totalSizeCompressed += tree->GetZipBytes();
391-
totalSizeUncompressed += tree->GetTotBytes();
392-
t2t->addAllColumns(tree);
393-
t2t->fill(tree);
394-
delete tree;
450+
// FIXME: This should allow me to create a memory pool
451+
// which I can then use to scan the dataset.
452+
//
453+
auto f2b = outputs.make<FragmentToBatch>(o);
454+
455+
//// add branches to read
456+
//// fill the table
457+
f2b->setLabel(treename.c_str());
458+
f2b->fill(*fragment, datasetSchema, format);
395459

396460
mIOTime += (uv_hrtime() - ioStart);
397461

@@ -693,7 +757,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
693757
return result;
694758
}
695759

696-
FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
760+
arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
697761
{
698762
auto didesc = getDataInputDescriptor(dh);
699763
// if NOT match then use defaultDataInputDescriptor

0 commit comments

Comments
 (0)