Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/modules/ROOT/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
** xref:7.examples/7h.custom-dynamic-buffer.adoc[Custom Dynamic Buffer]
** xref:7.examples/7i.echo-server-corosio.adoc[Echo Server with Corosio]
** xref:7.examples/7j.stream-pipeline.adoc[Stream Pipeline]
** xref:7.examples/7k.strand-serialization.adoc[Strand Serialization]
** xref:7.examples/7l.async-mutex.adoc[Async Mutex]
** xref:7.examples/7m.parallel-tasks.adoc[Parallel Tasks]
** xref:7.examples/7n.custom-executor.adoc[Custom Executor]
* xref:8.design/8.intro.adoc[Design]
** xref:8.design/8a.CapyLayering.adoc[Layered Abstractions]
** xref:8.design/8b.Separation.adoc[Why Capy Is Separate]
Expand Down
205 changes: 77 additions & 128 deletions doc/modules/ROOT/pages/7.examples/7i.echo-server-corosio.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A complete echo server using Corosio for real network I/O.
== What You Will Learn

* Integrating Capy with Corosio networking
* Accepting TCP connections
* Accepting TCP connections with `tcp_acceptor`
* Handling multiple clients concurrently

== Prerequisites
Expand All @@ -22,132 +22,76 @@ A complete echo server using Corosio for real network I/O.
#include <boost/corosio.hpp>
#include <iostream>

namespace corosio = boost::corosio;
using namespace boost::capy;
namespace tcp = boost::corosio::tcp;

// Echo handler: receives data and sends it back
task<> echo_session(any_stream& stream, std::string client_info)
task<> echo_session(corosio::tcp_socket sock)
{
std::cout << "[" << client_info << "] Session started\n";

char buffer[1024];
std::size_t total_bytes = 0;

try
{
for (;;)
{
// Read some data
auto [ec, n] = co_await stream.read_some(mutable_buffer(buffer));

if (ec == cond::eof)
{
std::cout << "[" << client_info << "] Client disconnected\n";
break;
}

if (ec.failed())
{
std::cout << "[" << client_info << "] Read error: "
<< ec.message() << "\n";
break;
}

total_bytes += n;

// Echo it back
auto [wec, wn] = co_await write(stream, const_buffer(buffer, n));

if (wec.failed())
{
std::cout << "[" << client_info << "] Write error: "
<< wec.message() << "\n";
break;
}
}
}
catch (std::exception const& e)
char buf[1024];

for (;;)
{
std::cout << "[" << client_info << "] Exception: " << e.what() << "\n";
auto [ec, n] = co_await sock.read_some(
mutable_buffer(buf, sizeof(buf)));

if (ec)
break;

auto [wec, wn] = co_await write(
sock, const_buffer(buf, n));

if (wec)
break;
}

std::cout << "[" << client_info << "] Session ended, "
<< total_bytes << " bytes echoed\n";

sock.close();
}

// Accept loop: accepts connections and spawns handlers
task<> accept_loop(tcp::acceptor& acceptor, executor_ref ex)
task<> accept_loop(
corosio::tcp_acceptor& acc,
corosio::io_context& ioc)
{
std::cout << "Server listening on port "
<< acceptor.local_endpoint().port() << "\n";

int connection_id = 0;

auto ep = acc.local_endpoint();
std::cout << "Listening on port " << ep.port() << "\n";

for (;;)
{
// Accept a connection
auto [ec, socket] = co_await acceptor.async_accept();
if (ec.failed())
corosio::tcp_socket peer(ioc);
auto [ec] = co_await acc.accept(peer);

if (ec)
{
std::cout << "Accept error: " << ec.message() << "\n";
continue;
}

// Build client info string
auto remote = socket.remote_endpoint();
std::string client_info =
std::to_string(++connection_id) + ":" +
remote.address().to_string() + ":" +
std::to_string(remote.port());

std::cout << "[" << client_info << "] Connection accepted\n";

// Wrap socket and spawn handler
// Note: socket ownership transfers to the lambda
run_async(ex)(
[](tcp::socket sock, std::string info) -> task<> {
any_stream stream{sock};
co_await echo_session(stream, std::move(info));
}(std::move(socket), std::move(client_info))
);

auto remote = peer.remote_endpoint();
std::cout << "Connection from ";
if (remote.is_v4())
std::cout << remote.v4_address();
else
std::cout << remote.v6_address();
std::cout << ":" << remote.port() << "\n";

run_async(ioc.get_executor())(
echo_session(std::move(peer)));
}
}

int main(int argc, char* argv[])
{
try
{
// Parse port from command line
unsigned short port = 8080;
if (argc > 1)
port = static_cast<unsigned short>(std::stoi(argv[1]));

// Create I/O context and thread pool
boost::corosio::io_context ioc;
thread_pool pool(4);

// Create acceptor
tcp::endpoint endpoint(tcp::v4(), port);
tcp::acceptor acceptor(ioc, endpoint);
acceptor.set_option(tcp::acceptor::reuse_address(true));

std::cout << "Starting echo server...\n";

// Run accept loop
run_async(pool.get_executor())(
accept_loop(acceptor, pool.get_executor())
);

// Run the I/O context (this blocks)
ioc.run();
}
catch (std::exception const& e)
{
std::cerr << "Error: " << e.what() << "\n";
return 1;
}

unsigned short port = 8080;
if (argc > 1)
port = static_cast<unsigned short>(std::atoi(argv[1]));

corosio::io_context ioc;
corosio::tcp_acceptor acc(ioc, corosio::endpoint(port));

run_async(ioc.get_executor())(
accept_loop(acc, ioc));

ioc.run();

return 0;
}
----
Expand All @@ -156,10 +100,8 @@ int main(int argc, char* argv[])

[source,cmake]
----
find_package(Corosio REQUIRED)

add_executable(echo_server echo_server.cpp)
target_link_libraries(echo_server PRIVATE capy Corosio::corosio)
target_link_libraries(echo_server PRIVATE Boost::capy Boost::corosio)
----

== Walkthrough
Expand All @@ -168,47 +110,56 @@ target_link_libraries(echo_server PRIVATE capy Corosio::corosio)

[source,cpp]
----
tcp::endpoint endpoint(tcp::v4(), port);
tcp::acceptor acceptor(ioc, endpoint);
corosio::io_context ioc;
corosio::tcp_acceptor acc(ioc, corosio::endpoint(port));
----

The acceptor listens for incoming connections on the specified port.
The `io_context` drives all asynchronous I/O. The `tcp_acceptor` listens on the specified port. Corosio uses a flat namespace -- types like `tcp_socket`, `tcp_acceptor`, and `endpoint` live directly in `boost::corosio`.

=== Accept Loop

[source,cpp]
----
for (;;)
{
auto [ec, socket] = co_await acceptor.async_accept();
corosio::tcp_socket peer(ioc);
auto [ec] = co_await acc.accept(peer);
// ... handle connection ...
}
----

The accept loop runs forever, accepting connections and spawning handlers. Each connection runs in its own task.
The accept loop runs forever, creating a new `tcp_socket` for each connection. `acc.accept(peer)` suspends the coroutine until a client connects.

=== Type Erasure
=== Echo Session

[source,cpp]
----
any_stream stream{sock};
co_await echo_session(stream, std::move(info));
auto [ec, n] = co_await sock.read_some(
mutable_buffer(buf, sizeof(buf)));
// ...
auto [wec, wn] = co_await write(
sock, const_buffer(buf, n));
----

The `echo_session` function accepts `any_stream&`. The concrete `tcp::socket` is wrapped at the call site. This keeps the echo logic transport-independent.
Each session reads data with `read_some` and writes it back with `write`. When the client disconnects, `read_some` returns an error and the loop exits.

=== Concurrent Clients

Each client connection spawns a new task via `run_async`. Multiple clients are handled concurrently on the thread pool.
[source,cpp]
----
run_async(ioc.get_executor())(
echo_session(std::move(peer)));
----

For each accepted connection, the socket is moved into a new task via `run_async`. The coroutine owns the socket for the lifetime of the session. Multiple clients are handled concurrently on the same `io_context`.

== Testing

Start the server:

----
$ ./echo_server 8080
Starting echo server...
Server listening on port 8080
Listening on port 8080
----

Connect with netcat:
Expand All @@ -225,10 +176,8 @@ World
Server output:

----
[1:127.0.0.1:54321] Connection accepted
[1:127.0.0.1:54321] Session started
[1:127.0.0.1:54321] Client disconnected
[1:127.0.0.1:54321] Session ended, 12 bytes echoed
Listening on port 8080
Connection from 127.0.0.1:54321
----

== Exercises
Expand All @@ -239,4 +188,4 @@ Server output:

== Next Steps

* xref:7.examples/7j.stream-pipeline.adoc[Stream Pipeline] Data transformation chains
* xref:7.examples/7j.stream-pipeline.adoc[Stream Pipeline] -- Data transformation chains
7 changes: 7 additions & 0 deletions doc/modules/ROOT/pages/7.examples/7j.stream-pipeline.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ Output (52 bytes):
2. Implement a ROT13 transform
3. Create a filtering stage that drops lines matching a pattern

== Next Steps

* xref:7.examples/7k.strand-serialization.adoc[Strand Serialization] -- Lock-free shared state with strands

== Summary

This example catalog demonstrated:
Expand All @@ -406,5 +410,8 @@ This example catalog demonstrated:
* Custom buffer implementations
* Real network I/O with Corosio
* Data transformation pipelines
* Strand-based serialization and async mutexes
* Parallel task distribution across thread pools
* Custom executor implementations

These patterns form the foundation for building robust, efficient I/O applications with Capy.
Loading