@@ -211,6 +211,34 @@ void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
211211 }
212212}
213213
214+ void doWriteBatch (std::shared_ptr<FairMQResizableBuffer> b, arrow::RecordBatch* batch)
215+ {
216+ auto mock = std::make_shared<arrow::io::MockOutputStream>();
217+ int64_t expectedSize = 0 ;
218+ auto mockWriter = arrow::ipc::MakeStreamWriter (mock.get (), batch->schema ());
219+ arrow::Status outStatus = mockWriter.ValueOrDie ()->WriteRecordBatch (*batch);
220+
221+ expectedSize = mock->Tell ().ValueOrDie ();
222+ auto reserve = b->Reserve (expectedSize);
223+ if (reserve.ok () == false ) {
224+ throw std::runtime_error (" Unable to reserve memory for table" );
225+ }
226+
227+ auto stream = std::make_shared<FairMQOutputStream>(b);
228+ // This is a copy maybe we can finally get rid of it by having using the
229+ // dataset API?
230+ auto outBatch = arrow::ipc::MakeStreamWriter (stream.get (), batch->schema ());
231+ if (outBatch.ok () == false ) {
232+ throw ::std::runtime_error (" Unable to create batch writer" );
233+ }
234+
235+ outStatus = outBatch.ValueOrDie ()->WriteRecordBatch (*batch);
236+
237+ if (outStatus.ok () == false ) {
238+ throw std::runtime_error (" Unable to Write batch" );
239+ }
240+ }
241+
214242void DataAllocator::adopt (const Output& spec, LifetimeHolder<TableBuilder>& tb)
215243{
216244 auto & timingInfo = mRegistry .get <TimingInfo>();
@@ -273,6 +301,38 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<TreeToTable>& t2t)
273301 context.addBuffer (std::move (header), buffer, std::move (finalizer), routeIndex);
274302}
275303
304+ void DataAllocator::adopt (const Output& spec, LifetimeHolder<FragmentToBatch>& f2b)
305+ {
306+ auto & timingInfo = mRegistry .get <TimingInfo>();
307+ RouteIndex routeIndex = matchDataHeader (spec, timingInfo.timeslice );
308+
309+ auto header = headerMessageFromOutput (spec, routeIndex, o2::header::gSerializationMethodArrow , 0 );
310+ auto & context = mRegistry .get <ArrowContext>();
311+
312+ auto creator = [transport = context.proxy ().getOutputTransport (routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
313+ return transport->CreateMessage (s);
314+ };
315+ auto buffer = std::make_shared<FairMQResizableBuffer>(creator);
316+
317+ f2b.callback = [buffer = buffer, transport = context.proxy ().getOutputTransport (routeIndex)](FragmentToBatch& source) {
318+ // Serialization happens in here, so that we can
319+ // get rid of the intermediate tree 2 table object, saving memory.
320+ auto batch = source.finalize ();
321+ doWriteBatch (buffer, batch.get ());
322+ // deletion happens in the caller
323+ };
324+
325+ // / To finalise this we write the table to the buffer.
326+ // / FIXME: most likely not a great idea. We should probably write to the buffer
327+ // / directly in the TableBuilder, incrementally.
328+ auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
329+ // This is empty because we already serialised the object when
330+ // the LifetimeHolder goes out of scope.
331+ };
332+
333+ context.addBuffer (std::move (header), buffer, std::move (finalizer), routeIndex);
334+ }
335+
276336void DataAllocator::adopt (const Output& spec, std::shared_ptr<arrow::Table> ptr)
277337{
278338 auto & timingInfo = mRegistry .get <TimingInfo>();
0 commit comments