Skip to content

Commit 4d20c8d

Browse files
authored
DPL Analysis: improve/fix join error by not relying on metadata (#14249)
1 parent 88baea9 commit 4d20c8d

File tree

6 files changed: 33 additions and 18 deletions.

Framework/Core/include/Framework/ASoA.h

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1245,7 +1245,7 @@ struct TableIterator : IP, C... {
12451245
};
12461246

12471247
struct ArrowHelpers {
1248-
static std::shared_ptr<arrow::Table> joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables);
1248+
static std::shared_ptr<arrow::Table> joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels);
12491249
static std::shared_ptr<arrow::Table> concatTables(std::vector<std::shared_ptr<arrow::Table>>&& tables);
12501250
};
12511251

@@ -1683,6 +1683,7 @@ class Table
16831683
using table_t = self_t;
16841684

16851685
static constexpr const auto originals = computeOriginals<ref, Ts...>();
1686+
static constexpr const auto originalLabels = []<size_t N, std::array<TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::array<const char*, N>{o2::aod::label<refs[Is]>()...}; }.template operator()<originals.size(), originals>(std::make_index_sequence<originals.size()>());
16861687

16871688
template <size_t N, std::array<TableRef, N> bindings>
16881689
requires(ref.origin_hash == "CONC"_h)
@@ -1931,7 +1932,7 @@ class Table
19311932

19321933
Table(std::vector<std::shared_ptr<arrow::Table>>&& tables, uint64_t offset = 0)
19331934
requires(ref.origin_hash != "CONC"_h)
1934-
: Table(ArrowHelpers::joinTables(std::move(tables)), offset)
1935+
: Table(ArrowHelpers::joinTables(std::move(tables), std::span{originalLabels}), offset)
19351936
{
19361937
}
19371938

@@ -3213,7 +3214,7 @@ struct JoinFull : Table<o2::aod::Hash<"JOIN"_h>, D, o2::aod::Hash<"JOIN"_h>, Ts.
32133214
bindInternalIndicesTo(this);
32143215
}
32153216
JoinFull(std::vector<std::shared_ptr<arrow::Table>>&& tables, uint64_t offset = 0)
3216-
: base{ArrowHelpers::joinTables(std::move(tables)), offset}
3217+
: base{ArrowHelpers::joinTables(std::move(tables), std::span{base::originalLabels}), offset}
32173218
{
32183219
bindInternalIndicesTo(this);
32193220
}
@@ -3223,6 +3224,7 @@ struct JoinFull : Table<o2::aod::Hash<"JOIN"_h>, D, o2::aod::Hash<"JOIN"_h>, Ts.
32233224
using self_t = JoinFull<D, Ts...>;
32243225
using table_t = base;
32253226
static constexpr const auto originals = base::originals;
3227+
static constexpr const auto originalLabels = base::originalLabels;
32263228
using columns_t = typename table_t::columns_t;
32273229
using persistent_columns_t = typename table_t::persistent_columns_t;
32283230
using iterator = table_t::template iterator_template<DefaultIndexPolicy, self_t, Ts...>;
@@ -3293,7 +3295,7 @@ using Join = JoinFull<o2::aod::Hash<"JOIN/0"_h>, Ts...>;
32933295
template <typename... Ts>
32943296
constexpr auto join(Ts const&... t)
32953297
{
3296-
return Join<Ts...>(ArrowHelpers::joinTables({t.asArrowTable()...}));
3298+
return Join<Ts...>(ArrowHelpers::joinTables({t.asArrowTable()...}, std::span{Join<Ts...>::base::originalLabels}));
32973299
}
32983300

32993301
template <typename T>

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -282,14 +282,14 @@ template <is_spawns T>
282282
bool prepareOutput(ProcessingContext& context, T& spawns)
283283
{
284284
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
285-
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context));
285+
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
286286
if (originalTable->schema()->fields().empty() == true) {
287287
using base_table_t = typename T::base_table_t::table_t;
288288
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
289289
}
290290

291291
spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), spawns.projector));
292-
spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}));
292+
spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
293293
return true;
294294
}
295295

@@ -304,14 +304,14 @@ template <is_defines T>
304304
bool prepareOutput(ProcessingContext& context, T& defines)
305305
{
306306
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
307-
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context));
307+
auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
308308
if (originalTable->schema()->fields().empty() == true) {
309309
using base_table_t = typename T::base_table_t::table_t;
310310
originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
311311
}
312312

313313
defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>(), defines.projectors.data(), defines.projector));
314-
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}));
314+
defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
315315
return true;
316316
}
317317

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -201,9 +201,9 @@ struct AnalysisDataProcessorBuilder {
201201
std::shared_ptr<arrow::Table> table = nullptr;
202202
auto joiner = [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
203203
if constexpr (soa::is_iterator<T>) {
204-
table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()));
204+
table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
205205
} else {
206-
table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()));
206+
table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
207207
}
208208
expressions::updateFilterInfo(info, table);
209209
if constexpr (!o2::soa::is_smallgroups<std::decay_t<T>>) {

Framework/Core/include/Framework/TableBuilder.h

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -771,7 +771,7 @@ template <aod::is_aod_hash D>
771771
auto spawner(std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector)
772772
{
773773
using placeholders_pack_t = typename o2::aod::MetadataTrait<D>::metadata::placeholders_pack_t;
774-
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables));
774+
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{o2::aod::MetadataTrait<D>::metadata::base_table_t::originalLabels});
775775
if (fullTable->num_rows() == 0) {
776776
return makeEmptyTable(name, placeholders_pack_t{});
777777
}
@@ -798,7 +798,7 @@ template <aod::is_aod_hash D>
798798
auto spawner(std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, std::shared_ptr<gandiva::Projector>& projector)
799799
{
800800
using expression_pack_t = typename o2::aod::MetadataTrait<D>::metadata::expression_pack_t;
801-
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables));
801+
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{o2::aod::MetadataTrait<D>::metadata::base_table_t::originalLabels});
802802
if (fullTable->num_rows() == 0) {
803803
return makeEmptyTable(name, expression_pack_t{});
804804
}
@@ -834,7 +834,8 @@ auto spawner(std::shared_ptr<arrow::Table> const& fullTable, const char* name, s
834834
template <typename... C>
835835
auto spawner(framework::pack<C...> columns, std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, std::shared_ptr<gandiva::Projector>& projector)
836836
{
837-
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables));
837+
std::array<const char*, 1> labels{"original"};
838+
auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span<const char* const>{labels});
838839
if (fullTable->num_rows() == 0) {
839840
return makeEmptyTable(name, framework::pack<C...>{});
840841
}

Framework/Core/src/ASoA.cxx

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -64,18 +64,15 @@ SelectionVector sliceSelection(gsl::span<int64_t const> const& mSelectedRows, in
6464
return slicedSelection;
6565
}
6666

67-
std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables)
67+
std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels)
6868
{
6969
if (tables.size() == 1) {
7070
return tables[0];
7171
}
7272
for (auto i = 0U; i < tables.size() - 1; ++i) {
7373
if (tables[i]->num_rows() != tables[i + 1]->num_rows()) {
7474
throw o2::framework::runtime_error_f("Tables %s and %s have different sizes (%d vs %d) and cannot be joined!",
75-
tables[i]->schema()->metadata()->Get("label").ValueOrDie().c_str(),
76-
tables[i + 1]->schema()->metadata()->Get("label").ValueOrDie().c_str(),
77-
tables[i]->num_rows(),
78-
tables[i + 1]->num_rows());
75+
labels[i], labels[i + 1], tables[i]->num_rows(), tables[i + 1]->num_rows());
7976
}
8077
}
8178
std::vector<std::shared_ptr<arrow::Field>> fields;

Framework/Core/test/test_ASoA.cxx

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ namespace test
3131
DECLARE_SOA_COLUMN(X, x, int);
3232
DECLARE_SOA_COLUMN(Y, y, int);
3333
DECLARE_SOA_COLUMN(Z, z, int);
34+
DECLARE_SOA_COLUMN(W, w, int);
3435
DECLARE_SOA_DYNAMIC_COLUMN(Sum, sum, [](int x, int y) { return x + y; });
3536
DECLARE_SOA_EXPRESSION_COLUMN(ESum, esum, int, test::x + test::y);
3637
} // namespace test
@@ -268,9 +269,17 @@ TEST_CASE("TestJoinedTables")
268269
rowWriterZ(0, 8);
269270
auto tableZ = builderZ.finalize();
270271

272+
TableBuilder builderW;
273+
auto rowWriterW = builderW.persist<int32_t>({"fW"});
274+
rowWriterW(0, 8);
275+
rowWriterW(0, 8);
276+
rowWriterW(0, 8);
277+
auto tableW = builderW.finalize();
278+
271279
using TestX = InPlaceTable<"A0"_h, o2::aod::test::X>;
272280
using TestY = InPlaceTable<"A1"_h, o2::aod::test::Y>;
273281
using TestZ = InPlaceTable<"A2"_h, o2::aod::test::Z>;
282+
using TestW = InPlaceTable<"A3"_h, o2::aod::test::W>;
274283
using Test = Join<TestX, TestY>;
275284

276285
REQUIRE(Test::contains<TestX>());
@@ -303,6 +312,12 @@ TEST_CASE("TestJoinedTables")
303312
for (auto& test : tests4) {
304313
REQUIRE(15 == test.x() + test.y() + test.z());
305314
}
315+
316+
try {
317+
auto testF = join(TestZ{tableZ}, TestW{tableW});
318+
} catch (RuntimeErrorRef ref) {
319+
REQUIRE(std::string{error_from_ref(ref).what} == "Tables TEST and TEST have different sizes (8 vs 3) and cannot be joined!");
320+
}
306321
}
307322

308323
TEST_CASE("TestConcatTables")

0 commit comments

Comments
 (0)