Skip to content

Commit 8c955dd

Browse files
committed
DPL: Move DataInputDirector to arrow::Dataset API
1 parent 7eaa964 commit 8c955dd

File tree

3 files changed

+77
-58
lines changed

3 files changed

+77
-58
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <memory>
1314
#include "Framework/TableTreeHelpers.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataProcessingStats.h"
1617
#include "Framework/RootTableBuilderHelpers.h"
18+
#include "Framework/RootArrowFilesystem.h"
1719
#include "Framework/AlgorithmSpec.h"
1820
#include "Framework/ConfigParamRegistry.h"
1921
#include "Framework/ControlService.h"
@@ -41,6 +43,8 @@
4143
#include <arrow/io/interfaces.h>
4244
#include <arrow/table.h>
4345
#include <arrow/util/key_value_metadata.h>
46+
#include <arrow/dataset/dataset.h>
47+
#include <arrow/dataset/file_base.h>
4448

4549
using namespace o2;
4650
using namespace o2::aod;
@@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
272276
// Origin file name for derived output map
273277
auto o2 = Output(TFFileNameHeader);
274278
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
275-
std::string currentFilename(fileAndFolder.file->GetName());
276-
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
279+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
280+
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
281+
std::string currentFilename(f->GetFile()->GetName());
282+
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
277283
// This is not an absolute local path. Make it absolute.
278284
static std::string pwd = gSystem->pwd() + std::string("/");
279-
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
285+
currentFilename = pwd + std::string(f->GetName());
280286
}
281287
outputs.make<std::string>(o2) = currentFilename;
282288
}
@@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
312318
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
313319
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
314320
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
315-
if (!fileAndFolder.file) {
321+
322+
// In case the filesource is empty, move to the next one.
323+
if (fileAndFolder.path().empty()) {
316324
fcnt += 1;
317325
ntf = 0;
318326
if (didir->atEnd(fcnt)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "DataInputDirector.h"
1212
#include "Framework/DataDescriptorQueryBuilder.h"
1313
#include "Framework/Logger.h"
14+
#include "Framework/RootArrowFilesystem.h"
1415
#include "Framework/AnalysisDataModelHelpers.h"
1516
#include "Framework/Output.h"
1617
#include "Headers/DataHeader.h"
@@ -26,8 +27,12 @@
2627
#include "TGrid.h"
2728
#include "TObjString.h"
2829
#include "TMap.h"
30+
#include "TFile.h"
2931

32+
#include <arrow/dataset/file_base.h>
33+
#include <arrow/dataset/dataset.h>
3034
#include <uv.h>
35+
#include <memory>
3136

3237
#if __has_include(<TJAlienFile.h>)
3338
#include <TJAlienFile.h>
@@ -108,20 +113,22 @@ bool DataInputDescriptor::setFile(int counter)
108113

109114
// open file
110115
auto filename = mfilenames[counter]->fileName;
111-
if (mcurrentFile) {
112-
if (mcurrentFile->GetName() == filename) {
116+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
117+
if (rootFS.get()) {
118+
if (rootFS->GetFile()->GetName() == filename) {
113119
return true;
114120
}
115121
closeInputFile();
116122
}
117-
mcurrentFile = TFile::Open(filename.c_str());
118-
if (!mcurrentFile) {
123+
124+
mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024);
125+
if (!mCurrentFilesystem.get()) {
119126
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
120127
}
121-
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
128+
rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
122129

123130
// get the parent file map if exists
124-
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
131+
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
125132
if (mParentFileMap && !mParentFileReplacement.empty()) {
126133
auto pos = mParentFileReplacement.find(';');
127134
if (pos == std::string::npos) {
@@ -141,7 +148,7 @@ bool DataInputDescriptor::setFile(int counter)
141148
// get the directory names
142149
if (mfilenames[counter]->numberOfTimeFrames <= 0) {
143150
std::regex TFRegex = std::regex("DF_[0-9]+");
144-
TList* keyList = mcurrentFile->GetListOfKeys();
151+
TList* keyList = rootFS->GetFile()->GetListOfKeys();
145152

146153
// extract TF numbers and sort accordingly
147154
for (auto key : *keyList) {
@@ -193,26 +200,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF)
193200
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
194201
}
195202

196-
FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
203+
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
197204
{
198-
FileAndFolder fileAndFolder;
199-
200205
// open file
201206
if (!setFile(counter)) {
202-
return fileAndFolder;
207+
return {};
203208
}
204209

205210
// no TF left
206211
if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
207-
return fileAndFolder;
212+
return {};
208213
}
209214

210-
fileAndFolder.file = mcurrentFile;
211-
fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
212-
213215
mfilenames[counter]->alreadyRead[numTF] = true;
214216

215-
return fileAndFolder;
217+
return {(mfilenames[counter]->listOfTimeFrameKeys)[numTF], mCurrentFilesystem};
216218
}
217219

218220
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename)
@@ -223,15 +225,17 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
223225
}
224226
auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
225227
auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
228+
// The current DF is not found in the parent map (this should not happen and is a fatal error)
229+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
226230
if (!parentFileName) {
227-
// The current DF is not found in the parent map (this should not happen and is a fatal error)
228-
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
231+
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
229232
return nullptr;
230233
}
231234

232235
if (mParentFile) {
233236
// Does this still correspond to the correct file?
234-
if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
237+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
238+
if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
235239
return mParentFile;
236240
} else {
237241
mParentFile->closeInputFile();
@@ -241,7 +245,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
241245
}
242246

243247
if (mLevel == mAllowedParentLevel) {
244-
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
248+
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
249+
rootFS->GetFile()->GetName()));
245250
}
246251

247252
LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
@@ -270,11 +275,13 @@ void DataInputDescriptor::printFileStatistics()
270275
if (wait_time < 0) {
271276
wait_time = 0;
272277
}
273-
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
274-
mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
278+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
279+
auto f = dynamic_cast<TFile*>(rootFS->GetFile());
280+
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
281+
f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
275282
((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
276283
#if __has_include(<TJAlienFile.h>)
277-
auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
284+
auto alienFile = dynamic_cast<TJAlienFile*>(f);
278285
if (alienFile) {
279286
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
280287
}
@@ -285,7 +292,7 @@ void DataInputDescriptor::printFileStatistics()
285292

286293
void DataInputDescriptor::closeInputFile()
287294
{
288-
if (mcurrentFile) {
295+
if (mCurrentFilesystem.get()) {
289296
if (mParentFile) {
290297
mParentFile->closeInputFile();
291298
delete mParentFile;
@@ -296,9 +303,9 @@ void DataInputDescriptor::closeInputFile()
296303
mParentFileMap = nullptr;
297304

298305
printFileStatistics();
299-
mcurrentFile->Close();
300-
delete mcurrentFile;
301-
mcurrentFile = nullptr;
306+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
307+
rootFS->GetFile()->Close();
308+
mCurrentFilesystem.reset();
302309
}
303310
}
304311

@@ -358,40 +365,46 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
358365
{
359366
auto ioStart = uv_hrtime();
360367

361-
auto fileAndFolder = getFileFolder(counter, numTF);
362-
if (!fileAndFolder.file) {
368+
auto folder = getFileFolder(counter, numTF);
369+
if (!folder.filesystem()) {
363370
return false;
364371
}
365372

366-
auto fullpath = fileAndFolder.folderName + "/" + treename;
367-
auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
373+
auto format = std::make_shared<TTreeFileFormat>(totalSizeCompressed, totalSizeUncompressed);
374+
auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
375+
auto schemaOpt = format->Inspect(fullpath);
376+
auto schema = *schemaOpt;
368377

369-
if (!tree) {
370-
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
378+
auto fragment = format->MakeFragment(fullpath, {}, schema);
379+
380+
if (!fragment.ok()) {
381+
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
371382
auto parentFile = getParentFile(counter, numTF, treename);
372383
if (parentFile != nullptr) {
373-
int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
384+
int parentNumTF = parentFile->findDFNumber(0, folder.path());
374385
if (parentNumTF == -1) {
375-
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
386+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
387+
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
376388
}
377389
// first argument is 0 as the parent file object contains only 1 file
378390
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
379391
}
380-
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
392+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
393+
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
381394
}
382395

383396
// create table output
384397
auto o = Output(dh);
385-
auto t2t = outputs.make<TreeToTable>(o);
386398

387-
// add branches to read
388-
// fill the table
389-
t2t->setLabel(tree->GetName());
390-
totalSizeCompressed += tree->GetZipBytes();
391-
totalSizeUncompressed += tree->GetTotBytes();
392-
t2t->addAllColumns(tree);
393-
t2t->fill(tree);
394-
delete tree;
399+
// FIXME: This should allow me to create a memory pool
400+
// which I can then use to scan the dataset.
401+
//
402+
auto f2b = outputs.make<FragmentToBatch>(o);
403+
404+
//// add branches to read
405+
//// fill the table
406+
f2b->setLabel(treename.c_str());
407+
f2b->fill(*fragment, schema, format);
395408

396409
mIOTime += (uv_hrtime() - ioStart);
397410

@@ -693,7 +706,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
693706
return result;
694707
}
695708

696-
FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
709+
arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
697710
{
698711
auto didesc = getDataInputDescriptor(dh);
699712
// if NOT match then use defaultDataInputDescriptor

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
#include "Framework/DataDescriptorMatcher.h"
1717
#include "Framework/DataAllocator.h"
1818

19+
#include <arrow/filesystem/filesystem.h>
20+
#include <arrow/dataset/dataset.h>
21+
1922
#include <regex>
2023
#include "rapidjson/fwd.h"
2124

@@ -36,11 +39,6 @@ struct FileNameHolder {
3639
};
3740
FileNameHolder* makeFileNameHolder(std::string fileName);
3841

39-
struct FileAndFolder {
40-
TFile* file = nullptr;
41-
std::string folderName = "";
42-
};
43-
4442
class DataInputDescriptor
4543
{
4644
/// Holds information concerning the reading of an aod table.
@@ -78,7 +76,7 @@ class DataInputDescriptor
7876
int findDFNumber(int file, std::string dfName);
7977

8078
uint64_t getTimeFrameNumber(int counter, int numTF);
81-
FileAndFolder getFileFolder(int counter, int numTF);
79+
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
8280
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
8381
int getTimeFramesInFile(int counter);
8482
int getReadTimeFramesInFile(int counter);
@@ -98,7 +96,7 @@ class DataInputDescriptor
9896
std::string mParentFileReplacement;
9997
std::vector<FileNameHolder*> mfilenames;
10098
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
101-
TFile* mcurrentFile = nullptr;
99+
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
102100
int mCurrentFileID = -1;
103101
bool mAlienSupport = false;
104102

@@ -143,7 +141,7 @@ class DataInputDirector
143141

144142
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
145143
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
146-
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
144+
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
147145
int getTimeFramesInFile(header::DataHeader dh, int counter);
148146

149147
uint64_t getTotalSizeCompressed();

0 commit comments

Comments
 (0)