Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/stdexec/__detail/__basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ namespace STDEXEC

static constexpr auto __get_state = //
[]<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&& __rcvr) noexcept(
__nothrow_decay_copyable<__data_of<_Sender>>) -> decltype(auto)
__nothrow_decay_copyable<__data_of<_Sender>>)
-> __state<std::remove_cvref_t<_Receiver>, std::remove_cvref_t<__data_of<_Sender>>>
{
return __state{static_cast<_Receiver&&>(__rcvr),
STDEXEC::__get<1>(static_cast<_Sender&&>(__sndr))};
Expand Down
112 changes: 112 additions & 0 deletions test/stdexec/algos/adaptors/test_write_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <type_traits>
#include <utility>

#include <exec/single_thread_context.hpp>
#include <stdexec/execution.hpp>
#include <test_common/receivers.hpp>

Expand Down Expand Up @@ -68,4 +69,115 @@ namespace
receiver<state>{{}, &s});
::STDEXEC::start(op);
}

template <typename IncompleteType, typename Env = STDEXEC::env_of_t<IncompleteType>>
struct ReceiverIncomplete
{
using receiver_concept = STDEXEC::receiver_tag;

IncompleteType* m_ptr;

void set_value() && noexcept
{
STDEXEC::set_value(std::move(m_ptr->rcvr));
}

template <typename Error>
void set_error(Error&& error) && noexcept
{
STDEXEC::set_error(std::move(m_ptr->rcvr), std::forward<Error>(error));
}

[[nodiscard]]
constexpr auto get_env() const noexcept -> Env
{
return STDEXEC::get_env(*m_ptr);
}
};

template <STDEXEC::sender Sndr, STDEXEC::receiver Rcvr>
struct OpStateIncomplete
{
using operation_state_concept = STDEXEC::operation_state_tag;

using rcvr_t = ReceiverIncomplete<OpStateIncomplete, STDEXEC::env_of_t<Rcvr>>;
using inner_opstate_t = STDEXEC::connect_result_t<Sndr, rcvr_t>;

Rcvr rcvr;
inner_opstate_t inner_opstate;

OpStateIncomplete(Sndr&& sndr, Rcvr rcvr_)
: rcvr(std::move(rcvr_))
, inner_opstate(STDEXEC::connect(std::forward<Sndr>(sndr), rcvr_t{this}))
{}

void start() & noexcept
{
STDEXEC::start(inner_opstate);
}

[[nodiscard]]
constexpr auto get_env() const noexcept -> STDEXEC::env_of_t<Rcvr>
{
return STDEXEC::get_env(rcvr);
}
};

template <STDEXEC::sender Sndr>
struct SenderIncomplete
{
using sender_concept = STDEXEC::sender_tag;

template <class Self, class... Env>
static consteval auto get_completion_signatures()
-> STDEXEC::__completion_signatures_of_t<STDEXEC::__copy_cvref_t<Self, Sndr>, Env...>
{
return {};
}

template <STDEXEC::receiver Rcvr>
auto connect(Rcvr rcvr) && -> OpStateIncomplete<Sndr, Rcvr>
{
return {std::forward<Sndr>(sndr), std::move(rcvr)};
}

Sndr sndr;
};

struct incomplete_t
{
constexpr auto operator()() const noexcept
{
return STDEXEC::__closure(*this);
}

template <typename Sndr>
constexpr auto operator()(Sndr&& sndr) const -> SenderIncomplete<Sndr>
{
return {std::forward<Sndr>(sndr)};
}
};

inline constexpr incomplete_t incomplete{};

TEST_CASE("write_env with a receiver pointing to an incomplete operation state",
"[adaptors][write_env]")
{
exec::single_thread_context stc{};

int value = 0;

STDEXEC::sender auto sndr =
STDEXEC::read_env(STDEXEC::get_allocator)
| STDEXEC::then([](auto allocator)
{ static_assert(std::same_as<decltype(allocator), std::allocator<int>>); })
| STDEXEC::continues_on(stc.get_scheduler()) | incomplete()
| STDEXEC::write_env(STDEXEC::prop{STDEXEC::get_allocator, std::allocator<int>{}})
| STDEXEC::then([&value]() { ++value; });

STDEXEC::sync_wait(std::move(sndr));

CHECK(value == 1);
}

} // namespace