
Commit 5d76679

DPL: add support for decompressing directly to shared memory
This PR postpones the read operations that would normally populate an intermediate RecordBatch and performs them directly during the batch's subsequent serialization to shared memory. This avoids having the intermediate representation allocate most of the memory. For the moment this is only done for TTree; RNTuple support will come in a subsequent PR.
1 parent 8c306c3 commit 5d76679
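The key ingredient is a StreamerCreator callback (introduced in TableTreeHelpers.h below): rather than serialising an already materialised table into a temporary, the writer asks for an output stream that targets the destination buffer, so the IPC serialisation, and the TTree decompression behind it, lands directly in the shared-memory message. A minimal sketch of such a creator using plain Arrow types; the function name is illustrative and not part of the commit:

#include <arrow/buffer.h>
#include <arrow/dataset/file_base.h>
#include <arrow/io/memory.h>
#include <memory>

// Illustrative only: a creator matching the StreamerCreator signature that
// wraps the destination buffer in a FixedSizeBufferWriter, so whatever is
// written to the returned stream goes straight into the final buffer instead
// of an intermediate allocation. The fragment is unused here but would let a
// real implementation tailor the stream to the data being read.
auto makeSimpleStreamerCreator()
{
  return [](std::shared_ptr<arrow::dataset::FileFragment> /*fragment*/,
            const std::shared_ptr<arrow::ResizableBuffer>& buffer)
           -> std::shared_ptr<arrow::io::OutputStream> {
    return std::make_shared<arrow::io::FixedSizeBufferWriter>(buffer);
  };
}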

File tree

4 files changed: +50, -44 lines changed


Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 4 additions & 3 deletions

@@ -399,6 +399,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
   }
   // FIXME: Ugly. We should detect the format from the treename, good enough for now.
   std::shared_ptr<arrow::dataset::FileFormat> format;
+  FragmentToBatch::StreamerCreator creator = nullptr;

   auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};

@@ -407,6 +408,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
     void* handle = capability.getHandle(rootFS, objectPath);
     if (handle) {
       format = capability.factory().format();
+      creator = capability.factory().deferredOutputStreamer;
       break;
     }
   }
@@ -449,13 +451,12 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh

   // FIXME: This should allow me to create a memory pool
   // which I can then use to scan the dataset.
-  //
-  auto f2b = outputs.make<FragmentToBatch>(o);
+  auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);

   //// add branches to read
   //// fill the table
   f2b->setLabel(treename.c_str());
-  f2b->fill(*fragment, datasetSchema, format);
+  f2b->fill(datasetSchema, format);

   mIOTime += (uv_hrtime() - ioStart);
Framework/Core/include/Framework/TableTreeHelpers.h

Lines changed: 14 additions & 2 deletions

@@ -11,6 +11,8 @@
 #ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_
 #define O2_FRAMEWORK_TABLETREEHELPERS_H_

+#include <arrow/buffer.h>
+#include <arrow/io/interfaces.h>
 #include <arrow/record_batch.h>
 #include "TFile.h"
 #include "TTreeReader.h"
@@ -146,15 +148,25 @@ class TreeToTable
 class FragmentToBatch
 {
  public:
-  FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool());
+  // The function to be used to create the required stream.
+  using StreamerCreator = std::function<std::shared_ptr<arrow::io::OutputStream>(std::shared_ptr<arrow::dataset::FileFragment>, const std::shared_ptr<arrow::ResizableBuffer>& buffer)>;
+
+  FragmentToBatch(StreamerCreator, std::shared_ptr<arrow::dataset::FileFragment>, arrow::MemoryPool* pool = arrow::default_memory_pool());
   void setLabel(const char* label);
-  void fill(std::shared_ptr<arrow::dataset::FileFragment>, std::shared_ptr<arrow::Schema> dataSetSchema, std::shared_ptr<arrow::dataset::FileFormat>);
+  void fill(std::shared_ptr<arrow::Schema> dataSetSchema, std::shared_ptr<arrow::dataset::FileFormat>);
   std::shared_ptr<arrow::RecordBatch> finalize();

+  std::shared_ptr<arrow::io::OutputStream> streamer(std::shared_ptr<arrow::ResizableBuffer> buffer)
+  {
+    return mCreator(mFragment, buffer);
+  }
+
  private:
+  std::shared_ptr<arrow::dataset::FileFragment> mFragment;
   arrow::MemoryPool* mArrowMemoryPool = nullptr;
   std::string mTableLabel;
   std::shared_ptr<arrow::RecordBatch> mRecordBatch;
+  StreamerCreator mCreator;
 };

 // -----------------------------------------------------------------------------
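Putting the header together with the call sites below, a caller constructs the converter with a creator and a fragment, fills it, and only asks for the output stream at serialisation time. A sketch against the header above, assuming the usual o2::framework namespace; the helper function is hypothetical, and the creator could be the buffer-writer lambda sketched after the commit message or the plugin-provided deferredOutputStreamer:

#include <arrow/ipc/writer.h>
#include <memory>
#include <stdexcept>
#include "Framework/TableTreeHelpers.h"

// Hypothetical helper showing the intended call sequence of the new interface.
void scanAndStream(o2::framework::FragmentToBatch::StreamerCreator creator,
                   std::shared_ptr<arrow::dataset::FileFragment> fragment,
                   std::shared_ptr<arrow::Schema> schema,
                   std::shared_ptr<arrow::dataset::FileFormat> format,
                   std::shared_ptr<arrow::ResizableBuffer> destination)
{
  o2::framework::FragmentToBatch f2b{std::move(creator), std::move(fragment)};
  f2b.setLabel("example");
  f2b.fill(schema, format);                // scans the fragment; destination is untouched so far
  auto batch = f2b.finalize();             // the RecordBatch to be serialised
  auto stream = f2b.streamer(destination); // deferred stream writing straight into destination
  // NB: DataAllocator.cxx below first measures the serialised size with a
  // MockOutputStream and reserves it on the buffer; omitted here for brevity.
  auto writer = arrow::ipc::MakeStreamWriter(stream, batch->schema()).ValueOrDie();
  if (!writer->WriteRecordBatch(*batch).ok()) {
    throw std::runtime_error("Unable to write batch");
  }
}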

Framework/Core/src/DataAllocator.cxx

Lines changed: 24 additions & 33 deletions

@@ -211,34 +211,6 @@ void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
   }
 }

-void doWriteBatch(std::shared_ptr<FairMQResizableBuffer> b, arrow::RecordBatch* batch)
-{
-  auto mock = std::make_shared<arrow::io::MockOutputStream>();
-  int64_t expectedSize = 0;
-  auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
-  arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
-
-  expectedSize = mock->Tell().ValueOrDie();
-  auto reserve = b->Reserve(expectedSize);
-  if (reserve.ok() == false) {
-    throw std::runtime_error("Unable to reserve memory for table");
-  }
-
-  auto stream = std::make_shared<FairMQOutputStream>(b);
-  // This is a copy maybe we can finally get rid of it by having using the
-  // dataset API?
-  auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema());
-  if (outBatch.ok() == false) {
-    throw ::std::runtime_error("Unable to create batch writer");
-  }
-
-  outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
-
-  if (outStatus.ok() == false) {
-    throw std::runtime_error("Unable to Write batch");
-  }
-}
-
 void DataAllocator::adopt(const Output& spec, LifetimeHolder<TableBuilder>& tb)
 {
   auto& timingInfo = mRegistry.get<TimingInfo>();
@@ -318,16 +290,35 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f
     // Serialization happens in here, so that we can
     // get rid of the intermediate tree 2 table object, saving memory.
     auto batch = source.finalize();
-    doWriteBatch(buffer, batch.get());
+    auto mock = std::make_shared<arrow::io::MockOutputStream>();
+    int64_t expectedSize = 0;
+    auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
+    arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);
+
+    expectedSize = mock->Tell().ValueOrDie();
+    auto reserve = buffer->Reserve(expectedSize);
+    if (reserve.ok() == false) {
+      throw std::runtime_error("Unable to reserve memory for table");
+    }
+
+    auto deferredWriterStream = source.streamer(buffer);
+
+    auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
+    if (outBatch.ok() == false) {
+      throw ::std::runtime_error("Unable to create batch writer");
+    }
+
+    outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);
+
+    if (outStatus.ok() == false) {
+      throw std::runtime_error("Unable to Write batch");
+    }
     // deletion happens in the caller
   };

-  /// To finalise this we write the table to the buffer.
-  /// FIXME: most likely not a great idea. We should probably write to the buffer
-  /// directly in the TableBuilder, incrementally.
   auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
     // This is empty because we already serialised the object when
-    // the LifetimeHolder goes out of scope.
+    // the LifetimeHolder goes out of scope. See code above.
   };

   context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
Framework/Core/src/TableTreeHelpers.cxx

Lines changed: 8 additions & 6 deletions

@@ -13,7 +13,6 @@
 #include "Framework/Endian.h"
 #include "Framework/Signpost.h"

-#include "arrow/type_traits.h"
 #include <arrow/dataset/file_base.h>
 #include <arrow/record_batch.h>
 #include <arrow/type.h>
@@ -533,7 +532,7 @@ void TreeToTable::setLabel(const char* label)
   mTableLabel = label;
 }

-void TreeToTable::fill(TTree*tree)
+void TreeToTable::fill(TTree* tree)
 {
   std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
   std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -569,8 +568,10 @@ std::shared_ptr<arrow::Table> TreeToTable::finalize()
   return mTable;
 }

-FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool)
-  : mArrowMemoryPool{pool}
+FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr<arrow::dataset::FileFragment> fragment, arrow::MemoryPool* pool)
+  : mFragment{std::move(fragment)},
+    mArrowMemoryPool{pool},
+    mCreator{std::move(creator)}
 {
 }

@@ -579,13 +580,14 @@ void FragmentToBatch::setLabel(const char* label)
   mTableLabel = label;
 }

-void FragmentToBatch::fill(std::shared_ptr<arrow::dataset::FileFragment> fragment, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
+void FragmentToBatch::fill(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
 {
   auto options = std::make_shared<arrow::dataset::ScanOptions>();
   options->dataset_schema = schema;
-  auto scanner = format->ScanBatchesAsync(options, fragment);
+  auto scanner = format->ScanBatchesAsync(options, mFragment);
   auto batch = (*scanner)();
   mRecordBatch = *batch.result();
+  // Notice that up to here the buffer was not yet filled.
 }

 std::shared_ptr<arrow::RecordBatch> FragmentToBatch::finalize()
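Note that fill() pulls only the first batch from the generator returned by ScanBatchesAsync. A hedged sketch of how a consumer would drain such a generator completely; the end of iteration is signalled by a sentinel value, checked with arrow::IsIterationEnd:

#include <arrow/dataset/file_base.h>
#include <arrow/dataset/scanner.h>
#include <arrow/record_batch.h>
#include <arrow/util/iterator.h>
#include <memory>

// Sketch: drain every record batch of a fragment, not just the first one.
// format, options and fragment are assumed to be set up as in fill() above.
void drainFragment(const std::shared_ptr<arrow::dataset::FileFormat>& format,
                   const std::shared_ptr<arrow::dataset::ScanOptions>& options,
                   const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
{
  auto generator = format->ScanBatchesAsync(options, fragment).ValueOrDie();
  while (true) {
    auto future = generator();                 // next Future<std::shared_ptr<RecordBatch>>
    auto batch = future.result().ValueOrDie(); // block until it is ready
    if (arrow::IsIterationEnd(batch)) {
      break; // generator exhausted
    }
    // ... process batch (a std::shared_ptr<arrow::RecordBatch>) ...
  }
}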
