Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Zend/zend_async_API.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ zend_async_new_iterator_t zend_async_new_iterator_fn = NULL;
/* Context API */
zend_async_new_context_t zend_async_new_context_fn = new_context;

/* Async IO API */
zend_async_io_create_t zend_async_io_create_fn = NULL;
zend_async_io_read_t zend_async_io_read_fn = NULL;
zend_async_io_write_t zend_async_io_write_fn = NULL;
zend_async_io_close_t zend_async_io_close_fn = NULL;
zend_async_io_await_t zend_async_io_await_fn = NULL;
zend_async_io_flush_t zend_async_io_flush_fn = NULL;
zend_async_io_stat_t zend_async_io_stat_fn = NULL;
zend_async_io_seek_t zend_async_io_seek_fn = NULL;

/* Internal Context API - now uses direct functions */

ZEND_API bool zend_async_is_enabled(void)
Expand Down Expand Up @@ -1642,6 +1652,47 @@ ZEND_API bool zend_async_socket_listening_register(
return true;
}

/* Registration lock for async IO */
static zend_atomic_bool io_lock = { 0 };
static char *io_module_name = NULL;

ZEND_API bool zend_async_io_register(char *module, bool allow_override,
zend_async_io_create_t create_fn, zend_async_io_read_t read_fn,
zend_async_io_write_t write_fn, zend_async_io_close_t close_fn,
zend_async_io_await_t await_fn, zend_async_io_flush_t flush_fn,
zend_async_io_stat_t stat_fn, zend_async_io_seek_t seek_fn)
{
if (zend_atomic_bool_exchange(&io_lock, 1)) {
return false;
}

if (io_module_name == module) {
return true;
}

if (io_module_name != NULL && false == allow_override) {
zend_error(E_CORE_ERROR,
"The module %s is trying to override Async IO API, which was registered by "
"the module %s.",
module, io_module_name);
return false;
}

io_module_name = module;
zend_async_io_create_fn = create_fn;
zend_async_io_read_fn = read_fn;
zend_async_io_write_fn = write_fn;
zend_async_io_close_fn = close_fn;
zend_async_io_await_fn = await_fn;
zend_async_io_flush_fn = flush_fn;
zend_async_io_stat_fn = stat_fn;
zend_async_io_seek_fn = seek_fn;

zend_atomic_bool_store(&io_lock, 0);

return true;
}

///////////////////////////////////////////////////////////////
/// Coroutine Switch Handlers Implementation
///////////////////////////////////////////////////////////////
Expand Down
81 changes: 81 additions & 0 deletions Zend/zend_async_API.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "zend_API.h"
#include "zend_globals.h"
#include "zend_stream.h"

#define ZEND_ASYNC_API "TrueAsync API v0.9.0"
#define ZEND_ASYNC_API_VERSION_MAJOR 0
Expand Down Expand Up @@ -91,6 +92,22 @@ typedef struct io_descriptor_s {
io_descriptor_type type;
} io_descriptor_t;

/* Async IO API — abstract I/O handle for pipes and files */

typedef enum {
ZEND_ASYNC_IO_TYPE_PIPE,
ZEND_ASYNC_IO_TYPE_FILE
} zend_async_io_type;

#define ZEND_ASYNC_IO_READABLE (1 << 0)
#define ZEND_ASYNC_IO_WRITABLE (1 << 1)
#define ZEND_ASYNC_IO_CLOSED (1 << 2)
#define ZEND_ASYNC_IO_EOF (1 << 3)
#define ZEND_ASYNC_IO_APPEND (1 << 4)

typedef struct _zend_async_io_s zend_async_io_t;
typedef struct _zend_async_io_req_s zend_async_io_req_t;

/**
* php_exec
* If type==0, only last line of output is returned (exec)
Expand Down Expand Up @@ -353,6 +370,17 @@ typedef void (*zend_async_task_run_t)(zend_async_task_t *task);

typedef void (*zend_async_microtask_handler_t)(zend_async_microtask_t *microtask);

/* Async IO function pointer types */
typedef zend_async_io_t *(*zend_async_io_create_t)(
zend_file_descriptor_t fd, zend_async_io_type type, uint32_t state);
typedef zend_async_io_req_t *(*zend_async_io_read_t)(zend_async_io_t *io, size_t max_size);
typedef zend_async_io_req_t *(*zend_async_io_write_t)(zend_async_io_t *io, const char *buf, size_t count);
typedef int (*zend_async_io_close_t)(zend_async_io_t *io);
typedef int (*zend_async_io_await_t)(zend_async_io_t *io, uint32_t events, struct timeval *timeout);
typedef zend_async_io_req_t *(*zend_async_io_flush_t)(zend_async_io_t *io);
typedef zend_async_io_req_t *(*zend_async_io_stat_t)(zend_async_io_t *io, zend_stat_t *buf);
typedef void (*zend_async_io_seek_t)(zend_async_io_t *io, zend_off_t offset);

struct _zend_fcall_s {
zend_fcall_info fci;
zend_fcall_info_cache fci_cache;
Expand Down Expand Up @@ -549,6 +577,26 @@ struct _zend_async_event_s {
zend_async_event_callbacks_notify_t notify_handler;
};

/* Async IO handle — full definition (requires zend_async_event_t) */
struct _zend_async_io_s {
zend_async_event_t event;
zend_file_descriptor_t fd;
zend_async_io_type type;
uint32_t state;
};

/* Async IO request — one-shot operation request */
struct _zend_async_io_req_s {
union {
ssize_t result;
ssize_t transferred;
};
zend_object *exception;
char *buf;
bool completed;
void (*dispose)(zend_async_io_req_t *req);
};

/**
* Event reference. A special data structure that allows representing an object with the Awaitable
* interface, but which does not store the event directly—instead, it holds only a reference to it.
Expand Down Expand Up @@ -1620,6 +1668,16 @@ ZEND_API extern zend_async_queue_task_t zend_async_queue_task_fn;
/* Trigger Event API */
ZEND_API extern zend_async_new_trigger_event_t zend_async_new_trigger_event_fn;

/* Async IO API */
ZEND_API extern zend_async_io_create_t zend_async_io_create_fn;
ZEND_API extern zend_async_io_read_t zend_async_io_read_fn;
ZEND_API extern zend_async_io_write_t zend_async_io_write_fn;
ZEND_API extern zend_async_io_close_t zend_async_io_close_fn;
ZEND_API extern zend_async_io_await_t zend_async_io_await_fn;
ZEND_API extern zend_async_io_flush_t zend_async_io_flush_fn;
ZEND_API extern zend_async_io_stat_t zend_async_io_stat_fn;
ZEND_API extern zend_async_io_seek_t zend_async_io_seek_fn;

ZEND_API bool zend_async_scheduler_register(char *module, bool allow_override,
zend_async_scheduler_launch_t scheduler_launch_fn,
zend_async_new_coroutine_t new_coroutine_fn, zend_async_new_scope_t new_scope_fn,
Expand Down Expand Up @@ -1667,6 +1725,12 @@ ZEND_API void zend_async_pool_api_register(
ZEND_API bool zend_async_socket_listening_register(
char *module, bool allow_override, zend_async_socket_listen_t socket_listen_fn);

ZEND_API bool zend_async_io_register(char *module, bool allow_override,
zend_async_io_create_t create_fn, zend_async_io_read_t read_fn,
zend_async_io_write_t write_fn, zend_async_io_close_t close_fn,
zend_async_io_await_t await_fn, zend_async_io_flush_t flush_fn,
zend_async_io_stat_t stat_fn, zend_async_io_seek_t seek_fn);

ZEND_API zend_string *zend_coroutine_gen_info(
zend_coroutine_t *coroutine, char *zend_coroutine_name);

Expand Down Expand Up @@ -1831,6 +1895,13 @@ END_EXTERN_C()

#define ZEND_ASYNC_SCHEDULER_LAUNCH() zend_async_scheduler_launch_fn()

#define ZEND_ASYNC_SCHEDULER_INIT() \
do { \
if (UNEXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == NULL)) { \
zend_async_scheduler_launch_fn(); \
} \
} while (0)

#define ZEND_ASYNC_REACTOR_IS_ENABLED() zend_async_reactor_is_enabled()
#define ZEND_ASYNC_REACTOR_STARTUP() zend_async_reactor_startup_fn()
#define ZEND_ASYNC_REACTOR_SHUTDOWN() zend_async_reactor_shutdown_fn()
Expand Down Expand Up @@ -1904,6 +1975,16 @@ END_EXTERN_C()
#define ZEND_ASYNC_SOCKET_LISTEN_EX(host, port, backlog, extra_size) \
zend_async_socket_listen_fn(host, port, backlog, extra_size)

/* Async IO API Macros */
#define ZEND_ASYNC_IO_CREATE(fd, type, state) zend_async_io_create_fn(fd, type, state)
#define ZEND_ASYNC_IO_READ(io, max_size) zend_async_io_read_fn(io, max_size)
#define ZEND_ASYNC_IO_WRITE(io, buf, count) zend_async_io_write_fn(io, buf, count)
#define ZEND_ASYNC_IO_CLOSE(io) zend_async_io_close_fn(io)
#define ZEND_ASYNC_IO_AWAIT(io, events, tv) zend_async_io_await_fn(io, events, tv)
#define ZEND_ASYNC_IO_FLUSH(io) zend_async_io_flush_fn(io)
#define ZEND_ASYNC_IO_STAT(io, buf) zend_async_io_stat_fn(io, buf)
#define ZEND_ASYNC_IO_SEEK(io, offset) zend_async_io_seek_fn(io, offset)

/* Iterator API Macros */
#define ZEND_ASYNC_NEW_ITERATOR_SCOPE( \
array, zend_iterator, fcall, handler, scope, concurrency, priority) \
Expand Down
5 changes: 4 additions & 1 deletion ext/standard/streamsfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,10 @@ PHP_FUNCTION(stream_select)
tv_p_async = &tv_async;
}
retval = network_async_stream_select(r_array, w_array, e_array, tv_p_async);
RETURN_LONG(retval >= 0 ? retval : 0);
if (retval != -2) {
RETURN_LONG(retval >= 0 ? retval : 0);
}
/* retval == -2: stream does not support async poll, fall through to regular select() */
}

FD_ZERO(&rfds);
Expand Down
4 changes: 2 additions & 2 deletions main/network_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,8 @@ static zend_always_inline bool process_stream_array(
*result = -1;
return false;
} else if (UNEXPECTED(poll_event == NULL)) {
zend_throw_error(NULL, "Stream does not support async I/O");
*result = -1;
/* Stream does not support async poll — signal fallback to regular select() */
*result = -2;
return false;
}

Expand Down
5 changes: 4 additions & 1 deletion main/php_streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,10 +512,13 @@ END_EXTERN_C()

#define php_stream_sync_supported(stream) (_php_stream_set_option((stream), PHP_STREAM_OPTION_SYNC_API, PHP_STREAM_SYNC_SUPPORTED, NULL) == PHP_STREAM_OPTION_RETURN_OK ? 1 : 0)

/* Get or create async event handle for socket streams.
/* Get or create async event handle for socket streams.
* value = events mask, ptrparam = zend_async_poll_event_t** */
#define PHP_STREAM_OPTION_ASYNC_EVENT_HANDLE 15

/* Align internal stream position after external fd manipulation (e.g. copy_file_range).
* ptrparam = zend_off_t* pointing to the new position */
#define PHP_STREAM_OPTION_ALIGN_POSITION 16

#define PHP_STREAM_OPTION_RETURN_OK 0 /* option set OK */
#define PHP_STREAM_OPTION_RETURN_ERR -1 /* problem setting option */
Expand Down
Loading
Loading