Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions doc/modules/ROOT/pages/4.coroutines/4b.launching.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,6 @@ run_async(ex,

When no handlers are provided, results are discarded and exceptions are rethrown (causing `std::terminate` if uncaught).

=== Stop Token Support

Pass a stop token to enable cooperative cancellation:

[source,cpp]
----
std::stop_source source;
run_async(ex, source.get_token())(cancellable_task());

// Later, to request cancellation:
source.request_stop();
----

The stop token is propagated to the task and all tasks it awaits.

== run: Executor Hopping Within Coroutines

Inside a coroutine, use `run` to execute a child task on a different executor:
Expand Down Expand Up @@ -136,6 +121,48 @@ This pattern is useful for:
* Performing I/O on an I/O-specific context
* Ensuring UI updates happen on the UI thread

== Stop Token Propagation

Both `run_async` and `run` propagate stop tokens to the launched task and all tasks it awaits. The task accesses its token via `co_await this_coro::stop_token`.

=== Injecting a Token with run_async

Since `run_async` is called from non-coroutine code, there is no caller token to inherit. Pass a stop token explicitly:

[source,cpp]
----
std::stop_source source;
run_async(ex, source.get_token())(cancellable_task());

// Later, to request cancellation:
source.request_stop();
----

=== Inheritance with run

`run` is called from within a coroutine, so it inherits the caller's stop token by default:

[source,cpp]
----
task<void> parent()
{
// Child automatically receives our stop token
co_await run(pool.get_executor())(child_task());
}
----

To override with a different token, pass it explicitly:

[source,cpp]
----
task<void> parent()
{
std::stop_source local;
// Child gets local's token, not our caller's
co_await run(pool.get_executor(), local.get_token())(child_task());
}
----

== Handler Threading

Handlers passed to `run_async` are invoked on whatever thread the executor schedules:
Expand Down
19 changes: 15 additions & 4 deletions doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,25 +316,36 @@ task<> with_timeout(task<> operation, std::chrono::seconds timeout)

=== User Cancellation

Connect UI cancellation to stop tokens:
Connect UI cancellation to stop tokens. Pass the token through `run_async` so it propagates automatically via the execution environment—the task accesses it with `co_await this_coro::stop_token` instead of receiving it as a function argument:

[source,cpp]
----
class download_manager
{
executor_ref executor_;
std::stop_source stop_source_;

public:
void start_download(std::string url)
{
run_async(executor_)(download(url, stop_source_.get_token()));
// Token propagated via io_env, not as a function argument
run_async(executor_, stop_source_.get_token())(download(url));
}

void cancel()
{
stop_source_.request_stop();
}
};

task<void> download(std::string url)
{
auto token = co_await this_coro::stop_token; // From run_async's io_env
while (!token.stop_requested())
{
co_await fetch_next_chunk(url);
}
}
----

=== Graceful Shutdown
Expand Down
94 changes: 94 additions & 0 deletions test/unit/ex/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,97 @@ struct run_test
BOOST_TEST(called);
}

//----------------------------------------------------------
// Stop Token Propagation
//----------------------------------------------------------

static task<bool>
check_stop_requested()
{
auto token = co_await this_coro::stop_token;
co_return token.stop_requested();
}

void
testStopTokenInheritance()
{
// Verify run(ex) inherits the caller's stop token
int dispatch_count = 0;
test_executor ex(1, dispatch_count);
std::stop_source source;
source.request_stop();
bool result = false;

auto outer = [&]() -> task<bool> {
// run(ex) with no explicit stop token should inherit
// the caller's token (which is stopped)
co_return co_await capy::run(ex)(check_stop_requested());
};

run_async(ex, source.get_token(),
[&](bool v) { result = v; })(outer());

BOOST_TEST(result);
}

void
testStopTokenOverrideInnerStopped()
{
// Stop the inner (override) token only.
// Inner task should see stopped; outer should not.
int dispatch_count = 0;
test_executor ex(1, dispatch_count);
std::stop_source caller_source;
std::stop_source override_source;
override_source.request_stop();

bool outer_stopped = true;
bool inner_stopped = false;

auto outer = [&]() -> task<void> {
auto token = co_await this_coro::stop_token;
outer_stopped = token.stop_requested();
inner_stopped = co_await capy::run(ex, override_source.get_token())(
check_stop_requested());
};

run_async(ex, caller_source.get_token())(outer());

BOOST_TEST(!outer_stopped);
BOOST_TEST(inner_stopped);
}

void
testStopTokenOverrideOuterStopped()
{
// Stop the outer (caller) token only.
// Outer task should see stopped; inner (override) should not.
int dispatch_count = 0;
test_executor ex(1, dispatch_count);
std::stop_source caller_source;
caller_source.request_stop();
std::stop_source override_source;

bool outer_stopped = false;
bool inner_stopped = true;

auto outer = [&]() -> task<void> {
auto token = co_await this_coro::stop_token;
outer_stopped = token.stop_requested();
inner_stopped = co_await capy::run(ex, override_source.get_token())(
check_stop_requested());
};

run_async(ex, caller_source.get_token())(outer());

BOOST_TEST(outer_stopped);
BOOST_TEST(!inner_stopped);
}

//----------------------------------------------------------
// Allocator Propagation
//----------------------------------------------------------

void
testAllocatorPropagation()
{
Expand Down Expand Up @@ -394,6 +485,9 @@ struct run_test
testStopTokenWithAllocator();
testVoidWithStopToken();
testVoidWithMemoryResource();
testStopTokenInheritance();
testStopTokenOverrideInnerStopped();
testStopTokenOverrideOuterStopped();
testAllocatorPropagation();
testAllocatorPropagationThroughRun();
}
Expand Down
41 changes: 41 additions & 0 deletions test/unit/ex/run_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,46 @@ struct run_async_test
BOOST_TEST(result);
}

void
testScopedCancellation()
{
// Three tasks on the same executor: one with a scoped stop token,
// two with the default (empty) token. Cancelling the scoped token
// should only affect that task, not the others.
std::queue<std::coroutine_handle<>> queue;
queue_executor d(queue);

bool default_1_stopped = true;
bool scoped_stopped = false;
bool default_2_stopped = true;

std::stop_source scoped_source;

run_async(d, [&](bool v) { default_1_stopped = v; })(
check_stop_requested());
run_async(d, scoped_source.get_token(),
[&](bool v) { scoped_stopped = v; })(
check_stop_requested());
run_async(d, [&](bool v) { default_2_stopped = v; })(
check_stop_requested());

BOOST_TEST_EQ(queue.size(), 3u);

// Cancel the scoped source before draining
scoped_source.request_stop();

while(!queue.empty())
{
auto h = queue.front();
queue.pop();
h.resume();
}

BOOST_TEST(!default_1_stopped);
BOOST_TEST(scoped_stopped);
BOOST_TEST(!default_2_stopped);
}

//----------------------------------------------------------
// Allocator Propagation
//----------------------------------------------------------
Expand Down Expand Up @@ -641,6 +681,7 @@ struct run_async_test
// Stop Token
testStopTokenPropagation();
testCancellationVisible();
testScopedCancellation();

// Allocator Propagation
testAllocatorPropagation();
Expand Down
Loading