Skip to content

Commit 8dddc15

Browse files
committed
DPL: Use X9 to make AsyncQueue atomic
1 parent 13baa03 commit 8dddc15

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ o2_add_library(Framework
163163
O2::Headers
164164
O2::MemoryResources
165165
O2::PCG
166+
O2::X9
166167
RapidJSON::RapidJSON
167168
Arrow::arrow_shared
168169
Microsoft.GSL::GSL

Framework/Core/include/Framework/AsyncQueue.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
#include "Framework/TimesliceSlot.h"
1515
#include <string>
1616
#include <vector>
17+
#include <atomic>
18+
19+
typedef struct x9_inbox_internal x9_inbox;
20+
typedef struct x9_node_internal x9_node;
1721

1822
namespace o2::framework
1923
{
@@ -89,6 +93,12 @@ struct AsyncQueue {
8993
std::vector<AsyncTaskSpec> prototypes;
9094
std::vector<AsyncTask> tasks;
9195
size_t iteration = 0;
96+
97+
std::atomic<bool> first = true;
98+
99+
// Inbox for the message queue used to append
100+
// tasks to this queue.
101+
x9_inbox* inbox = nullptr;
92102
AsyncQueue();
93103
};
94104

@@ -104,6 +114,8 @@ struct AsyncQueueHelpers {
104114
/// 3. only execute the highest (timeslice, debounce) value
105115
static void run(AsyncQueue& queue, TimesliceId oldestPossibleTimeslice);
106116

117+
// Flush tasks which were posted but not yet committed to the queue
118+
static void flushPending(AsyncQueue& queue);
107119
/// Reset the queue to its initial state
108120
static void reset(AsyncQueue& queue);
109121
};

Framework/Core/src/AsyncQueue.cxx

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111

1212
#include "Framework/AsyncQueue.h"
1313
#include "Framework/Signpost.h"
14+
#include "x9.h"
1415
#include <numeric>
1516

1617
O2_DECLARE_DYNAMIC_LOG(async_queue);
1718

1819
namespace o2::framework
1920
{
2021
AsyncQueue::AsyncQueue()
22+
: inbox(x9_create_inbox(16, "async_queue", sizeof(AsyncTask)))
2123
{
24+
this->inbox = x9_create_inbox(16, "async_queue", sizeof(AsyncTask));
2225
}
2326

2427
auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTaskId
@@ -31,11 +34,39 @@ auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTa
3134

3235
auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTask const& task) -> void
3336
{
34-
queue.tasks.push_back(task);
37+
// Until we do not manage to write to the inbox, keep removing
38+
// items from the queue if you are the first one which fails to
39+
// write.
40+
while (!x9_write_to_inbox(queue.inbox, sizeof(AsyncTask), &task)) {
41+
AsyncQueueHelpers::flushPending(queue);
42+
}
43+
}
44+
45+
auto AsyncQueueHelpers::flushPending(AsyncQueue& queue) -> void
46+
{
47+
bool isFirst = true;
48+
if (!std::atomic_compare_exchange_strong(&queue.first, &isFirst, false)) {
49+
// Not the first, try again.
50+
return;
51+
}
52+
// First thread which does not manage to write to the queue.
53+
// Flush it a bit before we try again.
54+
AsyncTask toFlush;
55+
// This potentially stalls if the inserting tasks are faster to insert
56+
// than we are to retrieve. We should probably have a cut-off
57+
while (x9_read_from_inbox(queue.inbox, sizeof(AsyncTask), &toFlush)) {
58+
queue.tasks.push_back(toFlush);
59+
}
60+
queue.first = true;
3561
}
3662

3763
auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void
3864
{
65+
// We synchronize right before we run to get as many
66+
// tasks as possible. Notice we might still miss some
67+
// which will have to handled on a subsequent iteration.
68+
AsyncQueueHelpers::flushPending(queue);
69+
3970
if (queue.tasks.empty()) {
4071
return;
4172
}

0 commit comments

Comments
 (0)