16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
@@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
@@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
@@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
@@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
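The net effect of the two hunks above: getFileFolder() no longer hands back a raw TFile*, but an arrow::dataset::FileSource, so the reader reaches the TFile through the filesystem attached to the source, and an empty path() now plays the role of the old null-file check. A minimal sketch of that access pattern (assuming TFileFileSystem lives in o2::framework, as Framework/RootArrowFilesystem.h suggests, and that its GetFile() accessor behaves as used in this diff; this is illustration, not framework code):

```cpp
#include "Framework/RootArrowFilesystem.h"
#include <arrow/dataset/file_base.h>
#include <TFile.h>
#include <TSystem.h>
#include <TUrl.h>
#include <cstring>
#include <memory>
#include <string>

// Returns the (absolute) local file name behind a FileSource, or "" if the
// source is empty. Sketch only: error handling for failed casts is omitted.
std::string localFileName(arrow::dataset::FileSource const& source)
{
  if (source.path().empty()) {
    return {}; // empty source: the caller moves on to the next input file
  }
  auto rootFS = std::dynamic_pointer_cast<o2::framework::TFileFileSystem>(source.filesystem());
  auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
  std::string name = f->GetName();
  if (std::strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
    // Relative local path: prefix the current working directory, as the reader callback does.
    name = std::string(gSystem->pwd()) + "/" + name;
  }
  return name;
}
```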
193 changes: 129 additions & 64 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions Framework/AnalysisSupport/src/DataInputDirector.h
@@ -15,6 +15,10 @@

#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataAllocator.h"
#include "Framework/RootArrowFilesystem.h"

#include <arrow/filesystem/filesystem.h>
#include <arrow/dataset/dataset.h>

#include <regex>
#include "rapidjson/fwd.h"
@@ -31,16 +35,10 @@ struct FileNameHolder {
std::string fileName;
int numberOfTimeFrames = 0;
std::vector<uint64_t> listOfTimeFrameNumbers;
std::vector<std::string> listOfTimeFrameKeys;
std::vector<bool> alreadyRead;
};
FileNameHolder* makeFileNameHolder(std::string fileName);

struct FileAndFolder {
TFile* file = nullptr;
std::string folderName = "";
};

class DataInputDescriptor
{
/// Holds information concerning the reading of an aod table.
@@ -52,7 +50,6 @@ class DataInputDescriptor
std::string treename = "";
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;

DataInputDescriptor() = default;
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");

void printOut();
@@ -78,7 +75,7 @@ class DataInputDescriptor
int findDFNumber(int file, std::string dfName);

uint64_t getTimeFrameNumber(int counter, int numTF);
FileAndFolder getFileFolder(int counter, int numTF);
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
int getTimeFramesInFile(int counter);
int getReadTimeFramesInFile(int counter);
@@ -90,6 +87,7 @@ class DataInputDescriptor
bool isAlienSupportOn() { return mAlienSupport; }

private:
o2::framework::RootObjectReadingFactory mFactory;
std::string minputfilesFile = "";
std::string* minputfilesFilePtr = nullptr;
std::string mFilenameRegex = "";
Expand All @@ -98,7 +96,7 @@ class DataInputDescriptor
std::string mParentFileReplacement;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
int mCurrentFileID = -1;
bool mAlienSupport = false;

@@ -127,7 +125,6 @@ class DataInputDirector
~DataInputDirector();

void reset();
void createDefaultDataInputDescriptor();
void printOut();
bool atEnd(int counter);

@@ -140,10 +137,11 @@ class DataInputDirector
// getters
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
void createDefaultDataInputDescriptor();

bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);

uint64_t getTotalSizeCompressed();
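For reference, the FileAndFolder pair removed from this header is subsumed by arrow::dataset::FileSource, which couples the path inside the input with the filesystem able to open it. A hedged sketch of the replacement using only the public Arrow API (the DataInputDescriptor internals are not repeated here, and the consumer below is illustrative):

```cpp
#include <arrow/dataset/file_base.h>
#include <arrow/filesystem/filesystem.h>
#include <memory>
#include <string>

// What used to be {TFile* file, std::string folderName} becomes a FileSource:
// the folder name becomes the path, and the TFile is hidden behind the
// attached arrow::fs::FileSystem (here: whatever mCurrentFilesystem holds).
arrow::dataset::FileSource makeSource(std::shared_ptr<arrow::fs::FileSystem> fs,
                                      std::string const& folderName)
{
  return {folderName, std::move(fs)};
}

void consume(arrow::dataset::FileSource const& source)
{
  if (source.path().empty()) {
    return; // the same "nothing to read" sentinel the reader callback checks
  }
  // Open() goes through the attached filesystem and yields an
  // arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>>.
  auto maybeFile = source.Open();
  (void)maybeFile; // a real caller hands this to the dataset/file-format machinery
}
```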
40 changes: 34 additions & 6 deletions Framework/AnalysisSupport/src/Plugin.cxx
@@ -85,20 +85,48 @@ std::vector<std::string> getListOfTables(std::unique_ptr<TFile>& f)
{
std::vector<std::string> r;
TList* keyList = f->GetListOfKeys();
// We should handle two cases: one where the list of tables is in a TDirectory,
// the other one where the dataframe number is just a prefix
std::string first = "";

for (auto key : *keyList) {
if (!std::string_view(key->GetName()).starts_with("DF_")) {
if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) {
continue;
}
auto* d = (TDirectory*)f->Get(key->GetName());
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory"));
// Objects are in a folder, list it.
if (d) {
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
}
break;
}

void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple"));
if (v) {
std::string s = key->GetName();
size_t pos = s.find('-');
// Check if '-' is found
// Skip metaData and parentFiles
if (pos == std::string::npos) {
continue;
}
std::string t = s.substr(pos + 1);
// If we find a duplicate table name, it means we are in the next DF and we can stop.
if (t == first) {
break;
}
if (first.empty()) {
first = t;
}
// Create a new string starting after the '-'
r.emplace_back(t);
}
break;
}
return r;
}

auto readMetadata(std::unique_ptr<TFile>& currentFile) -> std::vector<ConfigParamSpec>
{
// Get the metadata, if any
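The second branch added to getListOfTables() encodes a naming convention for RNTuple outputs: keys look like DF_<timeframe>-<Table>, keys without a '-' (metaData, parentFiles) are skipped, and the scan stops as soon as a table name repeats, i.e. when the keys of the next dataframe start. The same logic in isolation (an illustrative helper, not part of the framework):

```cpp
#include <string>
#include <vector>

std::vector<std::string> tablesFromRNTupleKeys(std::vector<std::string> const& keys)
{
  std::vector<std::string> tables;
  std::string first;
  for (auto const& key : keys) {
    auto pos = key.find('-');
    if (pos == std::string::npos) {
      continue; // e.g. metaData or parentFiles entries carry no '-'
    }
    std::string table = key.substr(pos + 1);
    if (table == first) {
      break; // same table again => we have wrapped into the next dataframe
    }
    if (first.empty()) {
      first = table;
    }
    tables.push_back(table);
  }
  return tables;
}

// e.g. {"DF_1-O2track", "DF_1-O2collision", "metaData", "DF_2-O2track"}
// yields {"O2track", "O2collision"}.
```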
4 changes: 4 additions & 0 deletions Framework/AnalysisSupport/src/TTreePlugin.cxx
@@ -633,6 +633,10 @@ arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
opsCount += 2;
} else {
if (physicalFieldIdx > 1) {
O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
physical_schema->field(physicalFieldIdx - 1)->name().c_str());
}
mappings.push_back({physicalFieldIdx, -1, fi});
opsCount++;
}
16 changes: 14 additions & 2 deletions Framework/Core/include/Framework/TableTreeHelpers.h
@@ -11,6 +11,8 @@
#ifndef O2_FRAMEWORK_TABLETREEHELPERS_H_
#define O2_FRAMEWORK_TABLETREEHELPERS_H_

#include <arrow/buffer.h>
#include <arrow/io/interfaces.h>
#include <arrow/record_batch.h>
#include "TFile.h"
#include "TTreeReader.h"
@@ -146,15 +148,25 @@
class FragmentToBatch
{
public:
FragmentToBatch(arrow::MemoryPool* pool = arrow::default_memory_pool());
// The function to be used to create the required stream.
using StreamerCreator = std::function<std::shared_ptr<arrow::io::OutputStream>(std::shared_ptr<arrow::dataset::FileFragment>, const std::shared_ptr<arrow::ResizableBuffer>& buffer)>;

FragmentToBatch(StreamerCreator, std::shared_ptr<arrow::dataset::FileFragment>, arrow::MemoryPool* pool = arrow::default_memory_pool());
void setLabel(const char* label);
void fill(std::shared_ptr<arrow::dataset::FileFragment>, std::shared_ptr<arrow::Schema> dataSetSchema, std::shared_ptr<arrow::dataset::FileFormat>);
void fill(std::shared_ptr<arrow::Schema> dataSetSchema, std::shared_ptr<arrow::dataset::FileFormat>);
std::shared_ptr<arrow::RecordBatch> finalize();

std::shared_ptr<arrow::io::OutputStream> streamer(std::shared_ptr<arrow::ResizableBuffer> buffer)
{
return mCreator(mFragment, buffer);
}

private:
std::shared_ptr<arrow::dataset::FileFragment> mFragment;
arrow::MemoryPool* mArrowMemoryPool = nullptr;
std::string mTableLabel;
std::shared_ptr<arrow::RecordBatch> mRecordBatch;
StreamerCreator mCreator;
};

// -----------------------------------------------------------------------------
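FragmentToBatch now owns its fragment and a StreamerCreator callback, so the decision of where the serialised batch ends up is deferred until streamer() is called. A sketch of the intended wiring based on this header alone: DataAllocator.cxx plugs in a FairMQ-backed stream at this point, while the self-contained version below uses a plain in-memory Arrow stream; the helper, its name and the namespace qualification are assumptions for illustration.

```cpp
#include "Framework/TableTreeHelpers.h"
#include <arrow/buffer.h>
#include <arrow/dataset/file_base.h>
#include <arrow/io/memory.h>
#include <memory>

std::shared_ptr<arrow::RecordBatch> scanOne(
  std::shared_ptr<arrow::dataset::FileFragment> fragment,
  std::shared_ptr<arrow::Schema> schema,
  std::shared_ptr<arrow::dataset::FileFormat> format,
  std::shared_ptr<arrow::ResizableBuffer> buffer)
{
  // The creator is invoked lazily, once per fragment, with the destination buffer.
  o2::framework::FragmentToBatch::StreamerCreator creator =
    [](std::shared_ptr<arrow::dataset::FileFragment>,
       std::shared_ptr<arrow::ResizableBuffer> const&) -> std::shared_ptr<arrow::io::OutputStream> {
      // Stand-in target; the framework creates a stream over the message buffer instead.
      return arrow::io::BufferOutputStream::Create().ValueOrDie();
    };

  o2::framework::FragmentToBatch f2b{creator, fragment};
  f2b.fill(schema, format);           // scan the fragment into a RecordBatch
  auto stream = f2b.streamer(buffer); // deferred output stream for this fragment
  (void)stream;                       // a real caller writes the IPC stream here
  return f2b.finalize();
}
```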
57 changes: 24 additions & 33 deletions Framework/Core/src/DataAllocator.cxx
@@ -211,34 +211,6 @@ void doWriteTable(std::shared_ptr<FairMQResizableBuffer> b, arrow::Table* table)
}
}

void doWriteBatch(std::shared_ptr<FairMQResizableBuffer> b, arrow::RecordBatch* batch)
{
auto mock = std::make_shared<arrow::io::MockOutputStream>();
int64_t expectedSize = 0;
auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);

expectedSize = mock->Tell().ValueOrDie();
auto reserve = b->Reserve(expectedSize);
if (reserve.ok() == false) {
throw std::runtime_error("Unable to reserve memory for table");
}

auto stream = std::make_shared<FairMQOutputStream>(b);
// This is a copy maybe we can finally get rid of it by having using the
// dataset API?
auto outBatch = arrow::ipc::MakeStreamWriter(stream.get(), batch->schema());
if (outBatch.ok() == false) {
throw ::std::runtime_error("Unable to create batch writer");
}

outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);

if (outStatus.ok() == false) {
throw std::runtime_error("Unable to Write batch");
}
}

void DataAllocator::adopt(const Output& spec, LifetimeHolder<TableBuilder>& tb)
{
auto& timingInfo = mRegistry.get<TimingInfo>();
@@ -318,16 +290,35 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f
// Serialization happens in here, so that we can
// get rid of the intermediate tree 2 table object, saving memory.
auto batch = source.finalize();
doWriteBatch(buffer, batch.get());
auto mock = std::make_shared<arrow::io::MockOutputStream>();
int64_t expectedSize = 0;
auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema());
arrow::Status outStatus = mockWriter.ValueOrDie()->WriteRecordBatch(*batch);

expectedSize = mock->Tell().ValueOrDie();
auto reserve = buffer->Reserve(expectedSize);
if (reserve.ok() == false) {
throw std::runtime_error("Unable to reserve memory for table");
}

auto deferredWriterStream = source.streamer(buffer);

auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream, batch->schema());
if (outBatch.ok() == false) {
throw ::std::runtime_error("Unable to create batch writer");
}

outStatus = outBatch.ValueOrDie()->WriteRecordBatch(*batch);

if (outStatus.ok() == false) {
throw std::runtime_error("Unable to Write batch");
}
// deletion happens in the caller
};

/// To finalise this we write the table to the buffer.
/// FIXME: most likely not a great idea. We should probably write to the buffer
/// directly in the TableBuilder, incrementally.
auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
// This is empty because we already serialised the object when
// the LifetimeHolder goes out of scope.
// the LifetimeHolder goes out of scope. See code above.
};

context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
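The block that replaces doWriteBatch() keeps the two-pass IPC serialisation: the batch is first written to a MockOutputStream, which only counts bytes, the destination buffer is reserved to exactly that size, and only then is the batch written for real through the stream obtained from the source's StreamerCreator. The measuring half, reduced to plain Arrow calls (a minimal sketch; exceptions signal errors, as in the surrounding code):

```cpp
#include <arrow/io/memory.h>
#include <arrow/ipc/writer.h>
#include <arrow/record_batch.h>
#include <memory>
#include <stdexcept>

// Size, in bytes, that an IPC stream containing `batch` will occupy.
int64_t ipcStreamSize(std::shared_ptr<arrow::RecordBatch> const& batch)
{
  auto mock = std::make_shared<arrow::io::MockOutputStream>();
  auto writer = arrow::ipc::MakeStreamWriter(mock.get(), batch->schema()).ValueOrDie();
  if (!writer->WriteRecordBatch(*batch).ok()) {
    throw std::runtime_error("Unable to measure record batch");
  }
  return mock->Tell().ValueOrDie(); // nothing was copied, only counted
}
```

With the size known up front, Reserve() on the destination buffer happens exactly once, and the second writer streams straight into the message buffer without an intermediate table copy.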
14 changes: 8 additions & 6 deletions Framework/Core/src/TableTreeHelpers.cxx
@@ -13,7 +13,6 @@
#include "Framework/Endian.h"
#include "Framework/Signpost.h"

#include "arrow/type_traits.h"
#include <arrow/dataset/file_base.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>
@@ -533,7 +532,7 @@ void TreeToTable::setLabel(const char* label)
mTableLabel = label;
}

void TreeToTable::fill(TTree*tree)
void TreeToTable::fill(TTree* tree)
{
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
@@ -569,8 +568,10 @@ std::shared_ptr<arrow::Table> TreeToTable::finalize()
return mTable;
}

FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool)
: mArrowMemoryPool{pool}
FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr<arrow::dataset::FileFragment> fragment, arrow::MemoryPool* pool)
: mFragment{std::move(fragment)},
mArrowMemoryPool{pool},
mCreator{std::move(creator)}
{
}

@@ -579,13 +580,14 @@ void FragmentToBatch::setLabel(const char* label)
mTableLabel = label;
}

void FragmentToBatch::fill(std::shared_ptr<arrow::dataset::FileFragment> fragment, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
void FragmentToBatch::fill(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
{
auto options = std::make_shared<arrow::dataset::ScanOptions>();
options->dataset_schema = schema;
auto scanner = format->ScanBatchesAsync(options, fragment);
auto scanner = format->ScanBatchesAsync(options, mFragment);
auto batch = (*scanner)();
mRecordBatch = *batch.result();
// Notice that up to here the buffer was not yet filled.
}

std::shared_ptr<arrow::RecordBatch> FragmentToBatch::finalize()
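For readers not fluent in the Arrow dataset API, the body of fill() above follows this pattern: ScanBatchesAsync() returns an asynchronous generator, and each call to the generator yields a future for the next record batch of the fragment; fill() takes only the first one. A generic sketch using the pure Arrow API (no O2 types; the helper name is illustrative), with the Result/Future unwrapped via ARROW_ASSIGN_OR_RAISE instead of operator*:

```cpp
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/scanner.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <memory>

arrow::Result<std::shared_ptr<arrow::RecordBatch>> firstBatch(
  std::shared_ptr<arrow::dataset::FileFormat> format,
  std::shared_ptr<arrow::dataset::FileFragment> fragment,
  std::shared_ptr<arrow::Schema> schema)
{
  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->dataset_schema = schema;
  // The generator is an AsyncGenerator<std::shared_ptr<RecordBatch>>;
  // each invocation requests one more batch, here only the first.
  ARROW_ASSIGN_OR_RAISE(auto generator, format->ScanBatchesAsync(options, fragment));
  return generator().result();
}
```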
4 changes: 2 additions & 2 deletions Framework/TestWorkflows/src/o2TestHistograms.cxx
@@ -40,7 +40,7 @@ struct EtaAndClsHistogramsSimple {
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
Produces<o2::aod::SkimmedExampleTrack> skimEx;

void process(aod::Tracks const& tracks)
void process(aod::Tracks const& tracks, aod::FT0s const&)
{
LOGP(info, "Invoking the simple one");
for (auto& track : tracks) {
@@ -54,7 +54,7 @@ struct EtaAndClsHistogramsIUSimple {
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
Produces<o2::aod::SkimmedExampleTrack> skimEx;

void process(aod::TracksIU const& tracks)
void process(aod::TracksIU const& tracks, aod::FT0s const&)
{
LOGP(info, "Invoking the simple one");
for (auto& track : tracks) {