Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ else()
tests/main.test.cpp
tests/basics.test.cpp
tests/blocked_by.test.cpp
# tests/cancel.test.cpp
tests/cancel.test.cpp
tests/guards.test.cpp
tests/proxy.test.cpp
)
Expand All @@ -108,6 +108,7 @@ else()
-fno-rtti)

target_link_options(async_unit_test PRIVATE
-g
-fsanitize=address)
target_link_libraries(async_unit_test PRIVATE async_context Boost::ut)

Expand Down
33 changes: 23 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,6 @@ Both pipelines completed successfully!
- **Exception propagation** - Proper exception handling through the coroutine chain
- **Cancellation support** - Clean cancellation with RAII-based resource cleanup

> [!WARNING]
>
> Cancellation support is not implemented yet.

## Requirements

- C++23 compiler with coroutine support
Expand Down Expand Up @@ -361,24 +357,41 @@ exception will be thrown from a call to `.resume()`.

```cpp
async::future<void> may_throw(async::context& p_ctx) {
throw std::runtime_error("error");
co_return;
throw std::runtime_error("error");
co_return;
}

async::future<void> just_calls(async::context& p_ctx) {
co_await may_throw(p_ctx);
co_return;
co_await may_throw(p_ctx);
co_return;
}

simple_context ctx;
auto future = may_throw(ctx);

try {
future.resume();
future.resume();
} catch (const std::runtime_error& e) {
// Handle exception
// Handle exception
}
```

### Avoid operation stacking

Operation stacking is when you load an additional operation into a coroutine's
stack memory before finishing the previous operation. This results in a memory
leak where the previous coroutines frame is no longer accessible and cannot be
deallocated, permanently reducing the memory of the context and preserving the
lifetime of the objects held within. It is UB to allow the context to be
destroyed at this point.

```cpp
my_context ctx;
auto future1 = async_op1(ctx); // ✅ Okay, may create some objects on its stack
auto future2 = async_op2(ctx); // ❌ Memory leak! Don't do this
// UB to allow ctx to be destroyed at this point 😱
```

### Proxy Context for Timeouts

```cpp
Expand Down
63 changes: 57 additions & 6 deletions modules/async_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ public:
return m_active_handle == std::noop_coroutine();
}

void cancel();

void resume()
{
// We cannot resume the a coroutine blocked by time.
Expand All @@ -285,7 +287,6 @@ public:
return capacity() - memory_used();
}

// TODO(#40): Perform cancellation on context destruction
virtual ~context() = default;

private:
Expand Down Expand Up @@ -409,8 +410,11 @@ public:

~proxy_context() override
{
// Unshrink parent stack, by setting its range to be the start of its
// stack and the end to be the end of this stack.
// Cancel any operations still on this context
cancel();

// Restore parent stack, by setting its range to be the start of its
// stack and the end of our stack.
m_proxy.parent->m_stack = { m_proxy.parent->m_stack.begin(),
m_stack.end() };
}
Expand Down Expand Up @@ -733,12 +737,25 @@ public:
return m_continuation;
}

void cancel()
{
// Set future state to cancelled
m_cancel(this);
// Pop self off context stack
pop_active_coroutine();
// Destroy promise objects & deallocate memory
std::coroutine_handle<promise_base>::from_promise(*this).destroy();
}

protected:
using cancellation_fn = void(void*);

// Consider m_continuation as the return address of the coroutine. The
// coroutine handle for the coroutine that called and awaited the future that
// generated this promise is stored here.
std::coroutine_handle<> m_continuation = std::noop_coroutine();
class context* m_context; // left uninitialized, compiler should warn me
class context* m_context = nullptr;
cancellation_fn* m_cancel = nullptr;
};

export template<typename T>
Expand Down Expand Up @@ -856,6 +873,12 @@ public:
*promise_return_base<T>::m_future_state = std::current_exception();
}

static void cancel_promise(void* p_self)
{
auto* self = static_cast<promise<T>*>(p_self);
*self->m_future_state = cancelled_state{};
}

constexpr future<T> get_return_object() noexcept;
};

Expand Down Expand Up @@ -886,14 +909,28 @@ public:
* @brief Reports if this async object has finished its operation and now
* contains a value.
*
* @return true - operation is either finished
* @return true - this operation is finished and either contains the value of
* type T, an exception_ptr, or this future is in a cancelled state.
* @return false - operation has yet to completed and does have a value.
*/
[[nodiscard]] constexpr bool done() const
{
return not std::holds_alternative<handle_type>(m_state);
}

void cancel()
{
if (done()) {
return;
}

auto handle = std::get<handle_type>(m_state);
full_handle_type::from_address(handle.address())
.promise()
.get_context()
.cancel();
}

/**
* @brief Reports if this async object has finished with an value
*
Expand Down Expand Up @@ -1021,7 +1058,11 @@ public:
constexpr ~future()
{
if (std::holds_alternative<handle_type>(m_state)) {
std::get<handle_type>(m_state).destroy();
auto handle = std::get<handle_type>(m_state);
full_handle_type::from_address(handle.address())
.promise()
.get_context()
.cancel();
}
}

Expand All @@ -1038,6 +1079,7 @@ private:
{
auto& promise = p_handle.promise();
promise.m_future_state = &m_state;
promise.m_cancel = &promise_type::cancel_promise;
}

future_state<T> m_state{};
Expand All @@ -1058,4 +1100,13 @@ constexpr future<T> promise<T>::get_return_object() noexcept
m_context->active_handle(handle);
return future<T>{ handle };
}

void context::cancel()
{
while (not done()) {
std::coroutine_handle<promise_base>::from_address(m_active_handle.address())
.promise()
.cancel();
}
}
} // namespace async::inline v0
20 changes: 10 additions & 10 deletions test_package/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import async_context;

struct round_robin_scheduler
{
bool run_until_all_done(int p_iterations = 100)
bool run_all_until_done(int p_iterations = 100)
{
for (int i = 0; i < p_iterations; i++) {
bool all_done = true;
Expand Down Expand Up @@ -95,9 +95,9 @@ struct test_context : public async::context
async::future<int> read_sensor(async::context& ctx, std::string_view p_name)
{
using namespace std::chrono_literals;
std::println(" ['{}': Sensor] Starting read...", p_name);
std::println("['{}': Sensor] Starting read...", p_name);
co_await ctx.block_by_io(); // Simulate I/O operation
std::println(" ['{}': Sensor] Read complete: 42", p_name);
std::println("['{}': Sensor] Read complete: 42", p_name);
co_return 42;
}

Expand All @@ -107,10 +107,10 @@ async::future<int> process_data(async::context& ctx,
int value)
{
using namespace std::chrono_literals;
std::println(" ['{}': Process] Processing {}...", p_name, value);
std::println("['{}': Process] Processing {}...", p_name, value);
co_await 10ms; // Simulate processing time
int result = value * 2;
std::println(" ['{}': Process] Result: {}", p_name, result);
std::println("['{}': Process] Result: {}", p_name, result);
co_return result;
}

Expand All @@ -119,9 +119,9 @@ async::future<void> write_actuator(async::context& ctx,
std::string_view p_name,
int value)
{
std::println(" ['{}': Actuator] Writing {}...", p_name, value);
std::println("['{}': Actuator] Writing {}...", p_name, value);
co_await ctx.block_by_io();
std::println(" ['{}': Actuator] Write complete!", p_name);
std::println("['{}': Actuator] Write complete!", p_name);
}

// Coordinates the full pipeline
Expand All @@ -146,10 +146,10 @@ int main()
test_context ctx2(scheduler);

// Run two independent pipelines concurrently
auto pipeline1 = sensor_pipeline(ctx1, "System 1");
auto pipeline2 = sensor_pipeline(ctx2, "System 2");
auto pipeline1 = sensor_pipeline(ctx1, "🌟 System 1");
auto pipeline2 = sensor_pipeline(ctx2, "🔥 System 2");

scheduler->run_until_all_done();
scheduler->run_all_until_done();

assert(pipeline1.done());
assert(pipeline2.done());
Expand Down
Loading
Loading