Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 191 additions & 12 deletions ext/pgsql/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "zend_attributes.h"
#include "zend_interfaces.h"
#include "php_network.h"
#include "zend_async_API.h"

#ifdef HAVE_PGSQL

Expand Down Expand Up @@ -294,6 +295,181 @@ static zend_string *_php_pgsql_trim_message(const char *message)
zend_string_release(msgbuf); \
} \

/* Concurrent execution helpers */

/**
* Await PostgreSQL socket readiness in coroutine context.
*
* This function suspends the current coroutine until the PostgreSQL connection socket
* becomes ready for the specified I/O operation (read/write). Uses TrueAsync event loop
* for efficient concurrent I/O without blocking the thread.
*
* @param pgsql PostgreSQL connection handle
* @param events Poll events to wait for (ASYNC_READABLE, ASYNC_WRITABLE, etc.)
* @param timeout_ms Timeout in milliseconds (0 = infinite)
* @return bool true on success, false on error or timeout
*
* @note This function should only be called from within a coroutine context.
* If called outside a coroutine, it will return false immediately.
*/
static bool php_pgsql_await_socket(PGconn *pgsql, async_poll_event events, zend_ulong timeout_ms)
{
ZEND_ASSERT(pgsql != NULL);

zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE;

if (coroutine == NULL) {
return false;
}

const php_socket_t socket = PQsocket(pgsql);

if (socket < 0) {
return false;
}

zend_async_poll_event_t *poll_event = ZEND_ASYNC_NEW_SOCKET_EVENT(socket, events);

if (poll_event == NULL || EG(exception) != NULL) {
return false;
}

zend_async_waker_new_with_timeout(coroutine, timeout_ms, NULL);

if (EG(exception) != NULL) {
ZEND_ASYNC_EVENT_RELEASE(&poll_event->base);
return false;
}

zend_async_resume_when(coroutine, &poll_event->base, true,
zend_async_waker_callback_resolve, NULL);

if (EG(exception) != NULL) {
zend_async_waker_clean(coroutine);
return false;
}

const bool suspend_result = ZEND_ASYNC_SUSPEND();
zend_async_waker_clean(coroutine);

return suspend_result && EG(exception) == NULL;
}

/**
* Concurrent wrapper for PQexec() with automatic async/sync mode detection.
*
* Executes a SQL query concurrently if called from within a coroutine context,
* or falls back to synchronous execution otherwise. This provides transparent
* concurrent execution without requiring API changes.
*
* When in coroutine context:
* - Uses PQsendQuery() for non-blocking query submission
* - Awaits socket writability during flush
* - Awaits socket readability for result
* - Does not block other coroutines
*
* When not in coroutine context:
* - Falls back to standard PQexec() (blocking)
* - Maintains backward compatibility
*
* @param pgsql PostgreSQL connection handle
* @param query SQL query string
* @return PGresult* Query result or NULL on error
*/
static PGresult* php_pgsql_exec_concurrent(PGconn *pgsql, const char *query)
{
ZEND_ASSERT(pgsql != NULL);
ZEND_ASSERT(query != NULL);

zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE;

if (coroutine == NULL) {
return PQexec(pgsql, query);
}

if (!PQsendQuery(pgsql, query)) {
return NULL;
}

int flush_result;
while ((flush_result = PQflush(pgsql)) > 0) {
if (!php_pgsql_await_socket(pgsql, ASYNC_WRITABLE, 0)) {
return NULL;
}
}

if (flush_result < 0) {
return NULL;
}

if (!php_pgsql_await_socket(pgsql, ASYNC_READABLE, 0)) {
return NULL;
}

if (!PQconsumeInput(pgsql)) {
return NULL;
}

return PQgetResult(pgsql);
}

/**
* Concurrent wrapper for PQexecParams() with automatic async/sync mode detection.
*
* Executes a parameterized SQL query concurrently if called from within a coroutine context,
* or falls back to synchronous execution otherwise.
*
* @param pgsql PostgreSQL connection handle
* @param query SQL query string with parameter placeholders ($1, $2, etc.)
* @param nParams Number of parameters
* @param paramTypes Array of parameter type OIDs (can be NULL)
* @param paramValues Array of parameter values as strings
* @param paramLengths Array of parameter lengths (can be NULL)
* @param paramFormats Array of parameter formats (can be NULL, 0=text, 1=binary)
* @param resultFormat Result format (0=text, 1=binary)
* @return PGresult* Query result or NULL on error
*/
static PGresult* php_pgsql_exec_params_concurrent(PGconn *pgsql, const char *query,
int nParams, const Oid *paramTypes, const char *const *paramValues,
const int *paramLengths, const int *paramFormats, int resultFormat)
{
ZEND_ASSERT(pgsql != NULL);
ZEND_ASSERT(query != NULL);

zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE;

if (coroutine == NULL) {
return PQexecParams(pgsql, query, nParams, paramTypes, paramValues,
paramLengths, paramFormats, resultFormat);
}

if (!PQsendQueryParams(pgsql, query, nParams, paramTypes, paramValues,
paramLengths, paramFormats, resultFormat)) {
return NULL;
}

int flush_result;
while ((flush_result = PQflush(pgsql)) > 0) {
if (!php_pgsql_await_socket(pgsql, ASYNC_WRITABLE, 0)) {
return NULL;
}
}

if (flush_result < 0) {
return NULL;
}

if (!php_pgsql_await_socket(pgsql, ASYNC_READABLE, 0)) {
return NULL;
}

if (!PQconsumeInput(pgsql)) {
return NULL;
}

return PQgetResult(pgsql);
}

static void php_pgsql_set_default_link(zend_object *obj)
{
GC_ADDREF(obj);
Expand Down Expand Up @@ -768,13 +944,18 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
link->persistent = 1;
} else { /* Non persistent connection */
zval *index_ptr;
const bool in_async_context = (ZEND_ASYNC_CURRENT_COROUTINE != NULL);

/* first we check the hash for the hashed_details key. If it exists,
* it should point us to the right offset where the actual pgsql link sits.
* if it doesn't, open a new pgsql link, add it to the resource list,
* and add a pointer to it with hashed_details as the key.
*
* In async context, always create a new connection to avoid race conditions
* when multiple coroutines try to use the same connection simultaneously.
*/
if (!(connect_type & PGSQL_CONNECT_FORCE_NEW)
if (!in_async_context
&& !(connect_type & PGSQL_CONNECT_FORCE_NEW)
&& (index_ptr = zend_hash_find(&PGG(connections), str.s)) != NULL) {
php_pgsql_set_default_link(Z_OBJ_P(index_ptr));
ZVAL_COPY(return_value, index_ptr);
Expand Down Expand Up @@ -814,13 +995,11 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
link->notices = NULL;
link->persistent = 0;

/* add it to the hash */
zend_hash_update(&PGG(connections), str.s, return_value);

/* Keep track of link => hash mapping, so we can remove the hash entry from connections
* when the connection is closed. This uses the address of the connection rather than the
* zend_resource, because the resource destructor is passed a stack copy of the resource
* structure. */
/* In async context, don't cache the connection to ensure each coroutine
* gets its own isolated connection. */
if (!in_async_context) {
zend_hash_update(&PGG(connections), str.s, return_value);
}

PGG(num_links)++;
}
Expand Down Expand Up @@ -1185,11 +1364,11 @@ PHP_FUNCTION(pg_query)
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
}
pgsql_result = PQexec(pgsql, query);
pgsql_result = php_pgsql_exec_concurrent(pgsql, query);
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
PQclear(pgsql_result);
PQreset(pgsql);
pgsql_result = PQexec(pgsql, query);
pgsql_result = php_pgsql_exec_concurrent(pgsql, query);
}

if (pgsql_result) {
Expand Down Expand Up @@ -1320,12 +1499,12 @@ PHP_FUNCTION(pg_query_params)
RETURN_THROWS();
}

pgsql_result = PQexecParams(pgsql, query, num_params,
pgsql_result = php_pgsql_exec_params_concurrent(pgsql, query, num_params,
NULL, (const char * const *)params, NULL, NULL, 0);
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
PQclear(pgsql_result);
PQreset(pgsql);
pgsql_result = PQexecParams(pgsql, query, num_params,
pgsql_result = php_pgsql_exec_params_concurrent(pgsql, query, num_params,
NULL, (const char * const *)params, NULL, NULL, 0);
}

Expand Down
57 changes: 57 additions & 0 deletions ext/pgsql/tests/async_001_basic_query.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
--TEST--
PostgreSQL concurrent query - basic usage
--EXTENSIONS--
pgsql
true_async
--SKIPIF--
<?php include("inc/skipif.inc"); ?>
--FILE--
<?php

use function Async\spawn;
use function Async\await;

include('inc/config.inc');

$db = pg_connect($conn_str);
$table_name = "table_concurrent_001";

// Create test table
pg_query($db, "DROP TABLE IF EXISTS {$table_name}");
pg_query($db, "CREATE TABLE {$table_name} (id int, value text)");
pg_query($db, "INSERT INTO {$table_name} VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')");

echo "Testing concurrent pg_query()...\n";

// Test 1: Simple query in coroutine
$coroutine = spawn(function() use ($db, $table_name) {
echo "Coroutine: executing query\n";
$result = pg_query($db, "SELECT * FROM {$table_name} WHERE id = 1");

if (!$result) {
echo "ERROR: Query failed\n";
return;
}

$row = pg_fetch_assoc($result);
echo "Coroutine: got result id={$row['id']}, value={$row['value']}\n";

pg_free_result($result);
});

$result = await($coroutine);

echo "Main: coroutine completed\n";

// Cleanup
pg_query($db, "DROP TABLE {$table_name}");
pg_close($db);

echo "OK\n";
?>
--EXPECT--
Testing concurrent pg_query()...
Coroutine: executing query
Coroutine: got result id=1, value=test1
Main: coroutine completed
OK
Loading
Loading