Skip to content

Commit e9b282d

Browse files
committed
DPL: Move DataInputDirector to arrow::Dataset API
1 parent 65192e6 commit e9b282d

File tree

7 files changed

+192
-88
lines changed

7 files changed

+192
-88
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <memory>
1314
#include "Framework/TableTreeHelpers.h"
1415
#include "Framework/AnalysisHelpers.h"
1516
#include "Framework/DataProcessingStats.h"
1617
#include "Framework/RootTableBuilderHelpers.h"
18+
#include "Framework/RootArrowFilesystem.h"
1719
#include "Framework/AlgorithmSpec.h"
1820
#include "Framework/ConfigParamRegistry.h"
1921
#include "Framework/ControlService.h"
@@ -41,6 +43,8 @@
4143
#include <arrow/io/interfaces.h>
4244
#include <arrow/table.h>
4345
#include <arrow/util/key_value_metadata.h>
46+
#include <arrow/dataset/dataset.h>
47+
#include <arrow/dataset/file_base.h>
4448

4549
using namespace o2;
4650
using namespace o2::aod;
@@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
272276
// Origin file name for derived output map
273277
auto o2 = Output(TFFileNameHeader);
274278
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
275-
std::string currentFilename(fileAndFolder.file->GetName());
276-
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
279+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
280+
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
281+
std::string currentFilename(f->GetFile()->GetName());
282+
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
277283
// This is not an absolute local path. Make it absolute.
278284
static std::string pwd = gSystem->pwd() + std::string("/");
279-
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
285+
currentFilename = pwd + std::string(f->GetName());
280286
}
281287
outputs.make<std::string>(o2) = currentFilename;
282288
}
@@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
312318
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
313319
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
314320
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
315-
if (!fileAndFolder.file) {
321+
322+
// In case the filesource is empty, move to the next one.
323+
if (fileAndFolder.path().empty()) {
316324
fcnt += 1;
317325
ntf = 0;
318326
if (didir->atEnd(fcnt)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 128 additions & 64 deletions
Large diffs are not rendered by default.

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515

1616
#include "Framework/DataDescriptorMatcher.h"
1717
#include "Framework/DataAllocator.h"
18+
#include "Framework/RootArrowFilesystem.h"
19+
20+
#include <arrow/filesystem/filesystem.h>
21+
#include <arrow/dataset/dataset.h>
1822

1923
#include <regex>
2024
#include "rapidjson/fwd.h"
@@ -31,16 +35,10 @@ struct FileNameHolder {
3135
std::string fileName;
3236
int numberOfTimeFrames = 0;
3337
std::vector<uint64_t> listOfTimeFrameNumbers;
34-
std::vector<std::string> listOfTimeFrameKeys;
3538
std::vector<bool> alreadyRead;
3639
};
3740
FileNameHolder* makeFileNameHolder(std::string fileName);
3841

39-
struct FileAndFolder {
40-
TFile* file = nullptr;
41-
std::string folderName = "";
42-
};
43-
4442
class DataInputDescriptor
4543
{
4644
/// Holds information concerning the reading of an aod table.
@@ -52,7 +50,6 @@ class DataInputDescriptor
5250
std::string treename = "";
5351
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;
5452

55-
DataInputDescriptor() = default;
5653
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");
5754

5855
void printOut();
@@ -78,7 +75,7 @@ class DataInputDescriptor
7875
int findDFNumber(int file, std::string dfName);
7976

8077
uint64_t getTimeFrameNumber(int counter, int numTF);
81-
FileAndFolder getFileFolder(int counter, int numTF);
78+
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
8279
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
8380
int getTimeFramesInFile(int counter);
8481
int getReadTimeFramesInFile(int counter);
@@ -90,6 +87,7 @@ class DataInputDescriptor
9087
bool isAlienSupportOn() { return mAlienSupport; }
9188

9289
private:
90+
o2::framework::RootObjectReadingFactory mFactory;
9391
std::string minputfilesFile = "";
9492
std::string* minputfilesFilePtr = nullptr;
9593
std::string mFilenameRegex = "";
@@ -98,7 +96,7 @@ class DataInputDescriptor
9896
std::string mParentFileReplacement;
9997
std::vector<FileNameHolder*> mfilenames;
10098
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
101-
TFile* mcurrentFile = nullptr;
99+
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
102100
int mCurrentFileID = -1;
103101
bool mAlienSupport = false;
104102

@@ -127,7 +125,6 @@ class DataInputDirector
127125
~DataInputDirector();
128126

129127
void reset();
130-
void createDefaultDataInputDescriptor();
131128
void printOut();
132129
bool atEnd(int counter);
133130

@@ -140,10 +137,11 @@ class DataInputDirector
140137
// getters
141138
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
142139
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
140+
void createDefaultDataInputDescriptor();
143141

144142
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
145143
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
146-
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
144+
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
147145
int getTimeFramesInFile(header::DataHeader dh, int counter);
148146

149147
uint64_t getTotalSizeCompressed();

Framework/AnalysisSupport/src/TTreePlugin.cxx

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "Framework/Plugins.h"
1414
#include "Framework/Signpost.h"
1515
#include "Framework/Endian.h"
16+
#include <arrow/buffer.h>
1617
#include <arrow/dataset/file_base.h>
1718
#include <arrow/extension_type.h>
1819
#include <arrow/status.h>
@@ -286,6 +287,8 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
286287

287288
auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
288289
&totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
290+
O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
291+
O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
289292
std::vector<std::shared_ptr<arrow::Array>> columns;
290293
std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
291294
auto physical_schema = *treeFragment->ReadPhysicalSchema();
@@ -299,27 +302,48 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
299302

300303
for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
301304
auto dataset_field = dataset_schema->field(fi);
305+
// This is needed because for now the dataset_field
306+
// is actually the schema of the ttree
307+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
302308
int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
303309

304310
if (physicalFieldIdx < 0) {
305311
throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str());
306312
}
307313
if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
314+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
315+
physical_schema->field(physicalFieldIdx - 1)->name().c_str());
308316
mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
309317
} else {
318+
if (physicalFieldIdx > 1) {
319+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
320+
physical_schema->field(physicalFieldIdx - 1)->name().c_str());
321+
}
310322
mappings.push_back({physicalFieldIdx, -1, fi});
311323
}
312324
}
313325

314326
auto* tree = treeFragment->GetTree();
315-
tree->SetCacheSize(25000000);
316327
auto branches = tree->GetListOfBranches();
328+
size_t totalTreeSize = 0;
329+
std::vector<TBranch*> selectedBranches;
317330
for (auto& mapping : mappings) {
318-
tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false);
331+
selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
332+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
333+
totalTreeSize += selectedBranches.back()->GetTotalSize();
319334
if (mapping.vlaIdx != -1) {
320-
tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false);
335+
selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
336+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
337+
totalTreeSize += selectedBranches.back()->GetTotalSize();
321338
}
322339
}
340+
341+
size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
342+
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
343+
tree->SetCacheSize(cacheSize);
344+
for (auto* branch : selectedBranches) {
345+
tree->AddBranchToCache(branch, false);
346+
}
323347
tree->StopCacheLearningPhase();
324348

325349
static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
@@ -400,9 +424,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
400424
}
401425
} else {
402426
// This is needed for branches which have not been persisted.
403-
auto bytes = branch->GetTotBytes();
404-
auto branchSize = bytes ? bytes : 1000000;
405-
auto&& result = arrow::AllocateResizableBuffer(branchSize, pool);
427+
auto&& result = arrow::AllocateBuffer(branch->GetTotalSize(), pool);
406428
if (!result.ok()) {
407429
throw runtime_error("Cannot allocate values buffer");
408430
}
@@ -423,7 +445,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
423445
if (mapping.vlaIdx != -1) {
424446
auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx);
425447
offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
426-
result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
448+
result = arrow::AllocateBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
427449
if (!result.ok()) {
428450
throw runtime_error("Cannot allocate offset buffer");
429451
}
@@ -435,6 +457,9 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
435457
// read sizes first
436458
while (readEntries < totalEntries) {
437459
auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer);
460+
if (readLast == -1) {
461+
throw runtime_error_f("Unable to read from branch %s.", mSizeBranch->GetName());
462+
}
438463
readEntries += readLast;
439464
for (auto i = 0; i < readLast; ++i) {
440465
offsets[count++] = (int)offset;
@@ -492,6 +517,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
492517
auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
493518
totalCompressedSize += tree->GetZipBytes();
494519
totalUncompressedSize += tree->GetTotBytes();
520+
O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
495521
return batch;
496522
};
497523
return generator;

Framework/Core/include/Framework/RootArrowFilesystem.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase
144144

145145
TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory&);
146146

147+
~TFileFileSystem() override;
148+
147149
std::string type_name() const override
148150
{
149151
return "TDirectoryFile";

Framework/Core/src/RootArrowFilesystem.cxx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject
4242
((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024);
4343
}
4444

45+
TFileFileSystem::~TFileFileSystem()
46+
{
47+
mFile->Close();
48+
delete mFile;
49+
}
50+
4551
std::shared_ptr<RootObjectHandler> TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source)
4652
{
4753
// We use a plugin to create the actual objects inside the

Framework/TestWorkflows/src/o2TestHistograms.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ struct EtaAndClsHistogramsSimple {
4040
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
4141
Produces<o2::aod::SkimmedExampleTrack> skimEx;
4242

43-
void process(aod::Tracks const& tracks)
43+
void process(aod::Tracks const& tracks, aod::FT0s const&)
4444
{
4545
LOGP(info, "Invoking the simple one");
4646
for (auto& track : tracks) {
@@ -54,7 +54,7 @@ struct EtaAndClsHistogramsIUSimple {
5454
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
5555
Produces<o2::aod::SkimmedExampleTrack> skimEx;
5656

57-
void process(aod::TracksIU const& tracks)
57+
void process(aod::TracksIU const& tracks, aod::FT0s const&)
5858
{
5959
LOGP(info, "Invoking the simple one");
6060
for (auto& track : tracks) {

0 commit comments

Comments
 (0)