Skip to content

Commit c6f3d4e

Browse files
committed
exec::repeat_effect_until: Throwing Decay-Copy & Value Category Preservation
The asynchronous loop of exec::repeat_effect_until proceeds until the child operation sends a value which converts to true. Previously this process proceeded as follows: 1. Accept the child operation's result by value to avoid dangling references into the operation state (see next step) 2. Destroy the child operation state 3. Convert the result accepted in step 1 to bool and check if it's true, if so end the operation, otherwise 4. Connect the child sender 5. Start the new child operation Unfortunately step 1 meant that the result of the child operation would be decay-copied to pass it by value. This occurred within a noexcept function and therefore if that decay-copy threw std::terminate would be called. Moreover the previous implementation did not forward the result in step 3. This meant that if the child's result type was only rvalue convertible to bool compilation would fail. Additionally the same pass-by-value strategy was used for errors. However when handling an error there's no need to destroy the child operation state due to the fact the operation is ending and therefore doesn't need to reconnect the child sender for the next iteration (note this logic also applies to successful completion). Fixed all of the above by handling completion of the child operation as follows: 1. If the child completed with error or stopped simply forward that completion through (note the child operation state is not destroyed and will be cleaned up by the destructor of the operation state for exec::repeat_effect_until) (note that this saves one decay-copy over the previous implementation but requires a branch in the destructor, which was already present), otherwise 2. Forward the result, convert that forwarded expression to bool, and check if it's true, if so end the operation (note that once again the child operation state is not destroyed and once again a decay-copy is eliminated), otherwise 3. Destroy the child operation state 4. Connect the child sender 5. Start the new child operation
1 parent 5ffee46 commit c6f3d4e

File tree

3 files changed

+126
-17
lines changed

3 files changed

+126
-17
lines changed

include/exec/repeat_effect_until.hpp

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include "trampoline_scheduler.hpp"
2525
#include "sequence.hpp"
2626

27-
#include "../stdexec/__detail/__atomic.hpp"
2827
#include <exception>
2928
#include <type_traits>
3029

@@ -82,7 +81,7 @@ namespace exec {
8281
using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>;
8382

8483
__child_t __child_;
85-
__std::atomic_flag __started_{};
84+
bool __has_child_op_ = false;
8685
stdexec::__manual_lifetime<__child_op_t> __child_op_;
8786
trampoline_scheduler __sched_;
8887

@@ -93,11 +92,7 @@ namespace exec {
9392
}
9493

9594
~__repeat_effect_state() {
96-
if (!__started_.test(__std::memory_order_acquire)) {
97-
__std::atomic_thread_fence(__std::memory_order_release);
98-
// TSan does not support __std::atomic_thread_fence, so we
99-
// need to use the TSan-specific __tsan_release instead:
100-
STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_));
95+
if (__has_child_op_) {
10196
__child_op_.__destroy();
10297
}
10398
}
@@ -107,30 +102,42 @@ namespace exec {
107102
return stdexec::connect(
108103
exec::sequence(stdexec::schedule(__sched_), __child_), __receiver_t{this});
109104
});
105+
__has_child_op_ = true;
106+
}
107+
108+
void __destroy() noexcept {
109+
__child_op_.__destroy();
110+
__has_child_op_ = false;
110111
}
111112

112113
void __start() noexcept {
113-
const bool __already_started [[maybe_unused]]
114-
= __started_.test_and_set(__std::memory_order_relaxed);
115-
STDEXEC_ASSERT(!__already_started);
114+
STDEXEC_ASSERT(__has_child_op_);
116115
stdexec::start(__child_op_.__get());
117116
}
118117

119118
template <class _Tag, class... _Args>
120-
void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value...
121-
__child_op_.__destroy(); // ... because this could potentially invalidate them.
119+
void __complete(_Tag, _Args &&...__args) noexcept {
122120
if constexpr (same_as<_Tag, set_value_t>) {
123121
// If the sender completed with true, we're done
124122
STDEXEC_TRY {
125-
const bool __done = (static_cast<bool>(__args) && ...);
123+
const bool __done = (static_cast<bool>(static_cast<_Args &&>(__args)) && ...);
126124
if (__done) {
127125
stdexec::set_value(static_cast<_Receiver &&>(this->__receiver()));
128-
} else {
126+
return;
127+
}
128+
__destroy();
129+
STDEXEC_TRY {
129130
__connect();
130-
stdexec::start(__child_op_.__get());
131131
}
132+
STDEXEC_CATCH_ALL {
133+
stdexec::set_error(
134+
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
135+
return;
136+
}
137+
stdexec::start(__child_op_.__get());
132138
}
133139
STDEXEC_CATCH_ALL {
140+
__destroy();
134141
stdexec::set_error(
135142
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
136143
}

test/exec/test_repeat_effect_until.cpp

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626

2727
#include <catch2/catch.hpp>
2828

29+
#include <limits>
2930
#include <memory>
31+
#include <stdexcept>
3032
#include <utility>
3133

3234
namespace ex = stdexec;
@@ -197,4 +199,104 @@ namespace {
197199
ex::start(op);
198200
REQUIRE(counter == 10);
199201
}
202+
203+
TEST_CASE(
204+
"repeat_effect works correctly when the child operation sends an error type which throws when "
205+
"decay-copied",
206+
"[adaptors][repeat_effect]") {
207+
struct error_type {
208+
explicit error_type(unsigned& throw_after) noexcept
209+
: throw_after_(throw_after) {
210+
}
211+
error_type(const error_type& other)
212+
: throw_after_(other.throw_after_) {
213+
if (!throw_after_) {
214+
throw std::logic_error("TEST");
215+
}
216+
--throw_after_;
217+
}
218+
unsigned& throw_after_;
219+
};
220+
struct receiver {
221+
using receiver_concept = ::stdexec::receiver_t;
222+
void set_value() && noexcept {
223+
FAIL_CHECK("Unexpected value completion signal");
224+
}
225+
void set_stopped() && noexcept {
226+
FAIL_CHECK("Unexpected stopped completion signal");
227+
}
228+
void set_error(std::exception_ptr) && noexcept {
229+
CHECK(!done_);
230+
}
231+
void set_error(const error_type&) && noexcept {
232+
CHECK(!done_);
233+
done_ = true;
234+
}
235+
bool& done_;
236+
};
237+
unsigned throw_after = 0;
238+
bool done = false;
239+
do {
240+
const auto tmp = throw_after;
241+
throw_after = std::numeric_limits<unsigned>::max();
242+
auto op =
243+
ex::connect(exec::repeat_effect(ex::just_error(error_type(throw_after))), receiver(done));
244+
throw_after = tmp;
245+
ex::start(op);
246+
throw_after = tmp;
247+
++throw_after;
248+
} while (!done);
249+
}
250+
251+
TEST_CASE(
252+
"repeat_effect_until works correctly when the child operation sends type which throws when "
253+
"decay-copied, and when converted to bool, and which is only rvalue convertible to bool",
254+
"[adaptors][repeat_effect_until]") {
255+
class value_type {
256+
void maybe_throw_() const {
257+
if (!throw_after_) {
258+
throw std::logic_error("TEST");
259+
}
260+
--throw_after_;
261+
}
262+
public:
263+
explicit value_type(unsigned& throw_after) noexcept
264+
: throw_after_(throw_after) {
265+
}
266+
value_type(const value_type& other)
267+
: throw_after_(other.throw_after_) {
268+
maybe_throw_();
269+
}
270+
unsigned& throw_after_;
271+
operator bool() && {
272+
maybe_throw_();
273+
return true;
274+
}
275+
};
276+
struct receiver {
277+
using receiver_concept = ::stdexec::receiver_t;
278+
void set_value() && noexcept {
279+
done_ = true;
280+
}
281+
void set_stopped() && noexcept {
282+
FAIL_CHECK("Unexpected stopped completion signal");
283+
}
284+
void set_error(std::exception_ptr) && noexcept {
285+
CHECK(!done_);
286+
}
287+
bool& done_;
288+
};
289+
unsigned throw_after = 0;
290+
bool done = false;
291+
do {
292+
const auto tmp = throw_after;
293+
throw_after = std::numeric_limits<unsigned>::max();
294+
auto op =
295+
ex::connect(exec::repeat_effect_until(ex::just(value_type(throw_after))), receiver(done));
296+
throw_after = tmp;
297+
ex::start(op);
298+
throw_after = tmp;
299+
++throw_after;
300+
} while (!done);
301+
}
200302
} // namespace

test/test_common/receivers.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,15 +358,15 @@ namespace {
358358
FAIL_CHECK("set_stopped called on expect_error_receiver");
359359
}
360360

361-
void set_error(T err) noexcept {
361+
void set_error(T&& err) noexcept {
362362
this->set_called();
363363
if (error_) {
364364
CHECK(to_comparable(err) == to_comparable(*error_));
365365
}
366366
}
367367

368368
template <class E>
369-
void set_error(E) noexcept {
369+
void set_error(E&&) noexcept {
370370
FAIL_CHECK("set_error called on expect_error_receiver with wrong error type");
371371
}
372372

0 commit comments

Comments
 (0)