Skip to content

Commit 6cfdeae

Browse files
authored
DPL: improve arrow::Dataset integration (#13725)
- Modularise filesystem to allow easier navigation and support for multiple formats. - Add initial support to multiplex multiple tables on top of the same tree. - Improve support for writing boolean fields.
1 parent 567d25a commit 6cfdeae

File tree

3 files changed

+250
-85
lines changed

3 files changed

+250
-85
lines changed

Framework/Core/include/Framework/RootArrowFilesystem.h

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include <arrow/type_fwd.h>
1818
#include <memory>
1919

20+
class TFile;
21+
class TBranch;
2022
class TTree;
2123
class TBufferFile;
2224
class TDirectoryFile;
@@ -227,11 +229,38 @@ class TTreeFileFormat : public arrow::dataset::FileFormat
227229
const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
228230
};
229231

230-
// An arrow outputstream which allows to write to a ttree
232+
// An arrow outputstream which allows to write to a TDirectoryFile.
233+
// This will point to the location of the file itself. You can
234+
// specify the location of the actual object inside it by passing the
235+
// associated path to the Write() API.
236+
class TDirectoryFileOutputStream : public arrow::io::OutputStream
237+
{
238+
public:
239+
TDirectoryFileOutputStream(TDirectoryFile*);
240+
241+
arrow::Status Close() override;
242+
243+
arrow::Result<int64_t> Tell() const override;
244+
245+
arrow::Status Write(const void* data, int64_t nbytes) override;
246+
247+
bool closed() const override;
248+
249+
TDirectoryFile* GetDirectory()
250+
{
251+
return mDirectory;
252+
}
253+
254+
private:
255+
TDirectoryFile* mDirectory;
256+
};
257+
258+
// An arrow outputstream which allows to write to a TTree. Optionally
259+
// with a prefix for the branches.
231260
class TTreeOutputStream : public arrow::io::OutputStream
232261
{
233262
public:
234-
TTreeOutputStream(TTree* t);
263+
TTreeOutputStream(TTree*, std::string branchPrefix);
235264

236265
arrow::Status Close() override;
237266

@@ -241,13 +270,16 @@ class TTreeOutputStream : public arrow::io::OutputStream
241270

242271
bool closed() const override;
243272

273+
TBranch* CreateBranch(char const* branchName, char const* sizeBranch);
274+
244275
TTree* GetTree()
245276
{
246277
return mTree;
247278
}
248279

249280
private:
250281
TTree* mTree;
282+
std::string mBranchPrefix;
251283
};
252284

253285
} // namespace o2::framework

Framework/Core/src/RootArrowFilesystem.cxx

Lines changed: 114 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <arrow/array/builder_nested.h>
1818
#include <arrow/array/builder_primitive.h>
1919
#include <memory>
20-
#include <stdexcept>
2120
#include <TFile.h>
2221
#include <TLeaf.h>
2322
#include <TBufferFile.h>
@@ -28,8 +27,11 @@
2827
#include <arrow/dataset/file_base.h>
2928
#include <arrow/result.h>
3029
#include <arrow/status.h>
30+
#include <arrow/util/key_value_metadata.h>
3131
#include <fmt/format.h>
3232

33+
#include <stdexcept>
34+
#include <utility>
3335

3436
O2_DECLARE_DYNAMIC_LOG(root_arrow_fs);
3537

@@ -100,7 +102,6 @@ std::shared_ptr<VirtualRootFileSystemBase> TFileFileSystem::GetSubFilesystem(arr
100102
return std::shared_ptr<VirtualRootFileSystemBase>(new SingleTreeFileSystem(tree));
101103
}
102104

103-
104105
auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass<TDirectory>());
105106
if (directory) {
106107
return std::shared_ptr<VirtualRootFileSystemBase>(new TFileFileSystem(directory, 50 * 1024 * 1024));
@@ -129,8 +130,15 @@ arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TFileFileSystem::OpenOut
129130
const std::string& path,
130131
const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
131132
{
132-
auto* t = new TTree(path.c_str(), "should put a name here");
133-
auto stream = std::make_shared<TTreeOutputStream>(t);
133+
if (path == "/") {
134+
return std::make_shared<TDirectoryFileOutputStream>(this->GetFile());
135+
}
136+
137+
auto* dir = dynamic_cast<TDirectoryFile*>(this->GetFile()->Get(path.c_str()));
138+
if (!dir) {
139+
throw runtime_error_f("Unable to open directory %s in file %s", path.c_str(), GetFile()->GetName());
140+
}
141+
auto stream = std::make_shared<TDirectoryFileOutputStream>(dir);
134142
return stream;
135143
}
136144

@@ -286,13 +294,46 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::Ma
286294
}
287295

288296
// An arrow outputstream which allows to write to a TDirectoryFile
289-
TTreeOutputStream::TTreeOutputStream(TTree* t)
290-
: mTree(t)
297+
TDirectoryFileOutputStream::TDirectoryFileOutputStream(TDirectoryFile* f)
298+
: mDirectory(f)
299+
{
300+
}
301+
302+
// Closes the file the wrapped directory belongs to.
//
// Returns Status::Invalid (instead of dereferencing a null pointer) when
// there is no directory or the directory is not attached to a file. This
// mirrors the guard in TTreeOutputStream::Close().
arrow::Status TDirectoryFileOutputStream::Close()
{
  auto* file = mDirectory != nullptr ? mDirectory->GetFile() : nullptr;
  if (file == nullptr) {
    return arrow::Status::Invalid("Cannot close a directory not attached to a file");
  }
  file->Close();
  return arrow::Status::OK();
}
307+
308+
// Positions are not supported on this stream: the result always
// carries a NotImplemented status.
arrow::Result<int64_t> TDirectoryFileOutputStream::Tell() const
{
  arrow::Status notSupported = arrow::Status::NotImplemented("Cannot move");
  return arrow::Result<int64_t>(notSupported);
}
312+
313+
// Raw byte writes are not supported by this stream; both arguments are
// ignored and NotImplemented is always returned.
// The message names the TDirectoryFile this stream wraps (it previously
// mentioned a TTree, copied from TTreeOutputStream).
arrow::Status TDirectoryFileOutputStream::Write(const void* /*data*/, int64_t /*nbytes*/)
{
  return arrow::Status::NotImplemented("Cannot write raw bytes to a TDirectoryFile");
}
317+
318+
bool TDirectoryFileOutputStream::closed() const
319+
{
320+
return mDirectory->GetFile()->IsOpen() == false;
321+
}
322+
323+
// An arrow outputstream which allows to write to a ttree
324+
// @a branchPrefix is used to identify a set of branches which all belong to
325+
// the same table.
326+
TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
327+
: mTree(f),
328+
mBranchPrefix(std::move(branchPrefix))
291329
{
292330
}
293331

294332
// Closes the file the tree currently lives in.
// A tree which is not attached to any file cannot be closed: Invalid is
// returned in that case.
arrow::Status TTreeOutputStream::Close()
{
  auto* currentFile = mTree->GetCurrentFile();
  if (!currentFile) {
    return arrow::Status::Invalid("Cannot close a tree not attached to a file");
  }
  currentFile->Close();
  return arrow::Status::OK();
}
@@ -309,9 +350,18 @@ arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
309350

310351
bool TTreeOutputStream::closed() const
311352
{
353+
// A standalone tree is never closed.
354+
if (mTree->GetCurrentFile() == nullptr) {
355+
return false;
356+
}
312357
return mTree->GetCurrentFile()->IsOpen() == false;
313358
}
314359

360+
// Creates a branch in the underlying tree, applying the branch prefix.
//
// @param branchName name of the branch, without prefix.
// @param sizeBranch despite its name, this is forwarded as the third
//        argument of TTree::Branch, i.e. the leaf list of the new branch.
// @return whatever TTree::Branch returns for the new branch.
//
// With an empty prefix the names are passed through untouched: prepending
// "/" unconditionally would name the branch "/<branchName>".
// With a prefix, the branch is named "<prefix>/<branchName>" and the prefix
// is prepended to the leaf list as well.
// NOTE(review): a "[..._size]" count reference inside the leaf list is not
// prefixed here — confirm it still resolves when a prefix is set.
TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
{
  if (mBranchPrefix.empty()) {
    return mTree->Branch(branchName, (char*)nullptr, sizeBranch);
  }
  std::string const prefixedName = mBranchPrefix + "/" + branchName;
  std::string const prefixedLeafList = mBranchPrefix + sizeBranch;
  return mTree->Branch(prefixedName.c_str(), (char*)nullptr, prefixedLeafList.c_str());
}
364+
315365
char const* rootSuffixFromArrow(arrow::Type::type id)
316366
{
317367
switch (id) {
@@ -411,8 +461,24 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
411461
: FileWriter(schema, options, destination, destination_locator)
412462
{
413463
// Batches have the same number of entries for each column.
464+
auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
414465
auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
415-
TTree* tree = treeStream->GetTree();
466+
467+
if (directoryStream.get()) {
468+
TDirectoryFile* dir = directoryStream->GetDirectory();
469+
dir->cd();
470+
auto* tree = new TTree(destination_locator_.path.c_str(), "");
471+
treeStream = std::make_shared<TTreeOutputStream>(tree, "");
472+
} else if (treeStream.get()) {
473+
// We already have a tree stream, let's derive a new one
474+
// with the destination_locator_.path as prefix for the branches
475+
// This way we can multiplex multiple tables in the same tree.
476+
auto tree = treeStream->GetTree();
477+
treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
478+
} else {
479+
// I could simply set a prefix here to merge to an already existing tree.
480+
throw std::runtime_error("Unsupported backend.");
481+
}
416482

417483
for (auto i = 0u; i < schema->fields().size(); ++i) {
418484
auto& field = schema->field(i);
@@ -427,23 +493,23 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
427493
valueTypes.push_back(field->type()->field(0)->type());
428494
sizesBranches.push_back(nullptr);
429495
std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
430-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
496+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
431497
} break;
432498
case arrow::Type::LIST: {
433499
valueTypes.push_back(field->type()->field(0)->type());
434500
listSizes.back() = 0; // VLA, we need to calculate it on the fly;
435501
std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
436502
std::string sizeLeafList = field->name() + "_size/I";
437-
sizesBranches.push_back(tree->Branch((field->name() + "_size").c_str(), (char*)nullptr, sizeLeafList.c_str()));
438-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
503+
sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
504+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
439505
// Notice that this could be replaced by a better guess of the
440506
// average size of the list elements, but this is not trivial.
441507
} break;
442508
default: {
443509
valueTypes.push_back(field->type());
444510
std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
445511
sizesBranches.push_back(nullptr);
446-
branches.push_back(tree->Branch(field->name().c_str(), (char*)nullptr, leafList.c_str()));
512+
branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
447513
} break;
448514
}
449515
}
@@ -463,11 +529,18 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
463529
}
464530

465531
// Batches have the same number of entries for each column.
532+
auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
533+
TTree* tree = nullptr;
534+
if (directoryStream.get()) {
535+
TDirectoryFile* dir = directoryStream->GetDirectory();
536+
tree = (TTree*)dir->Get(destination_locator_.path.c_str());
537+
}
466538
auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
467-
TTree* tree = treeStream->GetTree();
468539

469-
// Caches for the vectors of bools.
470-
std::vector<std::shared_ptr<arrow::UInt8Array>> caches;
540+
if (!tree) {
541+
// I could simply set a prefix here to merge to an already existing tree.
542+
throw std::runtime_error("Unsupported backend.");
543+
}
471544

472545
for (auto i = 0u; i < batch->columns().size(); ++i) {
473546
auto column = batch->column(i);
@@ -484,24 +557,11 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
484557
auto list = std::static_pointer_cast<arrow::ListArray>(column);
485558
valueArrays.back() = list;
486559
} break;
487-
default:
488-
valueArrays.back() = column;
489-
}
490-
}
491-
492-
int64_t pos = 0;
493-
while (pos < batch->num_rows()) {
494-
for (size_t bi = 0; bi < branches.size(); ++bi) {
495-
auto* branch = branches[bi];
496-
auto* sizeBranch = sizesBranches[bi];
497-
auto array = batch->column(bi);
498-
auto& field = batch->schema()->field(bi);
499-
auto& listSize = listSizes[bi];
500-
auto valueType = valueTypes[bi];
501-
auto valueArray = valueArrays[bi];
560+
case arrow::Type::BOOL: {
561+
// In case of arrays of booleans, we need to go back to their
562+
// char based representation for ROOT to save them.
563+
auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
502564

503-
if (field->type()->id() == arrow::Type::BOOL) {
504-
auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(array);
505565
int64_t length = boolArray->length();
506566
arrow::UInt8Builder builder;
507567
auto ok = builder.Reserve(length);
@@ -516,11 +576,24 @@ class TTreeFileWriter : public arrow::dataset::FileWriter
516576
auto ok = builder.AppendNull();
517577
}
518578
}
579+
valueArrays.back() = *builder.Finish();
580+
} break;
581+
default:
582+
valueArrays.back() = column;
583+
}
584+
}
585+
586+
int64_t pos = 0;
587+
while (pos < batch->num_rows()) {
588+
for (size_t bi = 0; bi < branches.size(); ++bi) {
589+
auto* branch = branches[bi];
590+
auto* sizeBranch = sizesBranches[bi];
591+
auto array = batch->column(bi);
592+
auto& field = batch->schema()->field(bi);
593+
auto& listSize = listSizes[bi];
594+
auto valueType = valueTypes[bi];
595+
auto valueArray = valueArrays[bi];
519596

520-
ok = builder.Finish(&caches[bi]);
521-
branch->SetAddress((void*)(caches[bi]->values()->data()));
522-
continue;
523-
}
524597
switch (field->type()->id()) {
525598
case arrow::Type::LIST: {
526599
auto list = std::static_pointer_cast<arrow::ListArray>(array);
@@ -764,13 +837,16 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
764837
return generator;
765838
}
766839

767-
768840
// Opens an output stream on the tree found at @a path.
//
// @param path     location of the tree, resolved through GetTree().
// @param metadata optional key/value metadata; when it holds a
//                 "branch_prefix" entry, its value becomes the branch prefix
//                 of the returned stream. May be null, in which case no
//                 prefix is used.
// @return a TTreeOutputStream writing to that tree.
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOutputStream(
  const std::string& path,
  const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
{
  arrow::dataset::FileSource source{path, shared_from_this()};
  // Guard against a null metadata pointer before looking up the prefix.
  if (metadata) {
    auto prefix = metadata->Get("branch_prefix");
    if (prefix.ok()) {
      return std::make_shared<TTreeOutputStream>(GetTree(source), *prefix);
    }
  }
  return std::make_shared<TTreeOutputStream>(GetTree(source), "");
}
775851

776852
TBufferFileFS::TBufferFileFS(TBufferFile* f)
@@ -782,7 +858,6 @@ TBufferFileFS::TBufferFileFS(TBufferFile* f)
782858

783859
TTreeFileSystem::~TTreeFileSystem() = default;
784860

785-
786861
arrow::Result<arrow::fs::FileInfo> TBufferFileFS::GetFileInfo(const std::string& path)
787862
{
788863
arrow::fs::FileInfo result;

0 commit comments

Comments
 (0)