Skip to content

Commit 4ec73c1

Browse files
authored
DPL: introduce range based views to navigate data model (#15061)
1 parent 87fa1f6 commit 4ec73c1

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12+
#define O2_FRAMEWORK_DATASPECVIEWS_H_
13+
14+
#include <fairmq/FwdDecls.h>
15+
#include <fairmq/Message.h>
16+
#include "DomainInfoHeader.h"
17+
#include "SourceInfoHeader.h"
18+
#include "Headers/DataHeader.h"
19+
#include <ranges>
20+
21+
namespace o2::framework
22+
{
23+
24+
struct count_payloads {
25+
// ends the pipeline, returns the container
26+
template <typename R>
27+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
28+
friend size_t operator|(R&& r, count_payloads self)
29+
{
30+
size_t count = 0;
31+
size_t mi = 0;
32+
while (mi < r.size()) {
33+
auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
34+
if (!header) {
35+
throw std::runtime_error("Not a DataHeader");
36+
}
37+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
38+
count += header->splitPayloadParts;
39+
mi += header->splitPayloadParts + 1;
40+
} else {
41+
count += header->splitPayloadParts ? header->splitPayloadParts : 1;
42+
mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
43+
}
44+
}
45+
return count;
46+
}
47+
};
48+
49+
struct count_parts {
50+
// ends the pipeline, returns the number of parts
51+
template <typename R>
52+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
53+
friend size_t operator|(R&& r, count_parts self)
54+
{
55+
size_t count = 0;
56+
size_t mi = 0;
57+
while (mi < r.size()) {
58+
auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
59+
auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(r[mi]->GetData());
60+
auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(r[mi]->GetData());
61+
if (!header && !sih && !dih) {
62+
throw std::runtime_error("Header information not found");
63+
}
64+
// We skip oldest possible timeframe / end of stream and not consider it
65+
// as actual parts.
66+
if (dih || sih) {
67+
count += 1;
68+
mi += 2;
69+
} else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
70+
count += 1;
71+
mi += header->splitPayloadParts + 1;
72+
} else {
73+
count += header->splitPayloadParts;
74+
mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
75+
}
76+
}
77+
return count;
78+
}
79+
};
80+
81+
struct DataRefIndices {
82+
size_t headerIdx;
83+
size_t payloadIdx;
84+
};
85+
86+
struct get_pair {
87+
size_t pairId;
88+
template <typename R>
89+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
90+
friend DataRefIndices operator|(R&& r, get_pair self)
91+
{
92+
size_t count = 0;
93+
size_t mi = 0;
94+
while (mi < r.size()) {
95+
auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
96+
if (!header) {
97+
throw std::runtime_error("Not a DataHeader");
98+
}
99+
size_t diff = self.pairId - count;
100+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
101+
count += header->splitPayloadParts;
102+
if (self.pairId < count) {
103+
return {mi, mi + 1 + diff};
104+
}
105+
mi += header->splitPayloadParts + 1;
106+
} else {
107+
count += header->splitPayloadParts ? header->splitPayloadParts : 1;
108+
if (self.pairId < count) {
109+
return {mi, mi + 2 * diff + 1};
110+
}
111+
mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
112+
}
113+
}
114+
throw std::runtime_error("Payload not found");
115+
}
116+
};
117+
118+
struct get_dataref_indices {
119+
size_t part;
120+
size_t subPart;
121+
// ends the pipeline, returns the number of parts
122+
template <typename R>
123+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
124+
friend DataRefIndices operator|(R&& r, get_dataref_indices self)
125+
{
126+
size_t count = 0;
127+
size_t mi = 0;
128+
while (mi < r.size()) {
129+
auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
130+
if (!header) {
131+
throw std::runtime_error("Not a DataHeader");
132+
}
133+
if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
134+
if (self.part == count) {
135+
return {mi, mi + 1 + self.subPart};
136+
}
137+
count += 1;
138+
mi += header->splitPayloadParts + 1;
139+
} else {
140+
if (self.part == count) {
141+
return {mi, mi + 2 * self.subPart + 1};
142+
}
143+
count += 1;
144+
mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
145+
}
146+
}
147+
throw std::runtime_error("Payload not found");
148+
}
149+
};
150+
151+
struct get_header {
152+
size_t id;
153+
// ends the pipeline, returns the number of parts
154+
template <typename R>
155+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
156+
friend fair::mq::MessagePtr& operator|(R&& r, get_header self)
157+
{
158+
return r[(r | get_dataref_indices{self.id, 0}).headerIdx];
159+
}
160+
};
161+
162+
struct get_payload {
163+
size_t part;
164+
size_t subPart;
165+
// ends the pipeline, returns the number of parts
166+
template <typename R>
167+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
168+
friend fair::mq::MessagePtr& operator|(R&& r, get_payload self)
169+
{
170+
return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx];
171+
}
172+
};
173+
174+
struct get_num_payloads {
175+
size_t id;
176+
// ends the pipeline, returns the number of parts
177+
template <typename R>
178+
requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
179+
friend size_t operator|(R&& r, get_num_payloads self)
180+
{
181+
size_t count = 0;
182+
size_t mi = 0;
183+
while (mi < r.size()) {
184+
auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
185+
if (!header) {
186+
throw std::runtime_error("Not a DataHeader");
187+
}
188+
if (self.id == count) {
189+
if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
190+
return header->splitPayloadParts;
191+
} else {
192+
return 1;
193+
}
194+
}
195+
if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
196+
count += 1;
197+
mi += header->splitPayloadParts + 1;
198+
} else {
199+
count += 1;
200+
mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
201+
}
202+
}
203+
return 0;
204+
}
205+
};
206+
207+
struct MessageSet;
208+
209+
struct MessageStore {
210+
std::span<MessageSet> sets;
211+
size_t inputsPerSlot = 0;
212+
};
213+
214+
struct inputs_for_slot {
215+
TimesliceSlot slot;
216+
template <typename R>
217+
requires requires(R r) { std::ranges::random_access_range<decltype(r.sets)>; }
218+
friend std::span<o2::framework::MessageSet> operator|(R&& r, inputs_for_slot self)
219+
{
220+
return std::span(r.sets[self.slot.index * r.inputsPerSlot]);
221+
}
222+
};
223+
224+
struct messages_for_input {
225+
size_t inputIdx;
226+
template <typename R>
227+
requires std::ranges::random_access_range<R>
228+
friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
229+
{
230+
return r[self.inputIdx].messages;
231+
}
232+
};
233+
234+
// FIXME: we should use special index classes in place of size_t
235+
// FIXME: we need something to substitute a range in the store with another
236+
237+
} // namespace o2::framework
238+
239+
#endif // O2_FRAMEWORK_DATASPECVIEWS_H_

0 commit comments

Comments
 (0)