Skip to content

Php 8.3 pgsql pipeline mode #13223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 158 additions & 50 deletions ext/pgsql/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ static void pgsql_link_free(pgsql_link_handle *link)
{
PGresult *res;

#ifdef LIBPQ_HAS_PIPELINING
if (!link->synced) {
PGcancel *c;
char err[256];

c = PQgetCancel(link->conn);
PQcancel(c, err, sizeof(err));
PQfreeCancel(c);
PQpipelineSync(link->conn);
}
#endif

while ((res = PQgetResult(link->conn))) {
PQclear(res);
}
Expand Down Expand Up @@ -359,6 +371,20 @@ static int _rollback_transactions(zval *el)
return -1;
}

if (PQtransactionStatus(link) != PQTRANS_IDLE) {
PGcancel *c;
char err[256];

c = PQgetCancel(link);
PQcancel(c, err, sizeof(err));
PQfreeCancel(c);
#ifdef LIBPQ_HAS_PIPELINING
if (PQpipelineSync(link)) {
PQexitPipelineMode(link);
}
#endif
}

while ((res = PQgetResult(link))) {
PQclear(res);
}
Expand Down Expand Up @@ -626,6 +652,9 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
link->hash = zend_string_copy(str.s);
link->notices = NULL;
link->persistent = 1;
#ifdef LIBPQ_HAS_PIPELINING
link->synced = 1;
#endif
} else { /* Non persistent connection */
zval *index_ptr;

Expand Down Expand Up @@ -673,6 +702,9 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent)
link->hash = zend_string_copy(str.s);
link->notices = NULL;
link->persistent = 0;
#ifdef LIBPQ_HAS_PIPELINING
link->synced = 1;
#endif

/* add it to the hash */
zend_hash_update(&PGG(connections), str.s, return_value);
Expand Down Expand Up @@ -988,17 +1020,23 @@ PHP_FUNCTION(pg_query)

pgsql = link->conn;

if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
#ifdef LIBPQ_HAS_PIPELINING
if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) {
#endif
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
pgsql_result = PQexec(pgsql, query);
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
PQclear(pgsql_result);
Expand Down Expand Up @@ -1082,17 +1120,23 @@ PHP_FUNCTION(pg_query_params)

pgsql = link->conn;

if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
#ifdef LIBPQ_HAS_PIPELINING
if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) {
#endif
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -1187,17 +1231,23 @@ PHP_FUNCTION(pg_prepare)

pgsql = link->conn;

if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
#ifdef LIBPQ_HAS_PIPELINING
if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) {
#endif
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
pgsql_result = PQprepare(pgsql, stmtname, query, 0, NULL);
if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) {
PQclear(pgsql_result);
Expand Down Expand Up @@ -1268,17 +1318,23 @@ PHP_FUNCTION(pg_execute)

pgsql = link->conn;

if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
#ifdef LIBPQ_HAS_PIPELINING
if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) {
#endif
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode");
RETURN_FALSE;
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
leftover = 1;
}
if (leftover) {
php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif

num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr));
if (num_params > 0) {
Expand Down Expand Up @@ -3519,6 +3575,9 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
pgsql_link_handle *link;
PGconn *pgsql;
PGresult *pgsql_result;
#ifdef LIBPQ_HAS_PIPELINING
bool is_pipeline_mode;
#endif

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
Expand All @@ -3528,10 +3587,17 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
CHECK_PGSQL_LINK(link);
pgsql = link->conn;

if (PQsetnonblocking(pgsql, 1)) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
#ifdef LIBPQ_HAS_PIPELINING
is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON);
if (!is_pipeline_mode) {
#endif
if (PQsetnonblocking(pgsql, 1)) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode");
RETURN_FALSE;
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
switch(entry_type) {
case PHP_PG_ASYNC_IS_BUSY:
PQconsumeInput(pgsql);
Expand All @@ -3547,17 +3613,31 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type)
if (rc < 0) {
zend_error(E_WARNING, "cannot cancel the query: %s", err);
}
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
#ifdef LIBPQ_HAS_PIPELINING
if (!is_pipeline_mode) {
#endif
while ((pgsql_result = PQgetResult(pgsql))) {
PQclear(pgsql_result);
}
#ifdef LIBPQ_HAS_PIPELINING
} else {
link->synced = 1;
}
#endif
PQfreeCancel(c);
break;
}
EMPTY_SWITCH_DEFAULT_CASE()
}
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode");
#ifdef LIBPQ_HAS_PIPELINING
if (!is_pipeline_mode) {
#endif
if (PQsetnonblocking(pgsql, 0)) {
php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode");
}
#ifdef LIBPQ_HAS_PIPELINING
}
#endif
convert_to_boolean(return_value);
}
/* }}} */
Expand Down Expand Up @@ -3636,6 +3716,7 @@ PHP_FUNCTION(pg_send_query)
}
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
link->synced = 0;
ret = 0;
} else {
#endif
Expand Down Expand Up @@ -3761,6 +3842,7 @@ PHP_FUNCTION(pg_send_query_params)
if (is_non_blocking) {
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
link->synced = 0;
ret = 0;
} else {
#endif
Expand Down Expand Up @@ -3852,6 +3934,7 @@ PHP_FUNCTION(pg_send_prepare)
if (is_non_blocking) {
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
link->synced = 0;
ret = 0;
} else {
#endif
Expand Down Expand Up @@ -3970,6 +4053,7 @@ PHP_FUNCTION(pg_send_execute)
if (is_non_blocking) {
#ifdef LIBPQ_HAS_PIPELINING
if (is_pipeline_mode) {
link->synced = 0;
ret = 0;
} else {
#endif
Expand Down Expand Up @@ -4017,6 +4101,12 @@ PHP_FUNCTION(pg_get_result)
link = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(link);
pgsql = link->conn;
#ifdef LIBPQ_HAS_PIPELINING
if (!link->synced) {
php_error_docref(NULL, E_NOTICE,
"The connection is in pipeline mode. A synchronization message must be sent before results can be received. Call pg_pipeline_sync()");
}
#endif

pgsql_result = PQgetResult(pgsql);
if (!pgsql_result) {
Expand Down Expand Up @@ -5963,6 +6053,7 @@ PHP_FUNCTION(pg_enter_pipeline_mode)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;
int ret;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
Expand All @@ -5973,13 +6064,19 @@ PHP_FUNCTION(pg_enter_pipeline_mode)

PQsetnonblocking(pgsql_handle->conn, 1);

RETURN_BOOL(PQenterPipelineMode(pgsql_handle->conn));
ret = PQenterPipelineMode(pgsql_handle->conn);
if (ret) {
pgsql_handle->synced = 0;
}

RETURN_BOOL(ret);
}

PHP_FUNCTION(pg_exit_pipeline_mode)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;
int ret;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
Expand All @@ -5990,7 +6087,12 @@ PHP_FUNCTION(pg_exit_pipeline_mode)

PQsetnonblocking(pgsql_handle->conn, 0);

RETURN_BOOL(PQexitPipelineMode(pgsql_handle->conn));
ret = PQexitPipelineMode(pgsql_handle->conn);
if (ret) {
pgsql_handle->synced = 1;
}

RETURN_BOOL(ret);
}

PHP_FUNCTION(pg_send_flush_request)
Expand All @@ -6012,6 +6114,7 @@ PHP_FUNCTION(pg_pipeline_sync)
{
zval *pgsql_link;
pgsql_link_handle *pgsql_handle;
int ret;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) {
RETURN_THROWS();
Expand All @@ -6020,7 +6123,12 @@ PHP_FUNCTION(pg_pipeline_sync)
pgsql_handle = Z_PGSQL_LINK_P(pgsql_link);
CHECK_PGSQL_LINK(pgsql_handle);

RETURN_BOOL(PQpipelineSync(pgsql_handle->conn));
ret = PQpipelineSync(pgsql_handle->conn);
if (ret) {
pgsql_handle->synced = 1;
}

RETURN_BOOL(ret);
}

PHP_FUNCTION(pg_pipeline_status)
Expand Down
3 changes: 3 additions & 0 deletions ext/pgsql/php_pgsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ typedef struct pgsql_link_handle {
zend_string *hash;
HashTable *notices;
bool persistent;
#ifdef LIBPQ_HAS_PIPELINING
bool synced;
#endif
zend_object std;
} pgsql_link_handle;

Expand Down
Loading