48 changes: 48 additions & 0 deletions doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc
@@ -166,6 +166,54 @@ The key points:
2. Use the executor to dispatch completion
3. Respect the stop token for cancellation

=== Stop Callbacks Must Post, Not Resume

When implementing a stoppable awaitable, you may register a `std::stop_callback` to wake the coroutine when cancellation is requested. The callback fires synchronously on whatever thread calls `request_stop()`, which is typically *not* the executor's thread.

[WARNING]
====
*Never resume a coroutine handle directly from a stop_callback.* Doing so executes the coroutine on the wrong thread, corrupting the thread-local frame allocator. This causes use-after-free on the next coroutine allocation—potentially in completely unrelated code.
====

Use `stop_resume_callback` and `io_env::post_resume` from `<boost/capy/ex/io_env.hpp>` to post the resume through the executor:

[source,cpp]
----
#include <boost/capy/ex/io_env.hpp>

#include <coroutine>
#include <optional>

// io_env and stop_resume_callback live in namespace boost::capy.
using namespace boost::capy;

struct stoppable_awaitable
{
    std::optional<stop_resume_callback> stop_cb_;

    bool await_ready() { return false; }

    std::coroutine_handle<> await_suspend(
        std::coroutine_handle<> h, io_env const* env)
    {
        if (env->stop_token.stop_requested())
            return h; // Already cancelled, resume immediately

        // Post through executor when stop is requested
        stop_cb_.emplace(env->stop_token, env->post_resume(h));

        start_async_operation(); // placeholder for the awaitable's own operation
        return std::noop_coroutine();
    }

    void await_resume() { /* ... */ }
};
----

The incorrect pattern—which compiles and appears to work but causes memory corruption—looks like this:

[source,cpp]
----
// WRONG: resumes coroutine on the calling thread
// Here stop_cb_ is a std::optional<std::stop_callback<std::coroutine_handle<>>>.
stop_cb_.emplace(env->stop_token, h); // h is a raw coroutine_handle, resumed inline
----

See xref:4.coroutines/4e.cancellation.adoc#stoppable-awaitables[Implementing Stoppable Awaitables] for a complete example.

== Reference

[cols="1,3"]
65 changes: 64 additions & 1 deletion doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc
@@ -292,7 +292,70 @@ Capy's I/O operations (provided by Corosio) respect stop tokens at the OS level:

When you request stop, pending I/O operations are cancelled at the OS level, providing immediate response rather than waiting for the operation to complete naturally.

-== Part 8: Patterns
+[[stoppable-awaitables]]
+== Part 8: Implementing Stoppable Awaitables

The examples above show *polling* for cancellation with `token.stop_requested()`. For awaitables that suspend indefinitely—waiting for I/O, a lock, or an external event—you need a `std::stop_callback` to wake the coroutine when cancellation arrives.

=== The Dangerous Pattern

A `std::stop_callback` fires synchronously on whatever thread calls `request_stop()`. If the callback resumes the coroutine directly, the coroutine runs on the wrong thread:

[source,cpp]
----
// WRONG — causes use-after-free
std::optional<std::stop_callback<std::coroutine_handle<>>> stop_cb;

std::coroutine_handle<> await_suspend(
    std::coroutine_handle<> h, io_env const* env)
{
    stop_cb.emplace(env->stop_token, h); // Resumes inline!
    return std::noop_coroutine();
}
----

When an external thread calls `request_stop()`, `h.resume()` executes the coroutine on that thread. The coroutine machinery sets that thread's thread-local frame allocator to the executor's allocator, poisoning the calling thread's TLS. When the executor's pool is destroyed, the TLS pointer is left dangling, and the next coroutine allocation on that thread dereferences freed memory.

This bug is deterministic, not a race condition. It manifests as a heap-use-after-free in *unrelated* code—wherever the next coroutine frame happens to be allocated on the poisoned thread.
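The poisoning step can be pictured with a thread-local pointer. The sketch below is a simplified model under stated assumptions, not Capy's actual machinery: `fake_allocator`, `tls_frame_allocator`, and `resume_inline` are hypothetical stand-ins for the frame allocator, its TLS slot, and the resume path.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for the pool's frame allocator.
struct fake_allocator { int id; };

// Hypothetical stand-in for the per-thread "current frame allocator" slot.
thread_local fake_allocator* tls_frame_allocator = nullptr;

// Models a resume: the resuming thread's TLS slot is pointed at the
// allocator that belongs to the coroutine's executor.
void resume_inline(fake_allocator* pool_alloc)
{
    tls_frame_allocator = pool_alloc;
    // ... the coroutine body would run here ...
}

// Returns true if an external thread's TLS slot still points at the
// pool's allocator after it resumed the coroutine inline.
bool inline_resume_poisons_caller_tls()
{
    fake_allocator pool_alloc{1};
    bool poisoned = false;
    std::thread external([&] {
        resume_inline(&pool_alloc); // e.g. called from a stop_callback
        poisoned = (tls_frame_allocator == &pool_alloc);
    });
    external.join();
    return poisoned;
}
```

In the model, nothing resets the external thread's slot, so once `pool_alloc` goes away the pointer dangles. No timing is involved, which is consistent with the bug being deterministic.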

=== The Correct Pattern: resume_via_post

Use `stop_resume_callback` and `io_env::post_resume` to post the resume through the executor, ensuring the coroutine runs on the correct thread:

[source,cpp]
----
#include <boost/capy/ex/io_env.hpp>

#include <coroutine>
#include <optional>

// io_env and stop_resume_callback live in namespace boost::capy.
using namespace boost::capy;

struct my_stoppable_awaitable
{
    std::optional<stop_resume_callback> stop_cb_;
    // ... other members for the async operation ...

    bool await_ready() { return false; }

    std::coroutine_handle<> await_suspend(
        std::coroutine_handle<> h, io_env const* env)
    {
        if (env->stop_token.stop_requested())
            return h; // Already cancelled

        stop_cb_.emplace(env->stop_token, env->post_resume(h));

        start_async_operation(h, env); // placeholder for the awaitable's own operation
        return std::noop_coroutine();
    }

    void await_resume() { /* check result or throw */ }
};
----

`stop_resume_callback` is a type alias for `std::stop_callback<resume_via_post>`. `post_resume(h)` creates a `resume_via_post` callable that posts the coroutine handle through this environment's executor.

When `request_stop()` fires the callback, the coroutine handle is posted to the executor's queue instead of resumed inline. The executor's worker thread picks it up and resumes it in the correct execution context.

NOTE: Capy's built-in I/O awaitables (via Corosio) already use the post-back pattern internally. This guidance applies when writing your own custom awaitables.

== Part 9: Patterns

=== Timeout Pattern

69 changes: 59 additions & 10 deletions doc/unlisted/execution-thread-pool.adoc
@@ -92,28 +92,68 @@ run_async(pool.get_executor())(compute(), [](int result) {

== Lifetime and Shutdown

-The pool destructor waits for all work to complete:
+=== Waiting for Work: join()

Call `join()` to block until all outstanding work completes:

[source,cpp]
----
thread_pool pool(4);

for (auto& item : batch)
    run_async(pool.get_executor())(process(item));

pool.join(); // Blocks until all tasks finish
// Pool is now stopped; worker threads are joined
----

`join()` releases the pool's internal work guard and blocks until the
outstanding work count (tracked by `on_work_started()` / `on_work_finished()`)
reaches zero. After all work completes, the worker threads are joined.

The pool cannot be reused after `join()`. Calling `join()` more than once
is safe (subsequent calls are no-ops).
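The waiting half of `join()` amounts to blocking until a counter drains to zero. The following is a rough sketch of that idea with a mutex and condition variable. `work_counter` and `run_and_wait` are hypothetical names, and this is not how `thread_pool` is necessarily implemented.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical outstanding-work counter.
class work_counter
{
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t outstanding_ = 0;
public:
    void on_work_started()
    {
        std::lock_guard lock(m_);
        ++outstanding_;
    }
    void on_work_finished()
    {
        {
            std::lock_guard lock(m_);
            --outstanding_;
        }
        cv_.notify_all();
    }
    // Blocks until the outstanding count reaches zero.
    void wait_idle()
    {
        std::unique_lock lock(m_);
        cv_.wait(lock, [&] { return outstanding_ == 0; });
    }
};

// Launches n tasks, waits for the count to drain, and returns how many
// tasks had finished at the moment the wait returned.
int run_and_wait(int n)
{
    work_counter counter;
    std::atomic<int> done{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i)
    {
        counter.on_work_started(); // counted before the task is launched
        threads.emplace_back([&] {
            ++done;
            counter.on_work_finished();
        });
    }
    counter.wait_idle(); // returns only after every task has finished
    int seen = done.load();
    for (auto& t : threads)
        t.join();
    return seen;
}
```

Counting the work before launching it matters: if the increment happened inside the task, the wait could observe zero before any task had started.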

=== Immediate Stop: stop()

Call `stop()` to abandon remaining work:

[source,cpp]
----
pool.stop(); // Workers exit after current item; queued work is abandoned
pool.join(); // Wait for threads to finish
----

If `join()` is blocking on another thread, calling `stop()` causes it to
stop waiting for outstanding work. The `join()` call still waits for worker
threads to finish their current item and exit before returning.
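The "finish the current item, abandon the rest" behavior can be modeled with a one-worker pool. The sketch below is illustrative only; `mini_pool` is a hypothetical class, and its `join()` just joins the worker thread (the work-count draining described under `join()` is omitted).

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>

// Hypothetical one-worker pool modeling stop() semantics.
class mini_pool
{
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> q_;
    bool stopped_ = false;
    std::thread worker_; // declared last so it starts after the other members

    void run()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock lock(m_);
                cv_.wait(lock, [&] { return stopped_ || !q_.empty(); });
                if (stopped_)
                    return; // queued work is abandoned
                task = std::move(q_.front());
                q_.pop_front();
            }
            task(); // the current item always runs to completion
        }
    }
public:
    mini_pool() : worker_([this] { run(); }) {}
    ~mini_pool() { stop(); join(); }
    void post(std::function<void()> f)
    {
        { std::lock_guard lock(m_); q_.push_back(std::move(f)); }
        cv_.notify_one();
    }
    void stop()
    {
        { std::lock_guard lock(m_); stopped_ = true; }
        cv_.notify_all();
    }
    void join() { if (worker_.joinable()) worker_.join(); }
};

// Posts three tasks, stops while the first is running, and returns how
// many tasks actually ran.
int tasks_run_before_stop()
{
    std::atomic<int> ran{0};
    std::promise<void> started, release;
    std::future<void> started_f = started.get_future();
    std::future<void> release_f = release.get_future();
    {
        mini_pool pool;
        pool.post([&] { started.set_value(); release_f.wait(); ++ran; });
        pool.post([&] { ++ran; });
        pool.post([&] { ++ran; });
        started_f.wait();    // the first task is now the current item
        pool.stop();         // the two queued tasks will never run
        release.set_value(); // let the current item finish
        pool.join();
    }
    return ran.load();
}
```

Only the task that was already running completes. The two queued behind it are dropped when the worker sees the stop flag.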

=== Destructor Behavior

The destructor calls `stop()` then `join()`:

[source,cpp]
----
{
    thread_pool pool(4);
    run_async(pool.get_executor())(long_running_task());
-    // Destructor blocks until long_running_task completes
+    // Destructor: stop() -> join() -> shutdown services -> destroy services
+    // Queued work that hasn't started is abandoned
}
----

-This ensures orderly shutdown without orphaned coroutines.
+To wait for all work to complete before shutdown, call `join()` explicitly
+before the pool goes out of scope.

=== Destruction Order

When a pool is destroyed:

-1. Threads are signaled to stop accepting new work
-2. Pending work continues to completion
+1. Workers are signaled to stop (pending work is abandoned)
+2. Worker threads are joined
3. Services are shut down (in reverse order of creation)
4. Services are destroyed
-5. Threads are joined
+5. Remaining queued work items are destroyed

== Executor Operations

@@ -161,7 +201,9 @@ Since callers are never "inside" the thread pool's execution context,

== Work Tracking

-Work tracking keeps the pool alive while operations are outstanding:
+Work tracking keeps the pool alive while operations are outstanding.
+When `join()` has been called, the pool waits until the outstanding work
+count reaches zero before stopping the worker threads.

[source,cpp]
----
@@ -179,13 +221,14 @@ The `work_guard` RAII wrapper simplifies this:
{
    work_guard guard(ex);
    // Work count incremented

    // ... do work ...

} // Work count decremented
----

-`run_async` handles work tracking automatically.
+`run_async` handles work tracking automatically: each launched task
+holds a `work_guard` for its lifetime.
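The RAII idea behind `work_guard` can be sketched with a plain counter. `counting_executor` and `scoped_work` below are hypothetical types written for illustration. They only mirror the `on_work_started()` / `on_work_finished()` pairing and are not Capy's `work_guard`.

```cpp
#include <atomic>
#include <cassert>
#include <utility>

// Hypothetical executor handle that forwards to a shared counter.
struct counting_executor
{
    std::atomic<int>* count;
    void on_work_started() const noexcept { ++*count; }
    void on_work_finished() const noexcept { --*count; }
};

// Increments the work count on construction, decrements on destruction.
class scoped_work
{
    counting_executor ex_;
public:
    explicit scoped_work(counting_executor ex) noexcept : ex_(ex)
    {
        ex_.on_work_started();
    }
    ~scoped_work() { ex_.on_work_finished(); }

    // Non-copyable so each guard accounts for exactly one unit of work.
    scoped_work(scoped_work const&) = delete;
    scoped_work& operator=(scoped_work const&) = delete;
};

// Returns the work count observed inside and after the guarded scope.
std::pair<int, int> counts_around_scope()
{
    std::atomic<int> count{0};
    int inside = 0;
    {
        scoped_work guard(counting_executor{&count});
        inside = count.load();
    }
    return {inside, count.load()};
}
```

Because the decrement lives in the destructor, the count is balanced on every exit path from the scope, including exceptions.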

== Services

@@ -323,6 +366,12 @@ void process_batch()
| `get_executor()`
| Get an executor for the pool

| `join()`
| Wait for all outstanding work to complete

| `stop()`
| Immediately stop the pool, abandoning queued work

| Services
| Polymorphic components owned by the pool
|===
80 changes: 79 additions & 1 deletion include/boost/capy/ex/io_env.hpp
@@ -13,12 +13,39 @@
#include <boost/capy/detail/config.hpp>
#include <boost/capy/ex/executor_ref.hpp>

#include <coroutine>
#include <memory_resource>
#include <stop_token>

namespace boost {
namespace capy {

/** Callable that posts a coroutine handle to an executor.

Use this as the callback type for `std::stop_callback` instead
of a raw `std::coroutine_handle<>`. Raw handles resume the
coroutine inline on whatever thread calls `request_stop()`,
which bypasses the executor and corrupts the thread-local
frame allocator.

Prefer @ref io_env::post_resume and the @ref stop_resume_callback
alias to construct these—see examples there.

@see io_env::post_resume, stop_resume_callback
*/
struct resume_via_post
{
    executor_ref ex;
    std::coroutine_handle<> h;

    // post() must not throw; stop_callback requires a
    // non-throwing invocable.
    void operator()() const noexcept
    {
        ex.post(h);
    }
};

/** Execution environment for IoAwaitables.

This struct bundles the execution context passed through
@@ -33,11 +60,27 @@ namespace capy {
chain. Awaitables receive `io_env const*` in `await_suspend`
and should store it directly, never copy the pointed-to object.

@par Stop Callback Contract

Awaitables that register a `std::stop_callback` **must not**
resume the coroutine handle directly. The callback fires
synchronously on the thread that calls `request_stop()`, which
may not be an executor-managed thread. Resuming inline poisons
that thread's TLS frame allocator with the pool's allocator,
causing use-after-free on the next coroutine allocation.

Use @ref io_env::post_resume and @ref stop_resume_callback:
@code
std::optional<stop_resume_callback> stop_cb_;
// In await_suspend:
stop_cb_.emplace(env->stop_token, env->post_resume(h));
@endcode

@par Thread Safety
The referenced executor and allocator must remain valid
for the lifetime of any coroutine using this environment.

-@see IoAwaitable, IoRunnable
+@see IoAwaitable, IoRunnable, resume_via_post
*/
struct io_env
{
@@ -52,8 +95,43 @@ struct io_env
When null, the default allocator is used.
*/
std::pmr::memory_resource* frame_allocator = nullptr;

/** Create a resume_via_post callable for this environment.

Convenience method for registering @ref stop_resume_callback
instances. Equivalent to `resume_via_post{executor, h}`.

@par Example
@code
stop_cb_.emplace(env->stop_token, env->post_resume(h));
@endcode

@param h The coroutine handle to post on cancellation.

@return A @ref resume_via_post callable that holds a
non-owning @ref executor_ref and the coroutine handle.
The callable must not outlive the executor it references.

@see resume_via_post, stop_resume_callback
*/
    resume_via_post
    post_resume(std::coroutine_handle<> h) const noexcept
    {
        return resume_via_post{executor, h};
    }
};

/** Type alias for a stop callback that posts through the executor.

Use this to declare the stop callback member in your awaitable:
@code
std::optional<stop_resume_callback> stop_cb_;
@endcode

@see resume_via_post, io_env::post_resume
*/
using stop_resume_callback = std::stop_callback<resume_via_post>;

} // capy
} // boost
