Skip to content

Commit d53781c

Browse files
committed
DPL Analysis: improve grouping performance further
1 parent fb4df11 commit d53781c

File tree

3 files changed

+85
-67
lines changed

3 files changed

+85
-67
lines changed

Framework/Core/include/Framework/ASoA.h

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,15 +1065,19 @@ struct TableIterator : IP, C... {
10651065
: IP{policy},
10661066
C(columnData[framework::has_type_at_v<C>(all_columns{})])...
10671067
{
1068-
bind();
1068+
if (this->size() != 0) {
1069+
bind();
1070+
}
10691071
}
10701072

10711073
TableIterator(arrow::ChunkedArray* columnData[sizeof...(C)], IP&& policy)
10721074
requires(has_index<C...>)
10731075
: IP{policy},
10741076
C(columnData[framework::has_type_at_v<C>(all_columns{})])...
10751077
{
1076-
bind();
1078+
if (this->size() != 0) {
1079+
bind();
1080+
}
10771081
// In case we have an index column might need to constrain the actual
10781082
// number of rows in the view to the range provided by the index.
10791083
// FIXME: we should really understand what happens to an index when we
@@ -1086,14 +1090,18 @@ struct TableIterator : IP, C... {
10861090
: IP{static_cast<IP const&>(other)},
10871091
C(static_cast<C const&>(other))...
10881092
{
1089-
bind();
1093+
if (this->size() != 0) {
1094+
bind();
1095+
}
10901096
}
10911097

10921098
TableIterator& operator=(TableIterator other)
10931099
{
10941100
IP::operator=(static_cast<IP const&>(other));
10951101
(void(static_cast<C&>(*this) = static_cast<C>(other)), ...);
1096-
bind();
1102+
if (this->size() != 0) {
1103+
bind();
1104+
}
10971105
return *this;
10981106
}
10991107

@@ -1102,7 +1110,9 @@ struct TableIterator : IP, C... {
11021110
: IP{static_cast<IP const&>(other)},
11031111
C(static_cast<C const&>(other))...
11041112
{
1105-
bind();
1113+
if (this->size() != 0) {
1114+
bind();
1115+
}
11061116
}
11071117

11081118
TableIterator& operator++()
@@ -1543,18 +1553,22 @@ auto doSliceBy(T const* table, o2::framework::PresliceBase<C, Policy, OPT> const
15431553
uint64_t offset = 0;
15441554
auto out = container.getSliceFor(value, table->asArrowTable(), offset);
15451555
auto t = typename T::self_t({out}, offset);
1546-
table->copyIndexBindings(t);
1547-
t.bindInternalIndicesTo(table);
1556+
if (t.tableSize() != 0) {
1557+
table->copyIndexBindings(t);
1558+
t.bindInternalIndicesTo(table);
1559+
}
15481560
return t;
15491561
}
15501562

15511563
template <soa::is_filtered_table T>
15521564
auto doSliceByHelper(T const* table, gsl::span<const int64_t> const& selection)
15531565
{
15541566
auto t = soa::Filtered<typename T::base_t>({table->asArrowTable()}, selection);
1555-
table->copyIndexBindings(t);
1556-
t.bindInternalIndicesTo(table);
1557-
t.intersectWithSelection(table->getSelectedRows()); // intersect filters
1567+
if (t.tableSize() != 0) {
1568+
table->copyIndexBindings(t);
1569+
t.bindInternalIndicesTo(table);
1570+
t.intersectWithSelection(table->getSelectedRows()); // intersect filters
1571+
}
15581572
return t;
15591573
}
15601574

@@ -1563,8 +1577,10 @@ template <soa::is_table T>
15631577
auto doSliceByHelper(T const* table, gsl::span<const int64_t> const& selection)
15641578
{
15651579
auto t = soa::Filtered<T>({table->asArrowTable()}, selection);
1566-
table->copyIndexBindings(t);
1567-
t.bindInternalIndicesTo(table);
1580+
if (t.tableSize() != 0) {
1581+
table->copyIndexBindings(t);
1582+
t.bindInternalIndicesTo(table);
1583+
}
15681584
return t;
15691585
}
15701586

@@ -1588,12 +1604,16 @@ auto prepareFilteredSlice(T const* table, std::shared_ptr<arrow::Table> slice, u
15881604
{
15891605
if (offset >= static_cast<uint64_t>(table->tableSize())) {
15901606
Filtered<typename T::base_t> fresult{{{slice}}, SelectionVector{}, 0};
1591-
table->copyIndexBindings(fresult);
1607+
if (fresult.tableSize() != 0) {
1608+
table->copyIndexBindings(fresult);
1609+
}
15921610
return fresult;
15931611
}
15941612
auto slicedSelection = sliceSelection(table->getSelectedRows(), slice->num_rows(), offset);
15951613
Filtered<typename T::base_t> fresult{{{slice}}, std::move(slicedSelection), offset};
1596-
table->copyIndexBindings(fresult);
1614+
if (fresult.tableSize() != 0) {
1615+
table->copyIndexBindings(fresult);
1616+
}
15971617
return fresult;
15981618
}
15991619

@@ -1617,7 +1637,9 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const&
16171637
auto localCache = cache.ptr->getCacheFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
16181638
auto [offset, count] = localCache.getSliceFor(value);
16191639
auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count)}, static_cast<uint64_t>(offset));
1620-
table->copyIndexBindings(t);
1640+
if (t.tableSize() != 0) {
1641+
table->copyIndexBindings(t);
1642+
}
16211643
return t;
16221644
}
16231645

@@ -1636,12 +1658,16 @@ auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode
16361658
auto localCache = cache.ptr->getCacheUnsortedFor({o2::soa::getLabelFromTypeForKey<T>(node.name), node.name});
16371659
if constexpr (soa::is_filtered_table<T>) {
16381660
auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value));
1639-
t.intersectWithSelection(table->getSelectedRows());
1640-
table->copyIndexBindings(t);
1661+
if (t.tableSize() != 0) {
1662+
t.intersectWithSelection(table->getSelectedRows());
1663+
table->copyIndexBindings(t);
1664+
}
16411665
return t;
16421666
} else {
16431667
auto t = Filtered<T>({table->asArrowTable()}, localCache.getSliceFor(value));
1644-
table->copyIndexBindings(t);
1668+
if (t.tableSize() != 0) {
1669+
table->copyIndexBindings(t);
1670+
}
16451671
return t;
16461672
}
16471673
}
@@ -3209,12 +3235,16 @@ struct JoinFull : Table<o2::aod::Hash<"JOIN"_h>, D, o2::aod::Hash<"JOIN"_h>, Ts.
32093235
JoinFull(std::shared_ptr<arrow::Table>&& table, uint64_t offset = 0)
32103236
: base{std::move(table), offset}
32113237
{
3212-
bindInternalIndicesTo(this);
3238+
if (this->tableSize() != 0) {
3239+
bindInternalIndicesTo(this);
3240+
}
32133241
}
32143242
JoinFull(std::vector<std::shared_ptr<arrow::Table>>&& tables, uint64_t offset = 0)
32153243
: base{ArrowHelpers::joinTables(std::move(tables), std::span{base::originalLabels}), offset}
32163244
{
3217-
bindInternalIndicesTo(this);
3245+
if (this->tableSize() != 0) {
3246+
bindInternalIndicesTo(this);
3247+
}
32183248
}
32193249
using base::bindExternalIndices;
32203250
using base::bindInternalIndicesTo;

Framework/Core/include/Framework/ArrowTableSlicingCache.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ struct ArrowTableSlicingCache {
6464
constexpr static ServiceKind service_kind = ServiceKind::Stream;
6565

6666
Cache bindingsKeys;
67-
std::vector<std::shared_ptr<arrow::NumericArray<arrow::Int32Type>>> values;
68-
std::vector<std::shared_ptr<arrow::NumericArray<arrow::Int64Type>>> counts;
6967
std::vector<std::vector<int64_t>> offsets;
7068
std::vector<std::vector<int64_t>> sizes;
7169

Framework/Core/src/ArrowTableSlicingCache.cxx

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,8 @@ void updatePairList(Cache& list, std::string const& binding, std::string const&
3131

3232
std::pair<int64_t, int64_t> SliceInfoPtr::getSliceFor(int value) const
3333
{
34-
int64_t offset = 0;
35-
if (offsets.empty()) {
36-
return {offset, 0};
37-
}
3834
if ((size_t)value >= offsets.size()) {
39-
return {offset, 0};
35+
return {0, 0};
4036
}
4137

4238
return {offsets[value], sizes[value]};
@@ -68,8 +64,6 @@ ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorte
6864
: bindingsKeys{bsks},
6965
bindingsKeysUnsorted{bsksUnsorted}
7066
{
71-
values.resize(bindingsKeys.size());
72-
counts.resize(bindingsKeys.size());
7367
offsets.resize(bindingsKeys.size());
7468
sizes.resize(bindingsKeys.size());
7569

@@ -81,10 +75,6 @@ void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
8175
{
8276
bindingsKeys = bsks;
8377
bindingsKeysUnsorted = bsksUnsorted;
84-
values.clear();
85-
values.resize(bindingsKeys.size());
86-
counts.clear();
87-
counts.resize(bindingsKeys.size());
8878
offsets.clear();
8979
offsets.resize(bindingsKeys.size());
9080
sizes.clear();
@@ -97,8 +87,6 @@ void ArrowTableSlicingCache::setCaches(Cache&& bsks, Cache&& bsksUnsorted)
9787

9888
arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<arrow::Table> const& table)
9989
{
100-
values[pos].reset();
101-
counts[pos].reset();
10290
offsets[pos].clear();
10391
sizes[pos].clear();
10492
if (table->num_rows() == 0) {
@@ -109,41 +97,50 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr<
10997
throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str());
11098
}
11199
validateOrder(bindingsKeys[pos], table);
112-
arrow::Datum value_counts;
113-
auto options = arrow::compute::ScalarAggregateOptions::Defaults();
114-
ARROW_ASSIGN_OR_RAISE(value_counts,
115-
arrow::compute::CallFunction("value_counts", {table->GetColumnByName(bindingsKeys[pos].key)},
116-
&options));
117-
auto pair = static_cast<arrow::StructArray>(value_counts.array());
118-
values[pos].reset();
119-
counts[pos].reset();
120-
values[pos] = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
121-
counts[pos] = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
122100

123101
int maxValue = -1;
124-
for (auto i = values[pos]->length() - 1; i >= 0; --i) {
125-
if (values[pos]->Value(i) < 0) {
126-
continue;
127-
} else {
128-
maxValue = values[pos]->Value(i);
102+
auto column = table->GetColumnByName(k);
103+
104+
// starting from the end, find the first non-negative value; in a sorted column it is the largest index
105+
for (auto iChunk = column->num_chunks() - 1; iChunk >=0; --iChunk) {
106+
auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
107+
for (auto iElement = chunk.length() - 1; iElement >= 0; --iElement) {
108+
auto value = chunk.Value(iElement);
109+
if (value < 0) {
110+
continue;
111+
} else {
112+
maxValue = value;
113+
break;
114+
}
115+
}
116+
if (maxValue >= 0) {
129117
break;
130118
}
131119
}
132120

133121
offsets[pos].resize(maxValue + 1);
134122
sizes[pos].resize(maxValue + 1);
135-
std::fill(offsets[pos].begin(), offsets[pos].end(), 0);
136-
std::fill(sizes[pos].begin(), sizes[pos].end(), 0);
137-
int64_t offset = 0;
138-
for (auto i = 0U; i < values[pos]->length(); ++i) {
139-
auto value = values[pos]->Value(i);
140-
auto count = counts[pos]->Value(i);
141-
if (value >= 0) {
142-
offsets[pos][value] = offset;
143-
sizes[pos][value] = count;
123+
124+
// loop over the index column and collect the size and starting offset of each run of equal non-negative values
125+
int lastValue = std::numeric_limits<int>::max();
126+
int globalRow = 0;
127+
for (auto iChunk = 0; iChunk < column->num_chunks(); ++iChunk) {
128+
auto chunk = static_cast<arrow::NumericArray<arrow::Int32Type>>(column->chunk(iChunk)->data());
129+
for (auto iElement = 0; iElement < chunk.length(); ++iElement) {
130+
auto v = chunk.Value(iElement);
131+
if (v >= 0) {
132+
if (v == lastValue) {
133+
++sizes[pos][v];
134+
} else {
135+
lastValue = v;
136+
++sizes[pos][v];
137+
offsets[pos][v] = globalRow;
138+
}
139+
}
140+
++globalRow;
144141
}
145-
offset += count;
146142
}
143+
147144
return arrow::Status::OK();
148145
}
149146

@@ -238,13 +235,6 @@ SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi
238235

239236
SliceInfoPtr ArrowTableSlicingCache::getCacheForPos(int pos) const
240237
{
241-
if (values[pos] == nullptr && counts[pos] == nullptr) {
242-
return {
243-
{}, //
244-
{} //
245-
};
246-
}
247-
248238
return {
249239
gsl::span{offsets[pos].data(), offsets[pos].size()}, //
250240
gsl::span(sizes[pos].data(), sizes[pos].size()) //

0 commit comments

Comments
 (0)