Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions include/beman/execution/detail/counting_scope_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import std;
#else
#include <cstddef>
#include <exception>
#include <limits>
#include <mutex>
#include <utility>
#endif
#ifdef BEMAN_HAS_MODULES
import beman.execution.detail.immovable;
import beman.execution.detail.unreachable;
#else
#include <beman/execution/detail/immovable.hpp>
#include <beman/execution/detail/unreachable.hpp>
#endif

// ----------------------------------------------------------------------------
Expand All @@ -33,7 +36,7 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail
counting_scope_base(counting_scope_base&&) = delete;
~counting_scope_base();

static constexpr ::std::size_t max_associations{8194u};
static constexpr ::std::size_t max_associations = ::std::numeric_limits<::std::size_t>::max();

auto close() noexcept -> void;

Expand Down Expand Up @@ -62,7 +65,7 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail
}

auto operator=(assoc_t other) noexcept -> assoc_t& {
std::swap(scope, other.scope);
::std::swap(scope, other.scope);
return *this;
}

Expand Down Expand Up @@ -100,8 +103,9 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail
};

auto try_associate() noexcept -> assoc_t;

auto disassociate() noexcept -> void;
auto complete() noexcept -> void;

auto add_node(node* n) noexcept -> void;

::std::mutex mutex;
Expand Down Expand Up @@ -142,11 +146,15 @@ inline auto beman::execution::detail::counting_scope_base::close() noexcept -> v
}

inline auto beman::execution::detail::counting_scope_base::add_node(node* n) noexcept -> void {
n->next = std::exchange(this->head, n);
n->next = ::std::exchange(this->head, n);
}

inline auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> assoc_t {
::std::lock_guard lock(this->mutex);
if (this->count == max_associations) {
return assoc_t{};
}

switch (this->state) {
default:
return assoc_t{};
Expand All @@ -161,45 +169,41 @@ inline auto beman::execution::detail::counting_scope_base::try_associate() noexc
}

inline auto beman::execution::detail::counting_scope_base::disassociate() noexcept -> void {
{
::std::lock_guard lock(this->mutex);
if (0u < --this->count)
return;
this->state = state_t::joined;
}
this->complete();
}
::std::unique_lock guard(this->mutex);
if (--this->count > 0u || (this->state != state_t::open_and_joining && this->state != state_t::closed_and_joining))
return;

this->state = state_t::joined;
node* current = ::std::exchange(this->head, nullptr);
guard.unlock();

inline auto beman::execution::detail::counting_scope_base::complete() noexcept -> void {
node* current{[this] {
::std::lock_guard lock(this->mutex);
return ::std::exchange(this->head, nullptr);
}()};
while (current) {
::std::exchange(current, current->next)->complete();
}
}

inline auto beman::execution::detail::counting_scope_base::start_node(node* n) -> void {
::std::unique_lock guard(this->mutex);
switch (this->state) {
case ::beman::execution::detail::counting_scope_base::state_t::unused:
case ::beman::execution::detail::counting_scope_base::state_t::unused_and_closed:
case ::beman::execution::detail::counting_scope_base::state_t::joined:
this->state = ::beman::execution::detail::counting_scope_base::state_t::joined;
if (this->count == 0u) {
this->state = state_t::joined;
guard.unlock();
n->complete_inline();
return;
case ::beman::execution::detail::counting_scope_base::state_t::open:
this->state = ::beman::execution::detail::counting_scope_base::state_t::open_and_joining;
}

switch (this->state) {
case state_t::open:
this->state = state_t::open_and_joining;
break;
case ::beman::execution::detail::counting_scope_base::state_t::open_and_joining:
case state_t::open_and_joining:
break;
case ::beman::execution::detail::counting_scope_base::state_t::closed:
this->state = ::beman::execution::detail::counting_scope_base::state_t::closed_and_joining;
case state_t::closed:
this->state = state_t::closed_and_joining;
break;
case ::beman::execution::detail::counting_scope_base::state_t::closed_and_joining:
case state_t::closed_and_joining:
break;
default:
::beman::execution::detail::unreachable();
}
this->add_node(n);
}
Expand Down
26 changes: 24 additions & 2 deletions include/beman/execution/detail/counting_scope_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import beman.execution.detail.make_sender;
import beman.execution.detail.receiver;
import beman.execution.detail.schedule;
import beman.execution.detail.set_value;
import beman.execution.detail.set_error;
import beman.execution.detail.set_stopped;
import beman.execution.detail.start;
#else
#include <beman/execution/detail/basic_sender.hpp>
Expand All @@ -39,6 +41,8 @@ import beman.execution.detail.start;
#include <beman/execution/detail/receiver.hpp>
#include <beman/execution/detail/schedule.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/set_error.hpp>
#include <beman/execution/detail/set_stopped.hpp>
#include <beman/execution/detail/start.hpp>
#endif

Expand Down Expand Up @@ -82,20 +86,38 @@ inline constexpr counting_scope_join_t counting_scope_join{};

template <::beman::execution::receiver Receiver>
struct beman::execution::detail::counting_scope_join_t::state : ::beman::execution::detail::counting_scope_base::node {
struct receiver_ref {
using receiver_concept = ::beman::execution::receiver_tag;

auto set_value() && noexcept -> void { ::beman::execution::set_value(::std::move(rcvr)); }

template <typename E>
auto set_error(E&& e) && noexcept -> void {
::beman::execution::set_error(::std::move(rcvr), ::std::forward<E>(e));
}

auto set_stopped() && noexcept -> void { ::beman::execution::set_stopped(::std::move(rcvr)); }

auto get_env() const noexcept { return ::beman::execution::get_env(rcvr); }

Receiver& rcvr;
};

using op_t = decltype(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler(
::beman::execution::get_env(::std::declval<Receiver&>()))),
::std::declval<Receiver&>()));
::std::declval<receiver_ref>()));

::beman::execution::detail::counting_scope_base* scope;
explicit state(::beman::execution::detail::counting_scope_base* s, Receiver& r)
: scope(s),
receiver(r),
op(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler(
::beman::execution::get_env(this->receiver))),
this->receiver)) {}
receiver_ref(this->receiver))) {}
virtual ~state() = default;

auto complete() noexcept -> void override { ::beman::execution::start(this->op); }

auto complete_inline() noexcept -> void override { ::beman::execution::set_value(::std::move(this->receiver)); }

auto start() noexcept -> void { this->scope->start_node(this); }
Expand Down
29 changes: 29 additions & 0 deletions include/beman/execution/detail/unreachable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// include/beman/execution/detail/unreachable.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_UNREACHABLE
#define INCLUDED_BEMAN_EXECUTION_DETAIL_UNREACHABLE

#include <beman/execution/detail/common.hpp>
#ifdef BEMAN_HAS_IMPORT_STD
import std;
#else
#include <exception>
#include <utility>
#endif

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
[[noreturn]] inline auto unreachable() -> void {
#ifdef __cpp_lib_unreachable
::std::unreachable();
#else
::std::terminate();
#endif
}
} // namespace beman::execution::detail

// ----------------------------------------------------------------------------

#endif // INCLUDED_BEMAN_EXECUTION_DETAIL_UNREACHABLE
2 changes: 2 additions & 0 deletions src/beman/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unspecified_promise.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unstoppable_token.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/unreachable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_for.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_completion_signatures.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/valid_specialization.hpp
Expand Down Expand Up @@ -387,6 +388,7 @@ if(BEMAN_USE_MODULES)
unspecified_promise.cppm
unstoppable.cppm
unstoppable_token.cppm
unreachable.cppm
valid_completion_for.cppm
valid_completion_signatures.cppm
valid_specialization.cppm
Expand Down
11 changes: 11 additions & 0 deletions src/beman/execution/unreachable.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module;
// src/beman/execution/unreachable.cppm -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution/detail/unreachable.hpp>

export module beman.execution.detail.unreachable;

namespace beman::execution::detail {
export using beman::execution::detail::unreachable;
} // namespace beman::execution::detail
14 changes: 14 additions & 0 deletions tests/beman/execution/exec-scope-counting.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import beman.execution;
#include <beman/execution/detail/counting_scope.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/just.hpp>
#include <beman/execution/detail/spawn.hpp>
#include <beman/execution/detail/sync_wait.hpp>
#include <beman/execution/detail/then.hpp>
#include <beman/execution/detail/inline_scheduler.hpp>
#endif

Expand Down Expand Up @@ -113,11 +115,23 @@ auto token() -> void {
ASSERT(true == called);
}

auto spawn() -> void {
constexpr std::size_t expected = 10;
std::size_t counter = 0;
test_std::counting_scope scope;
for (std::size_t i = 0; i < expected; ++i) {
test_std::spawn(test_std::just() | test_std::then([&counter]() noexcept { ++counter; }), scope.get_token());
}
test_std::sync_wait(scope.join());
ASSERT(counter == expected);
}

} // namespace

TEST(exec_scope_counting) {
general();
ctor();
mem();
token();
spawn();
}
14 changes: 14 additions & 0 deletions tests/beman/execution/exec-scope-simple-counting.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import beman.execution;
#include <beman/execution/detail/simple_counting_scope.hpp>
#include <beman/execution/detail/sender.hpp>
#include <beman/execution/detail/just.hpp>
#include <beman/execution/detail/spawn.hpp>
#include <beman/execution/detail/sync_wait.hpp>
#include <beman/execution/detail/then.hpp>
#include <beman/execution/detail/inline_scheduler.hpp>
#endif

Expand Down Expand Up @@ -111,11 +113,23 @@ auto token() -> void {
ASSERT(true == called);
}

auto spawn() -> void {
constexpr std::size_t expected = 10;
std::size_t counter = 0;
test_std::simple_counting_scope scope;
for (std::size_t i = 0; i < expected; ++i) {
test_std::spawn(test_std::just() | test_std::then([&counter]() noexcept { ++counter; }), scope.get_token());
}
test_std::sync_wait(scope.join());
ASSERT(counter == expected);
}

} // namespace

TEST(exec_scope_simple_counting) {
general();
ctor();
mem();
token();
spawn();
}
Loading