Skip to content

Commit 8bb2de7

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent 675c32c commit 8bb2de7

20 files changed

Lines changed: 44 additions & 554 deletions

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer3.hpp

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,10 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
3434

3535
ElementType* allocate(const uint8_t writerIndex) noexcept override
3636
{
37-
static thread_local std::mt19937 gen(std::random_device {}());
38-
static thread_local std::uniform_int_distribution<> dist(0, 31);
39-
static thread_local uint64_t threadQueueIndex = dist(gen);
4037
WriterData& writerData = m_writersData[writerIndex].get();
4138
while (true) {
42-
threadQueueIndex = (threadQueueIndex + 1) % m_queues.size();
43-
// writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
44-
ElementType* res = m_queues[threadQueueIndex]->tryPop();
39+
writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
40+
ElementType* res = m_queues[writerData.queueIndex]->tryPop();
4541
if (res) {
4642
return res;
4743
}
@@ -50,15 +46,10 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
5046

5147
void deallocate(ElementType* element, const uint8_t writerIndex) noexcept override
5248
{
53-
static thread_local std::mt19937 gen(std::random_device {}());
54-
static thread_local std::uniform_int_distribution<> dist(0, 31);
55-
static thread_local uint64_t threadQueueIndex = dist(gen);
5649
WriterData& writerData = m_writersData[writerIndex].get();
5750
while (true) {
58-
threadQueueIndex = (threadQueueIndex + 1) % m_queues.size();
59-
// writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
60-
// const uint64_t queueIndex = m_nextQueue++ % m_queues.size();
61-
if (m_queues[threadQueueIndex]->tryPush(element)) {
51+
writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
52+
if (m_queues[writerData.queueIndex]->tryPush(element)) {
6253
return;
6354
}
6455
}
@@ -108,7 +99,6 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
10899
std::vector<CacheAlligned<WriterData>> m_writersData;
109100

110101
std::array<CacheAlligned<Queue>, 32> m_queues;
111-
// std::atomic<uint64_t> m_nextQueue {0};
112102
};
113103

114104
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferB.hpp

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,6 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
2424
constexpr static std::size_t WINDOW_SIZE = 16;
2525

2626
public:
27-
/*__attribute__((noinline)) std::size_t d_test(auto& container)
28-
{
29-
return std::ranges::count_if(container, [](const auto& bucket) {
30-
return bucket.load(std::memory_order_acquire) != std::numeric_limits<uint16_t>::max();
31-
});
32-
}*/
33-
3427
explicit AllocationBufferB(const std::size_t capacity, const uint8_t writersCount) noexcept
3528
: m_objectPool(capacity + writersCount * BUCKET_SIZE)
3629
, m_buckets(m_objectPool.size() / BUCKET_SIZE)
@@ -42,8 +35,6 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
4235
throw std::invalid_argument("Number of buckets must be a multiple of writers count");
4336
}
4437

45-
// m_fullBuckets.reserve(m_buckets.size());
46-
// m_emptyBuckets.reserve(m_buckets.size());
4738
for (ElementType& element : m_objectPool) {
4839
const std::size_t elementIndex = &element - m_objectPool.data();
4940
const std::size_t bucketIndex = elementIndex / BUCKET_SIZE;
@@ -69,8 +60,6 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
6960
}
7061
}
7162

72-
// void unregisterWriter(const uint8_t writerIndex) noexcept override {}
73-
7463
ElementType* allocate(const uint8_t writerIndex) noexcept override
7564
{
7665
WriterData& writerData = m_writersData[writerIndex].get();
@@ -137,20 +126,11 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
137126

138127
void pushBucket(auto& buckets, const std::size_t bucketIndex, std::size_t& pushRank) noexcept
139128
{
140-
// std::size_t offset = 0;
141129
while (true) {
142130
uint16_t expected = buckets[pushRank].load(std::memory_order_acquire);
143-
/*if (++offset % 100'000'000 == 0) {
144-
std::cout << "d_test(push)=" << d_test(buckets) << "\n";
145-
}*/
146131
if (expected != Bucket::PLACEHOLDER) {
147-
const std::size_t newPushRank
148-
= ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
132+
pushRank = ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
149133
% buckets.size();
150-
if (newPushRank < pushRank) {
151-
// offset++;
152-
}
153-
pushRank = newPushRank;
154134
continue;
155135
}
156136
if (buckets[pushRank].compare_exchange_weak(
@@ -166,12 +146,8 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
166146

167147
uint16_t popBucket(auto& buckets, std::size_t& popRank) noexcept
168148
{
169-
// std::size_t offset = 0;
170149
popRank = (popRank - 1 + buckets.size()) % buckets.size();
171150
while (true) {
172-
/*if (++offset % 100'000'000 == 0) {
173-
std::cout << "d_test(pop)=" << d_test(buckets) << "\n";
174-
}*/
175151
uint16_t expected = buckets[popRank].load(std::memory_order_acquire);
176152
if (expected == Bucket::PLACEHOLDER) {
177153
popRank
@@ -184,16 +160,13 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
184160
Bucket::PLACEHOLDER,
185161
std::memory_order_release,
186162
std::memory_order_acquire)) {
187-
// popRank = (popRank - 1 + buckets.size()) % buckets.size();
188163
return expected;
189164
}
190165
}
191166
}
192167

193168
std::vector<ElementType> m_objectPool;
194169
std::vector<Bucket> m_buckets;
195-
// std::array<std::atomic<uint16_t>, 65536> m_fullBuckets;
196-
// std::array<std::atomic<uint16_t>, 65536> m_emptyBuckets;
197170
std::deque<std::atomic<uint16_t>> m_fullBuckets;
198171
std::deque<std::atomic<uint16_t>> m_emptyBuckets;
199172
std::vector<CacheAlligned<WriterData>> m_writersData;

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferBase.hpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,6 @@ class AllocationBufferBase {
1818
virtual void registerWriter([[maybe_unused]] const uint8_t writerIndex) noexcept {}
1919

2020
virtual ~AllocationBufferBase() = default;
21-
22-
/*void replace(ElementType*& oldValue, ElementType* newValue, const uint8_t writerId) noexcept
23-
{
24-
if (oldValue != nullptr) {
25-
deallocate(oldValue, writerId);
26-
}
27-
oldValue = newValue;
28-
}*/
2921
};
3022

3123
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferS.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,6 @@ class AllocationBufferS : public AllocationBufferBase<ElementType> {
105105
}
106106
}
107107

108-
// m_helpStates[writerIndex]->store(HelpState {false, false}, std::memory_order_release);
109-
110108
void steal(const uint8_t writerIndex) noexcept
111109
{
112110
WriterData& writerData = m_writersData[writerIndex].get();

include/ipfixprobe/outputPlugin/outputStorage/b2OutputStorage.hpp

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
8383
this->m_buckets[writerData.writePosition].bucketIndex = writerData.bucketAllocation.reset(
8484
this->m_buckets[writerData.writePosition].bucketIndex);
8585

86-
/*if (this->m_highestReaderGeneration.load(std::memory_order_acquire) >= x) {
87-
throw std::runtime_error("Shnejne?");
88-
}*/
89-
9086
const uint8_t correspondingReaderIndex
9187
= writerData.writePosition % this->m_expectedReadersCount;
9288
uint64_t generationToStore = 0;
@@ -98,7 +94,7 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
9894
generationToStore,
9995
std::memory_order_release);
10096
// TODO REMOVE DEBUG COUNTER
101-
d_reads.fetch_add(1, std::memory_order_acq_rel);
97+
// d_reads.fetch_add(1, std::memory_order_acq_rel);
10298
} while (this->m_readersData[correspondingReaderIndex]->generation.load(
10399
std::memory_order_acquire)
104100
>= generationToStore);
@@ -115,7 +111,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
115111
{
116112
typename BOutputStorage<ElementType>::ReaderData& readerData
117113
= this->m_readersData[readerIndex].get();
118-
// const uint64_t readPosition = readerData.readPosition;
119114
if (readerData.bucketAllocation.containersLeft()) {
120115
return &this->getNextElement(readerData.bucketAllocation).getData();
121116
}
@@ -126,8 +121,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
126121
do {
127122
const bool overflowed = readerData.shift(this->m_expectedReadersCount, readerIndex);
128123

129-
// auto& y = this->m_buckets[readerData.readPosition];
130-
// if (readerData.isOnBufferBegin(this->m_expectedReadersCount)) {
131124
if (overflowed) {
132125
if (!this->writersPresent()) {
133126
readerData.generation.fetch_add(1, std::memory_order_release);
@@ -148,7 +141,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
148141
this->m_buckets[readerData.readPosition].lock.lock();
149142
cachedGeneration = this->m_buckets[readerData.readPosition].generation.load(
150143
std::memory_order_acquire);
151-
// std::atomic_thread_fence(std::memory_order_acquire);
152144
cachedBucketIndex = this->m_buckets[readerData.readPosition].bucketIndex;
153145
if (cachedGeneration > readerData.generation.load(std::memory_order_acquire)) {
154146
readerData.seenValidBucket = true;
@@ -164,12 +156,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
164156
return &this->getNextElement(readerData.bucketAllocation).getData();
165157
}
166158

167-
/*bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
168-
{
169-
return !writersPresent() && m_highestWriterGeneration + 200000 <
170-
m_lowestReaderGeneration;
171-
}*/
172-
173159
protected:
174160
void updateLowestReaderGeneration() noexcept
175161
{
@@ -183,11 +169,7 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
183169
| std::ranges::to<boost::container::static_vector<
184170
uint64_t,
185171
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
186-
// const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
187-
// casMax(this->m_highestReaderGeneration, highestReaderGeneration);
188-
// m_highestReaderGeneration = highestReaderGeneration;
189172
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);
190-
// casMin(m_lowestReaderGeneration, lowestReaderGeneration);
191173
this->m_lowestReaderGeneration.store(lowestReaderGeneration, std::memory_order_release);
192174
}
193175

0 commit comments

Comments
 (0)