 #include "Framework/Plugins.h"
 #include "Framework/Signpost.h"
 #include "Framework/Endian.h"
+#include <arrow/buffer.h>
 #include <arrow/dataset/file_base.h>
 #include <arrow/extension_type.h>
 #include <arrow/status.h>
@@ -286,6 +287,8 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(

   auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
                     &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
+    O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
+    O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
     std::vector<std::shared_ptr<arrow::Array>> columns;
     std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
     auto physical_schema = *treeFragment->ReadPhysicalSchema();
@@ -299,27 +302,48 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(

     for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
       auto dataset_field = dataset_schema->field(fi);
+      // This is needed because for now the dataset_field
+      // is actually the schema of the ttree
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
       int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());

       if (physicalFieldIdx < 0) {
         throw runtime_error_f("Cannot find physical field associated to %s", dataset_field->name().c_str());
       }
       if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
+        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
+                               physical_schema->field(physicalFieldIdx - 1)->name().c_str());
         mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
       } else {
+        if (physicalFieldIdx > 1) {
+          O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
+                                 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
+        }
         mappings.push_back({physicalFieldIdx, -1, fi});
       }
     }

     auto* tree = treeFragment->GetTree();
-    tree->SetCacheSize(25000000);
     auto branches = tree->GetListOfBranches();
+    size_t totalTreeSize = 0;
+    std::vector<TBranch*> selectedBranches;
     for (auto& mapping : mappings) {
-      tree->AddBranchToCache((TBranch*)branches->At(mapping.mainBranchIdx), false);
+      selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
+      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
+      totalTreeSize += selectedBranches.back()->GetTotalSize();
       if (mapping.vlaIdx != -1) {
-        tree->AddBranchToCache((TBranch*)branches->At(mapping.vlaIdx), false);
+        selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
+        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
+        totalTreeSize += selectedBranches.back()->GetTotalSize();
       }
     }
+
+    size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
+    O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
+    tree->SetCacheSize(cacheSize);
+    for (auto* branch : selectedBranches) {
+      tree->AddBranchToCache(branch, false);
+    }
     tree->StopCacheLearningPhase();

     static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
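
The hunk above replaces the fixed 25 MB TTreeCache with one sized from the branches that will actually be read. A minimal standalone sketch of that logic follows; the helper name configureTreeCache is illustrative only and not part of the patch, and std::clamp is simply the idiomatic spelling of the std::max(std::min(...)) expression used in the diff.

  #include <TBranch.h>
  #include <TTree.h>
  #include <algorithm>
  #include <cstddef>
  #include <vector>

  // Size the TTreeCache from the selected branches, clamped to [1 MB, 25 MB].
  void configureTreeCache(TTree& tree, const std::vector<TBranch*>& selectedBranches)
  {
    std::size_t totalTreeSize = 0;
    for (auto* branch : selectedBranches) {
      totalTreeSize += branch->GetTotalSize(); // on-disk size of this branch, in bytes
    }
    // Small trees still get a 1 MB cache; large ones never exceed the old fixed 25 MB.
    std::size_t cacheSize = std::clamp(totalTreeSize, std::size_t{1000000}, std::size_t{25000000});
    tree.SetCacheSize(cacheSize);
    for (auto* branch : selectedBranches) {
      tree.AddBranchToCache(branch, false); // cache only the branches we are going to read
    }
    tree.StopCacheLearningPhase();
  }
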
@@ -400,9 +424,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
         }
       } else {
         // This is needed for branches which have not been persisted.
-        auto bytes = branch->GetTotBytes();
-        auto branchSize = bytes ? bytes : 1000000;
-        auto&& result = arrow::AllocateResizableBuffer(branchSize, pool);
+        auto&& result = arrow::AllocateBuffer(branch->GetTotalSize(), pool);
         if (!result.ok()) {
           throw runtime_error("Cannot allocate values buffer");
         }
@@ -423,7 +445,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
       if (mapping.vlaIdx != -1) {
         auto* mSizeBranch = (TBranch*)branches->At(mapping.vlaIdx);
         offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
-        result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
+        result = arrow::AllocateBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
         if (!result.ok()) {
           throw runtime_error("Cannot allocate offset buffer");
         }
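
Both call sites above switch from arrow::AllocateResizableBuffer to arrow::AllocateBuffer, presumably because the required size is already known when the buffer is created (the branch's total size in one case, (totalEntries + 1) * sizeof(int) in the other), so a resizable buffer is unnecessary; arrow::AllocateBuffer is declared in <arrow/buffer.h>, which matches the include added at the top of the file. A small sketch of the pattern, with the helper name makeOffsetsBuffer chosen here purely for illustration:

  #include <arrow/buffer.h>
  #include <arrow/memory_pool.h>
  #include <arrow/result.h>
  #include <cstdint>
  #include <memory>

  // Allocate a fixed-size buffer holding (totalEntries + 1) 32-bit offsets.
  arrow::Result<std::shared_ptr<arrow::Buffer>> makeOffsetsBuffer(int64_t totalEntries, arrow::MemoryPool* pool)
  {
    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool));
    return std::shared_ptr<arrow::Buffer>(std::move(buffer));
  }
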
@@ -435,6 +457,9 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
         // read sizes first
         while (readEntries < totalEntries) {
           auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer);
+          if (readLast == -1) {
+            throw runtime_error_f("Unable to read from branch %s.", mSizeBranch->GetName());
+          }
           readEntries += readLast;
           for (auto i = 0; i < readLast; ++i) {
             offsets[count++] = (int)offset;
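
The new check above matters because GetEntriesSerialized() returns -1 on failure; without it the negative value would be added to readEntries and the while loop could spin indefinitely instead of reporting the broken branch. A standalone sketch of the guarded loop (readAllEntries is an illustrative name, not part of the patch):

  #include <TBranch.h>
  #include <TBufferFile.h>
  #include <stdexcept>
  #include <string>

  // Read all entries of a branch through ROOT's bulk API, failing loudly on error.
  Long64_t readAllEntries(TBranch* branch, Long64_t totalEntries, TBufferFile& buf)
  {
    Long64_t readEntries = 0;
    while (readEntries < totalEntries) {
      auto readLast = branch->GetBulkRead().GetEntriesSerialized(readEntries, buf);
      if (readLast == -1) {
        throw std::runtime_error(std::string("Unable to read from branch ") + branch->GetName());
      }
      readEntries += readLast;
    }
    return readEntries;
  }
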
@@ -492,6 +517,7 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
     auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
     totalCompressedSize += tree->GetZipBytes();
     totalUncompressedSize += tree->GetTotBytes();
+    O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
     return batch;
   };
   return generator;