Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **stream_select() ignoring PHP-buffered data in async context**: When `fgets()`/`fread()` pulled more data into PHP's internal stream buffer than returned, a subsequent `stream_select()` would not detect the buffered data because the async path (libuv poll) only checks OS-level file descriptors. This caused hangs in `run-tests.php -j` parallel workers on macOS where TCP delivered multiple messages in a single segment. Fixed by checking `stream_array_emulate_read_fd_set()` before entering the async poll path.

### Added
- **TCP/UDP Socket I/O**: Efficient non-blocking TCP/UDP socket functions without poll overhead via libuv handles. Includes `sendto`/`recvfrom` for UDP, socket options API (`broadcast`, `multicast`, TCP `nodelay`/`keepalive`), and unified close callback for all I/O handle types.
- **Async File and Pipe I/O**: Non-blocking I/O for plain files and pipes via `php_stdiop_read`/`php_stdiop_write` async path. Supported functions: `fread`, `fwrite`, `fseek`, `ftell`, `rewind`, `fgets`, `fgetc`, `fgetcsv`, `fputcsv`, `ftruncate`, `fflush`, `fscanf`, `file_get_contents`, `file_put_contents`, `file()`, `copy`, `tmpfile`, `readfile`, `fpassthru`, `stream_get_contents`, `stream_copy_to_stream`
- **Async IO Seek API**: `ZEND_ASYNC_IO_SEEK` for syncing libuv file offset after `fseek`/`rewind`
- **Async IO Append Flag**: `ZEND_ASYNC_IO_APPEND` flag for correct append-mode file offset initialization
Expand Down
273 changes: 261 additions & 12 deletions libuv_reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,7 @@ zend_async_trigger_event_t *libuv_new_trigger_event(size_t extra_size)
/// Async IO API
/////////////////////////////////////////////////////////////////////////////////

static void io_pipe_close_cb(uv_handle_t *pipe_handle);
static void io_close_cb(uv_handle_t *pipe_handle);

/* {{{ IO event methods */
static bool libuv_io_event_start(zend_async_event_t *event)
Expand Down Expand Up @@ -2625,7 +2625,17 @@ static bool libuv_io_event_dispose(zend_async_event_t *event)
&& !(io->base.state & ZEND_ASYNC_IO_CLOSED)) {
io->base.state |= ZEND_ASYNC_IO_CLOSED;
io->handle.pipe.data = io;
uv_close((uv_handle_t *) &io->handle.pipe, io_pipe_close_cb);
uv_close((uv_handle_t *) &io->handle.pipe, io_close_cb);
} else if (io->base.type == ZEND_ASYNC_IO_TYPE_TCP
&& !(io->base.state & ZEND_ASYNC_IO_CLOSED)) {
io->base.state |= ZEND_ASYNC_IO_CLOSED;
io->handle.tcp.data = io;
uv_close((uv_handle_t *) &io->handle.tcp, io_close_cb);
} else if (io->base.type == ZEND_ASYNC_IO_TYPE_UDP
&& !(io->base.state & ZEND_ASYNC_IO_CLOSED)) {
io->base.state |= ZEND_ASYNC_IO_CLOSED;
io->handle.udp.data = io;
uv_close((uv_handle_t *) &io->handle.udp, io_close_cb);
} else if (io->base.type == ZEND_ASYNC_IO_TYPE_FILE) {
pefree(io, 0);
}
Expand Down Expand Up @@ -2721,7 +2731,7 @@ static void io_pipe_write_cb(uv_write_t *write_request, int status)
IF_EXCEPTION_STOP_REACTOR;
}

static void io_pipe_close_cb(uv_handle_t *pipe_handle)
static void io_close_cb(uv_handle_t *pipe_handle)
{
pefree(pipe_handle->data, 0);
}
Expand Down Expand Up @@ -2829,15 +2839,25 @@ static zend_async_io_t *libuv_io_create(

async_io_t *io = pecalloc(1, sizeof(async_io_t), 0);

io->base.descriptor.fd = fd;
io->base.type = type;
io->base.state = state;

/* Set descriptor based on type */
if (type == ZEND_ASYNC_IO_TYPE_TCP || type == ZEND_ASYNC_IO_TYPE_UDP) {
io->base.descriptor.socket = (zend_socket_t) fd;
#ifdef PHP_WIN32
io->crt_fd = (int)(intptr_t) fd;
#else
io->crt_fd = (int) fd;
#endif
} else {
io->base.descriptor.fd = fd;
#ifdef PHP_WIN32
io->crt_fd = (int)(intptr_t) fd;
io->crt_fd = (int)(intptr_t) fd;
#else
io->crt_fd = (int) fd;
io->crt_fd = (int) fd;
#endif
}

io->base.type = type;
io->base.state = state;

io->base.event.ref_count = 1;
io->base.event.add_callback = libuv_add_callback;
Expand All @@ -2860,12 +2880,33 @@ static zend_async_io_t *libuv_io_create(
if (UNEXPECTED(error < 0)) {
async_throw_error("Failed to open pipe handle: %s", uv_strerror(error));
io->handle.pipe.data = io;
uv_close((uv_handle_t *) &io->handle.pipe, io_pipe_close_cb);
uv_close((uv_handle_t *) &io->handle.pipe, io_close_cb);
return NULL;
}

io->handle.pipe.data = io;
} else if (type == ZEND_ASYNC_IO_TYPE_TCP) {
int error = uv_tcp_init(UVLOOP, &io->handle.tcp);

if (UNEXPECTED(error < 0)) {
async_throw_error("Failed to initialize TCP handle: %s", uv_strerror(error));
pefree(io, 0);
return NULL;
}

io->handle.tcp.data = io;
} else if (type == ZEND_ASYNC_IO_TYPE_UDP) {
int error = uv_udp_init(UVLOOP, &io->handle.udp);

if (UNEXPECTED(error < 0)) {
async_throw_error("Failed to initialize UDP handle: %s", uv_strerror(error));
pefree(io, 0);
return NULL;
}

io->handle.udp.data = io;
} else {
/* FILE type */
if (state & ZEND_ASYNC_IO_APPEND) {
const zend_off_t end = zend_lseek(io->crt_fd, 0, SEEK_END);
io->handle.file.offset = (end >= 0) ? end : 0;
Expand Down Expand Up @@ -3126,7 +3167,7 @@ static int libuv_io_close(zend_async_io_t *io_base)
uv_read_stop((uv_stream_t *) &io->handle.pipe);
zend_async_callbacks_free(&io->base.event);
io->handle.pipe.data = io;
uv_close((uv_handle_t *) &io->handle.pipe, io_pipe_close_cb);
uv_close((uv_handle_t *) &io->handle.pipe, io_close_cb);
return 1;
}

Expand Down Expand Up @@ -3508,6 +3549,213 @@ zend_async_listen_event_t *libuv_socket_listen(const char *host, int port, int b

/* }}} */

/* {{{ UDP callbacks and functions */

static void udp_send_cb(uv_udp_send_t *send_req, int status)
{
async_udp_req_t *req = (async_udp_req_t *) send_req->data;

if (status < 0) {
req->base.exception = async_throw_input_output("UDP send failed: %s", uv_strerror(status));
} else {
req->base.transferred = (ssize_t) req->max_size;
}

req->base.completed = true;
ZEND_ASYNC_CALLBACKS_NOTIFY(&req->io->base.event, &req->base, req->base.exception);
}

static void udp_recv_alloc_cb(uv_handle_t *udp_handle, size_t suggested_size, uv_buf_t *output)
{
const async_io_t *io = (async_io_t *) udp_handle->data;
async_udp_req_t *req = (async_udp_req_t *) io->active_req;

if (UNEXPECTED(req == NULL || req->base.buf == NULL)) {
output->base = NULL;
output->len = 0;
return;
}

output->base = req->base.buf;
output->len = (unsigned int) req->max_size;
}

static void udp_recv_cb(uv_udp_t *udp_handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
async_io_t *io = (async_io_t *) udp_handle->data;
async_udp_req_t *req = (async_udp_req_t *) io->active_req;

if (UNEXPECTED(req == NULL)) {
return;
}

uv_udp_recv_stop(udp_handle);
io->active_req = NULL;

if (nread < 0) {
req->base.exception = async_throw_input_output("UDP recv failed: %s", uv_strerror((int) nread));
} else if (nread == 0) {
/* Empty datagram or EAGAIN */
return;
} else {
req->base.transferred = nread;
if (addr != NULL) {
memcpy(&req->base.addr, addr, sizeof(struct sockaddr_storage));
req->base.addr_len = (addr->sa_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
}
}

req->base.completed = true;
ZEND_ASYNC_CALLBACKS_NOTIFY(&io->base.event, &req->base, req->base.exception);
}

static void udp_req_dispose(zend_async_udp_req_t *base_req)
{
async_udp_req_t *req = (async_udp_req_t *) base_req;

if (req->base.buf != NULL) {
pefree(req->base.buf, 0);
}

if (req->base.exception != NULL) {
zend_object_release(req->base.exception);
}

pefree(req, 0);
}

static zend_async_udp_req_t *libuv_udp_sendto(zend_async_io_t *io_base, const char *buf,
size_t count, const struct sockaddr *addr, socklen_t addr_len)
{
async_io_t *io = (async_io_t *) io_base;

if (UNEXPECTED(io->base.state & ZEND_ASYNC_IO_CLOSED)) {
async_throw_error("Cannot send to closed UDP socket");
return NULL;
}

if (UNEXPECTED(io->base.type != ZEND_ASYNC_IO_TYPE_UDP)) {
async_throw_error("IO handle is not UDP type");
return NULL;
}

async_udp_req_t *req = pecalloc(1, sizeof(async_udp_req_t), 0);
req->base.dispose = udp_req_dispose;
req->io = io;
req->max_size = count;

/* Copy address to request */
memcpy(&req->base.addr, addr, addr_len);
req->base.addr_len = addr_len;

/* Prepare buffer */
uv_buf_t uv_buf = uv_buf_init((char *) buf, (unsigned int) count);

req->send_req.data = req;
const int error = uv_udp_send(&req->send_req, &io->handle.udp, &uv_buf, 1, addr, udp_send_cb);

if (UNEXPECTED(error < 0)) {
async_throw_error("Failed to start UDP send: %s", uv_strerror(error));
udp_req_dispose(&req->base);
return NULL;
}

return &req->base;
}

static zend_async_udp_req_t *libuv_udp_recvfrom(zend_async_io_t *io_base, size_t max_size)
{
async_io_t *io = (async_io_t *) io_base;

if (UNEXPECTED(io->base.state & ZEND_ASYNC_IO_CLOSED)) {
async_throw_error("Cannot receive from closed UDP socket");
return NULL;
}

if (UNEXPECTED(io->base.type != ZEND_ASYNC_IO_TYPE_UDP)) {
async_throw_error("IO handle is not UDP type");
return NULL;
}

async_udp_req_t *req = pecalloc(1, sizeof(async_udp_req_t), 0);
req->base.dispose = udp_req_dispose;
req->io = io;
req->max_size = max_size;
req->base.buf = pemalloc(max_size, 0);

io->active_req = (async_io_req_t *) req;

const int error = uv_udp_recv_start(&io->handle.udp, udp_recv_alloc_cb, udp_recv_cb);

if (UNEXPECTED(error < 0)) {
io->active_req = NULL;
async_throw_error("Failed to start UDP recv: %s", uv_strerror(error));
udp_req_dispose(&req->base);
return NULL;
}

return &req->base;
}

static int libuv_io_set_option(zend_async_io_t *io_base, zend_async_socket_option_t option, int value)
{
async_io_t *io = (async_io_t *) io_base;
int result = 0;

switch (option) {
case ZEND_ASYNC_SOCKET_OPT_BROADCAST:
if (io->base.type == ZEND_ASYNC_IO_TYPE_UDP) {
result = uv_udp_set_broadcast(&io->handle.udp, value);
}
break;
case ZEND_ASYNC_SOCKET_OPT_MULTICAST_LOOP:
if (io->base.type == ZEND_ASYNC_IO_TYPE_UDP) {
result = uv_udp_set_multicast_loop(&io->handle.udp, value);
}
break;
case ZEND_ASYNC_SOCKET_OPT_MULTICAST_TTL:
if (io->base.type == ZEND_ASYNC_IO_TYPE_UDP) {
result = uv_udp_set_multicast_ttl(&io->handle.udp, value);
}
break;
case ZEND_ASYNC_SOCKET_OPT_TTL:
if (io->base.type == ZEND_ASYNC_IO_TYPE_UDP) {
result = uv_udp_set_ttl(&io->handle.udp, value);
}
break;
case ZEND_ASYNC_SOCKET_OPT_NODELAY:
if (io->base.type == ZEND_ASYNC_IO_TYPE_TCP) {
result = uv_tcp_nodelay(&io->handle.tcp, value);
}
break;
case ZEND_ASYNC_SOCKET_OPT_KEEPALIVE:
if (io->base.type == ZEND_ASYNC_IO_TYPE_TCP) {
result = uv_tcp_keepalive(&io->handle.tcp, value, 60);
}
break;
default:
return -1;
}

return result;
}

static int libuv_udp_set_membership(zend_async_io_t *io_base, const char *multicast_addr,
const char *interface_addr, bool join)
{
async_io_t *io = (async_io_t *) io_base;

if (io->base.type != ZEND_ASYNC_IO_TYPE_UDP) {
return -1;
}

uv_membership membership = join ? UV_JOIN_GROUP : UV_LEAVE_GROUP;
return uv_udp_set_membership(&io->handle.udp, multicast_addr, interface_addr, membership);
}

/* }}} */

void async_libuv_reactor_register(void)
{
zend_async_reactor_register(LIBUV_REACTOR_NAME,
Expand Down Expand Up @@ -3536,5 +3784,6 @@ void async_libuv_reactor_register(void)
zend_async_io_register(LIBUV_REACTOR_NAME, false,
libuv_io_create, libuv_io_read, libuv_io_write,
libuv_io_close, libuv_io_await, libuv_io_flush, libuv_io_stat,
libuv_io_seek, NULL, NULL, NULL, NULL);
libuv_io_seek, libuv_udp_sendto, libuv_udp_recvfrom,
libuv_io_set_option, libuv_udp_set_membership);
}
16 changes: 14 additions & 2 deletions libuv_reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#ifndef LIBUV_REACTOR_H
#define LIBUV_REACTOR_H

#define LIBUV_REACTOR_VERSION "0.5.0"
#define LIBUV_REACTOR_NAME "Libuv Reactor 0.5.0"
#define LIBUV_REACTOR_VERSION "0.8.0"
#define LIBUV_REACTOR_NAME "Libuv Reactor 0.8.0"
#include <Zend/zend_async_API.h>

#ifdef PHP_WIN32
Expand Down Expand Up @@ -123,6 +123,8 @@ struct _async_io_t
async_io_req_t *active_req;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
uv_udp_t udp;
struct {
zend_off_t offset;
} file;
Expand All @@ -140,6 +142,16 @@ struct _async_io_req_t
};
};

typedef struct _async_udp_req_t async_udp_req_t;

struct _async_udp_req_t
{
zend_async_udp_req_t base;
async_io_t *io;
size_t max_size;
uv_udp_send_t send_req;
};

void async_libuv_reactor_register(void);

#endif // LIBUV_REACTOR_H
Loading