Skip to content

Commit ff8ba81

Browse files
authored
DPL Analysis: move ownership of payloads to the fragment (#13931)
This makes sure the FileFragment is the entity that owns the TTree / RNTuple, so that its caching and memory management have the correct life-cycle and we neither end up with memory churn nor have to reconfigure the caches.
1 parent 9dcdaae commit ff8ba81

File tree

6 files changed

+265
-131
lines changed

6 files changed

+265
-131
lines changed

Framework/AnalysisSupport/src/RNTuplePlugin.cxx

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <ROOT/RFieldVisitor.hxx>
2222
#include <ROOT/RNTupleInspector.hxx>
2323
#include <ROOT/RVec.hxx>
24+
#include <memory>
2425
#include <TBufferFile.h>
2526

2627
#include <TDirectory.h>
@@ -51,10 +52,6 @@ class RNTupleFileSystem : public VirtualRootFileSystemBase
5152
public:
5253
~RNTupleFileSystem() override;
5354

54-
std::shared_ptr<VirtualRootFileSystemBase> GetSubFilesystem(arrow::dataset::FileSource source) override
55-
{
56-
return std::dynamic_pointer_cast<VirtualRootFileSystemBase>(shared_from_this());
57-
};
5855
virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0;
5956
};
6057

@@ -100,9 +97,28 @@ class RNTupleFileFragment : public arrow::dataset::FileFragment
10097
std::shared_ptr<arrow::dataset::FileFormat> format,
10198
arrow::compute::Expression partition_expression,
10299
std::shared_ptr<arrow::Schema> physical_schema)
103-
: FileFragment(std::move(source), std::move(format), std::move(partition_expression), std::move(physical_schema))
100+
: FileFragment(source, format, partition_expression, physical_schema)
104101
{
102+
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
103+
if (!fs.get()) {
104+
throw runtime_error_f("Do not know how to extract %s from %s", source.path().c_str(), fs->type_name().c_str());
105+
}
106+
auto handler = fs->GetObjectHandler(source);
107+
if (!handler->format->Equals(*format)) {
108+
throw runtime_error_f("Format for %s does not match. Found %s, expected %s.", source.path().c_str(),
109+
handler->format->type_name().c_str(),
110+
format->type_name().c_str());
111+
}
112+
mNTuple = handler->GetObjectAsOwner<ROOT::Experimental::RNTuple>();
105113
}
114+
115+
ROOT::Experimental::RNTuple* GetRNTuple()
116+
{
117+
return mNTuple.get();
118+
}
119+
120+
private:
121+
std::unique_ptr<ROOT::Experimental::RNTuple> mNTuple;
106122
};
107123

108124
class RNTupleFileFormat : public arrow::dataset::FileFormat
@@ -133,11 +149,10 @@ class RNTupleFileFormat : public arrow::dataset::FileFormat
133149
arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
134150
{
135151
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
136-
auto subFs = fs->GetSubFilesystem(source);
137-
if (std::dynamic_pointer_cast<RNTupleFileSystem>(subFs)) {
138-
return true;
152+
if (!fs) {
153+
return false;
139154
}
140-
return false;
155+
return fs->CheckSupport(source);
141156
}
142157

143158
arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
@@ -493,11 +508,12 @@ arrow::Result<std::shared_ptr<arrow::Schema>> RNTupleFileFormat::Inspect(const a
493508

494509
auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
495510
// Actually get the TTree from the ROOT file.
496-
auto ntupleFs = std::dynamic_pointer_cast<RNTupleFileSystem>(fs->GetSubFilesystem(source));
497-
if (!ntupleFs.get()) {
498-
throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
511+
auto objectHandler = fs->GetObjectHandler(source);
512+
if (objectHandler->format->type_name() != this->type_name()) {
513+
throw runtime_error_f("Unexpected kind of filesystem %s to handle payload %s.\n", source.filesystem()->type_name().c_str(), source.path().c_str());
499514
}
500-
ROOT::Experimental::RNTuple* rntuple = ntupleFs->GetRNTuple(source);
515+
// We know this is a RNTuple, so we can continue with the inspection.
516+
auto rntuple = objectHandler->GetObjectAsOwner<ROOT::Experimental::RNTuple>().release();
501517

502518
auto inspector = ROOT::Experimental::RNTupleInspector::Create(rntuple);
503519

@@ -526,11 +542,8 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
526542
std::vector<std::shared_ptr<arrow::Array>> columns;
527543
std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
528544

529-
auto containerFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(ntupleFragment->source().filesystem());
530-
auto fs = std::dynamic_pointer_cast<RNTupleFileSystem>(containerFS->GetSubFilesystem(ntupleFragment->source()));
531-
532545
int64_t rows = -1;
533-
ROOT::Experimental::RNTuple* rntuple = fs->GetRNTuple(ntupleFragment->source());
546+
ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple();
534547
auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
535548
auto& model = reader->GetModel();
536549
for (auto& physicalField : fields) {
@@ -670,7 +683,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
670683
if (!result.ok()) {
671684
throw runtime_error("Cannot allocate offset buffer");
672685
}
673-
arrowOffsetBuffer = std::move(result).ValueUnsafe();
686+
arrowOffsetBuffer = result.MoveValueUnsafe();
674687

675688
// Offset bulk
676689
auto offsetBulk = model.CreateBulk(physicalField->name());
@@ -692,7 +705,7 @@ arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
692705
if (!result.ok()) {
693706
throw runtime_error("Cannot allocate values buffer");
694707
}
695-
arrowValuesBuffer = std::move(result).ValueUnsafe();
708+
arrowValuesBuffer = result.MoveValueUnsafe();
696709
ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
697710
// Calculate the size of the buffer here.
698711
for (size_t i = 0; i < total; i++) {
@@ -811,9 +824,9 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> RNTupleFileFormat::
811824
{
812825
std::shared_ptr<arrow::dataset::FileFormat> format = std::make_shared<RNTupleFileFormat>(mTotCompressedSize, mTotUncompressedSize);
813826

814-
auto fragment = std::make_shared<RNTupleFileFragment>(std::move(source), std::move(format),
815-
std::move(partition_expression),
816-
std::move(physical_schema));
827+
auto fragment = std::make_shared<RNTupleFileFragment>(source, format,
828+
partition_expression,
829+
physical_schema);
817830
return std::dynamic_pointer_cast<arrow::dataset::FileFragment>(fragment);
818831
}
819832

@@ -839,9 +852,6 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
839852
return new RootArrowFactory{
840853
.options = [context]() { return context->format->DefaultWriteOptions(); },
841854
.format = [context]() { return context->format; },
842-
.getSubFilesystem = [](void* handle) {
843-
auto rntuple = (ROOT::Experimental::RNTuple*)handle;
844-
return std::shared_ptr<VirtualRootFileSystemBase>(new SingleRNTupleFileSystem(rntuple)); },
845855
};
846856
}
847857
};

0 commit comments

Comments
 (0)