Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ add_executable(o2-test-framework-core
test/test_Variants.cxx
test/test_WorkflowHelpers.cxx
test/test_WorkflowSerialization.cxx
test/test_TreeToTable.cxx
test/test_DataOutputDirector.cxx
test/unittest_SimpleOptionsRetriever.cxx
test/unittest_DataSpecUtils.cxx
Expand Down Expand Up @@ -348,7 +347,6 @@ foreach(b
EventMixing
HistogramRegistry
TableToTree
TreeToTable
ExternalFairMQDeviceProxies
)
o2_add_executable(benchmark-${b}
Expand Down
14 changes: 0 additions & 14 deletions Framework/Core/include/Framework/DataAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,6 @@ class DataAllocator
return tb;
}

template <typename T, typename... Args>
requires(requires { static_cast<struct TreeToTable>(std::declval<std::decay_t<T>>()); })
decltype(auto) make(const Output& spec, Args... args)
{
auto t2t = std::move(LifetimeHolder<TreeToTable>(new std::decay_t<T>(args...)));
adopt(spec, t2t);
return t2t;
}

template <typename T, typename... Args>
requires(requires { static_cast<struct FragmentToBatch>(std::declval<std::decay_t<T>>()); })
decltype(auto) make(const Output& spec, Args... args)
Expand Down Expand Up @@ -288,11 +279,6 @@ class DataAllocator
void
adopt(const Output& spec, LifetimeHolder<struct TableBuilder>&);

/// Adopt a Tree2Table in the framework and serialise / send
/// it as an Arrow table to all consumers of @a spec once done
void
adopt(const Output& spec, LifetimeHolder<struct TreeToTable>&);

/// Adopt a Source2Batch in the framework and serialise / send
/// it as an Arrow Dataset to all consumers of @a spec once done
void
Expand Down
54 changes: 0 additions & 54 deletions Framework/Core/include/Framework/TableTreeHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,6 @@ namespace o2::framework
// OR t2t.addBranch(column.get(), field.get()), ...;
// . t2t.process();
//
// .............................................................................
// -----------------------------------------------------------------------------
// TreeToTable allows to fill the contents of a given TTree to an arrow::Table
// ColumnIterator is used by TreeToTable
//
// To copy the contents of a tree tr to a table ta do:
// . TreeToTable t2t(tr);
// . t2t.addColumn(columnname1); t2t.addColumn(columnname2); ...
// OR
// t2t.addAllColumns();
// . auto ta = t2t.process();
//
// .............................................................................
struct ROOTTypeInfo {
EDataType type;
char suffix[3];
Expand All @@ -58,29 +45,6 @@ struct ROOTTypeInfo {
auto arrowTypeFromROOT(EDataType type, int size);
auto basicROOTTypeFromArrow(arrow::Type::type id);

class BranchToColumn
{
public:
BranchToColumn(TBranch* branch, bool VLA, std::string name, EDataType type, int listSize, arrow::MemoryPool* pool);
// BranchToColumn(TBranch* branch, TBranch* sizeBranch, std::string name, EDataType type, arrow::MemoryPool* pool);
~BranchToColumn() = default;
TBranch* branch();

std::pair<std::shared_ptr<arrow::ChunkedArray>, std::shared_ptr<arrow::Field>> read(TBuffer* buffer);

private:
TBranch* mBranch = nullptr;
bool mVLA = false;
std::string mColumnName;
EDataType mType;
std::shared_ptr<arrow::DataType> mArrowType;
arrow::ArrayBuilder* mValueBuilder = nullptr;
std::unique_ptr<arrow::ArrayBuilder> mListBuilder = nullptr;
int mListSize = 1;
std::unique_ptr<arrow::ArrayBuilder> mBuilder = nullptr;
arrow::MemoryPool* mPool = nullptr;
};

class ColumnToBranch
{
public:
Expand Down Expand Up @@ -127,24 +91,6 @@ class TableToTree
std::vector<std::unique_ptr<ColumnToBranch>> mColumnReaders;
};

class TreeToTable
{
public:
TreeToTable(arrow::MemoryPool* pool = arrow::default_memory_pool());
void setLabel(const char* label);
void addAllColumns(TTree* tree, std::vector<std::string>&& names = {});
void fill(TTree*);
std::shared_ptr<arrow::Table> finalize();

private:
arrow::MemoryPool* mArrowMemoryPool;
std::vector<std::unique_ptr<BranchToColumn>> mBranchReaders;
std::string mTableLabel;
std::shared_ptr<arrow::Table> mTable;

void addReader(TBranch* branch, std::string const& name, bool VLA);
};

class FragmentToBatch
{
public:
Expand Down
32 changes: 0 additions & 32 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -241,38 +241,6 @@ void DataAllocator::adopt(const Output& spec, LifetimeHolder<TableBuilder>& tb)
context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
}

void DataAllocator::adopt(const Output& spec, LifetimeHolder<TreeToTable>& t2t)
{
auto& timingInfo = mRegistry.get<TimingInfo>();
RouteIndex routeIndex = matchDataHeader(spec, timingInfo.timeslice);

auto header = headerMessageFromOutput(spec, routeIndex, o2::header::gSerializationMethodArrow, 0);
auto& context = mRegistry.get<ArrowContext>();

auto creator = [transport = context.proxy().getOutputTransport(routeIndex)](size_t s) -> std::unique_ptr<fair::mq::Message> {
return transport->CreateMessage(s);
};
auto buffer = std::make_shared<FairMQResizableBuffer>(creator);

t2t.callback = [buffer = buffer, transport = context.proxy().getOutputTransport(routeIndex)](TreeToTable& tree) {
// Serialization happens in here, so that we can
// get rid of the intermediate tree 2 table object, saving memory.
auto table = tree.finalize();
doWriteTable(buffer, table.get());
// deletion happens in the caller
};

/// To finalise this we write the table to the buffer.
/// FIXME: most likely not a great idea. We should probably write to the buffer
/// directly in the TableBuilder, incrementally.
auto finalizer = [](std::shared_ptr<FairMQResizableBuffer> b) -> void {
// This is empty because we already serialised the object when
// the LifetimeHolder goes out of scope.
};

context.addBuffer(std::move(header), buffer, std::move(finalizer), routeIndex);
}

void DataAllocator::adopt(const Output& spec, LifetimeHolder<FragmentToBatch>& f2b)
{
auto& timingInfo = mRegistry.get<TimingInfo>();
Expand Down
Loading