Skip to content

Commit f44b2d2

Browse files
committed
DPL: add support for decompressing directly to shared memory
This PR postpones the read operations that would usually populate an intermediate RecordBatch and instead performs them directly during the subsequent serialization to shared memory. Doing so avoids having the intermediate representation allocate most of the memory. For the moment this is only done for TTree; RNTuple support will come in a subsequent PR.
1 parent 7efa08e commit f44b2d2

File tree

8 files changed

+557
-219
lines changed

8 files changed

+557
-219
lines changed

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
399399
}
400400
// FIXME: Ugly. We should detect the format from the treename, good enough for now.
401401
std::shared_ptr<arrow::dataset::FileFormat> format;
402+
FragmentToBatch::StreamerCreator creator = nullptr;
402403

403404
auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
404405

@@ -407,6 +408,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
407408
void* handle = capability.getHandle(rootFS, objectPath);
408409
if (handle) {
409410
format = capability.factory().format();
411+
creator = capability.factory().deferredOutputStreamer;
410412
break;
411413
}
412414
}
@@ -449,13 +451,12 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
449451

450452
// FIXME: This should allow me to create a memory pool
451453
// which I can then use to scan the dataset.
452-
//
453-
auto f2b = outputs.make<FragmentToBatch>(o);
454+
auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
454455

455456
//// add branches to read
456457
//// fill the table
457458
f2b->setLabel(treename.c_str());
458-
f2b->fill(*fragment, datasetSchema, format);
459+
f2b->fill(datasetSchema, format);
459460

460461
mIOTime += (uv_hrtime() - ioStart);
461462

Framework/AnalysisSupport/src/RNTuplePlugin.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "Framework/RuntimeError.h"
1313
#include "Framework/RootArrowFilesystem.h"
1414
#include "Framework/Plugins.h"
15+
#include "Framework/FairMQResizableBuffer.h"
1516
#include <ROOT/RNTupleModel.hxx>
1617
#include <ROOT/RNTupleWriteOptions.hxx>
1718
#include <ROOT/RNTupleWriter.hxx>
@@ -852,7 +853,10 @@ struct RNTupleObjectReadingImplementation : public RootArrowFactoryPlugin {
852853
return new RootArrowFactory{
853854
.options = [context]() { return context->format->DefaultWriteOptions(); },
854855
.format = [context]() { return context->format; },
855-
};
856+
.deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
857+
auto treeFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
858+
return std::make_shared<FairMQOutputStream>(buffer);
859+
}};
856860
}
857861
};
858862

0 commit comments

Comments
 (0)