Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "**" ]
pull_request:
branches: [ master ]
branches: [ master, develop ]
workflow_dispatch:

jobs:
Expand Down
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ file(GLOB_RECURSE BOOST_COROSIO_HEADERS CONFIGURE_DEPENDS
"${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio/*.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio.hpp")
file(GLOB_RECURSE BOOST_COROSIO_SOURCES CONFIGURE_DEPENDS
"${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.cpp")
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp")

source_group("" FILES "include/boost/corosio.hpp")
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio" PREFIX "include" FILES ${BOOST_COROSIO_HEADERS})
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES})
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES})

function(boost_corosio_setup_properties target)
target_compile_features(${target} PUBLIC cxx_std_20)
Expand Down
13 changes: 0 additions & 13 deletions include/boost/corosio/acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,6 @@ class acceptor : public io_object
BOOST_COROSIO_DECL
void cancel();

/** Return the execution context.

@return Reference to the execution context that owns this acceptor.
*/
auto
context() const noexcept ->
capy::execution_context&
{
return *ctx_;
}

struct acceptor_impl : io_object_impl
{
virtual void accept(
Expand All @@ -294,8 +283,6 @@ class acceptor : public io_object
{
return *static_cast<acceptor_impl*>(impl_);
}

capy::execution_context* ctx_;
};

} // namespace corosio
Expand Down
7 changes: 5 additions & 2 deletions src/src/acceptor.cpp → src/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include <boost/corosio/acceptor.hpp>

#ifdef _WIN32
#include "src/detail/win_iocp_sockets.hpp"
#include "detail/win_iocp_sockets.hpp"
#else
#include "detail/posix_sockets.hpp"
#endif

#include <boost/corosio/detail/except.hpp>
Expand All @@ -25,7 +27,8 @@ namespace {
using acceptor_service = detail::win_iocp_sockets;
using acceptor_impl_type = detail::win_acceptor_impl;
#else
#error "Unsupported platform"
using acceptor_service = detail::posix_sockets;
using acceptor_impl_type = detail::posix_acceptor_impl;
#endif

} // namespace
Expand Down
File renamed without changes.
File renamed without changes.
303 changes: 303 additions & 0 deletions src/detail/posix_op.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_POSIX_OP_HPP
#define BOOST_COROSIO_DETAIL_POSIX_OP_HPP

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/io_object.hpp>
#include <boost/capy/ex/any_dispatcher.hpp>
#include <boost/capy/concept/affine_awaitable.hpp>
#include <boost/capy/ex/any_coro.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/system/error_code.hpp>

#include <unistd.h>
#include <errno.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <stop_token>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>

namespace boost {
namespace corosio {
namespace detail {

/** Base class for POSIX async operations.

This class is analogous to overlapped_op on Windows.
It stores the coroutine handle, dispatcher, and result
pointers needed to complete an async operation.
*/
struct posix_op : capy::execution_context::handler
{
struct canceller
{
posix_op* op;
void operator()() const noexcept { op->request_cancel(); }
};

capy::any_coro h;
capy::any_dispatcher d;
system::error_code* ec_out = nullptr;
std::size_t* bytes_out = nullptr;

int fd = -1; // Socket file descriptor
std::uint32_t events = 0; // Requested epoll events (EPOLLIN/EPOLLOUT)
int error = 0; // errno on completion
std::size_t bytes_transferred = 0;

std::atomic<bool> cancelled{false};
std::optional<std::stop_callback<canceller>> stop_cb;

posix_op()
{
data_ = this;
}

void reset() noexcept
{
fd = -1;
events = 0;
error = 0;
bytes_transferred = 0;
cancelled.store(false, std::memory_order_relaxed);
}

void operator()() override
{
stop_cb.reset();

if (ec_out)
{
if (cancelled.load(std::memory_order_acquire))
*ec_out = make_error_code(system::errc::operation_canceled);
else if (error != 0)
*ec_out = system::error_code(error, system::system_category());
else if (is_read_operation() && bytes_transferred == 0)
{
// EOF: 0 bytes transferred with no error indicates end of stream
*ec_out = make_error_code(capy::error::eof);
}
}

if (bytes_out)
*bytes_out = bytes_transferred;

d(h).resume();
}

// Returns true if this is a read operation (for EOF detection)
virtual bool is_read_operation() const noexcept { return false; }

void destroy() override
{
stop_cb.reset();
}

void request_cancel() noexcept
{
cancelled.store(true, std::memory_order_release);
}

void start(std::stop_token token)
{
cancelled.store(false, std::memory_order_release);
stop_cb.reset();

if (token.stop_possible())
stop_cb.emplace(token, canceller{this});
}

void complete(int err, std::size_t bytes) noexcept
{
error = err;
bytes_transferred = bytes;
}

/** Called when epoll signals the fd is ready.
Derived classes override this to perform the actual I/O.
Sets error and bytes_transferred appropriately.
*/
virtual void perform_io() noexcept {}
};

inline posix_op*
get_posix_op(capy::execution_context::handler* h) noexcept
{
return static_cast<posix_op*>(h->data());
}

//------------------------------------------------------------------------------

/** Connect operation state. */
struct posix_connect_op : posix_op
{
void perform_io() noexcept override
{
// For connect, check SO_ERROR to see if connection succeeded
int err = 0;
socklen_t len = sizeof(err);
if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
err = errno;
complete(err, 0);
}
};

//------------------------------------------------------------------------------

/** Read operation state with buffer descriptors. */
struct posix_read_op : posix_op
{
static constexpr std::size_t max_buffers = 16;
iovec iovecs[max_buffers];
int iovec_count = 0;

bool is_read_operation() const noexcept override { return true; }

void reset() noexcept
{
posix_op::reset();
iovec_count = 0;
}

void perform_io() noexcept override
{
ssize_t n = ::readv(fd, iovecs, iovec_count);
if (n >= 0)
complete(0, static_cast<std::size_t>(n));
else
complete(errno, 0);
}
};

//------------------------------------------------------------------------------

/** Write operation state with buffer descriptors. */
struct posix_write_op : posix_op
{
static constexpr std::size_t max_buffers = 16;
iovec iovecs[max_buffers];
int iovec_count = 0;

void reset() noexcept
{
posix_op::reset();
iovec_count = 0;
}

void perform_io() noexcept override
{
ssize_t n = ::writev(fd, iovecs, iovec_count);
if (n >= 0)
complete(0, static_cast<std::size_t>(n));
else
complete(errno, 0);
}
};

//------------------------------------------------------------------------------

/** Accept operation state. */
struct posix_accept_op : posix_op
{
int accepted_fd = -1;
io_object::io_object_impl* peer_impl = nullptr;
io_object::io_object_impl** impl_out = nullptr;

// Function to create peer impl - set by posix_sockets
using create_peer_fn = io_object::io_object_impl* (*)(void*, int);
create_peer_fn create_peer = nullptr;
void* service_ptr = nullptr;

void reset() noexcept
{
posix_op::reset();
accepted_fd = -1;
peer_impl = nullptr;
impl_out = nullptr;
// Don't reset create_peer and service_ptr - they're set once
}

void perform_io() noexcept override
{
sockaddr_in addr{};
socklen_t addrlen = sizeof(addr);
int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
&addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);

if (new_fd >= 0)
{
accepted_fd = new_fd;
if (create_peer && service_ptr)
peer_impl = create_peer(service_ptr, new_fd);
complete(0, 0);
}
else
{
complete(errno, 0);
}
}

void operator()() override
{
stop_cb.reset();

bool success = (error == 0 && !cancelled.load(std::memory_order_acquire));

if (ec_out)
{
if (cancelled.load(std::memory_order_acquire))
*ec_out = make_error_code(system::errc::operation_canceled);
else if (error != 0)
*ec_out = system::error_code(error, system::system_category());
}

if (success && accepted_fd >= 0 && peer_impl)
{
// Pass impl to awaitable for assignment to peer socket
if (impl_out)
*impl_out = peer_impl;
peer_impl = nullptr;
}
else
{
// Cleanup on failure
if (accepted_fd >= 0)
{
::close(accepted_fd);
accepted_fd = -1;
}

if (peer_impl)
{
peer_impl->release();
peer_impl = nullptr;
}

if (impl_out)
*impl_out = nullptr;
}

d(h).resume();
}
};

} // namespace detail
} // namespace corosio
} // namespace boost

#endif
Loading
Loading