diff --git a/ext/pgsql/pgsql.c b/ext/pgsql/pgsql.c index 165342edcc51c..f96e6ad745c25 100644 --- a/ext/pgsql/pgsql.c +++ b/ext/pgsql/pgsql.c @@ -174,6 +174,18 @@ static void pgsql_link_free(pgsql_link_handle *link) { PGresult *res; +#ifdef LIBPQ_HAS_PIPELINING + if (!link->synced) { + PGcancel *c; + char err[256]; + + c = PQgetCancel(link->conn); + PQcancel(c, err, sizeof(err)); + PQfreeCancel(c); + PQpipelineSync(link->conn); + } +#endif + while ((res = PQgetResult(link->conn))) { PQclear(res); } @@ -359,6 +371,20 @@ static int _rollback_transactions(zval *el) return -1; } + if (PQtransactionStatus(link) != PQTRANS_IDLE) { + PGcancel *c; + char err[256]; + + c = PQgetCancel(link); + PQcancel(c, err, sizeof(err)); + PQfreeCancel(c); +#ifdef LIBPQ_HAS_PIPELINING + if (PQpipelineSync(link)) { + PQexitPipelineMode(link); + } +#endif + } + while ((res = PQgetResult(link))) { PQclear(res); } @@ -626,6 +652,9 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) link->hash = zend_string_copy(str.s); link->notices = NULL; link->persistent = 1; +#ifdef LIBPQ_HAS_PIPELINING + link->synced = 1; +#endif } else { /* Non persistent connection */ zval *index_ptr; @@ -673,6 +702,9 @@ static void php_pgsql_do_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) link->hash = zend_string_copy(str.s); link->notices = NULL; link->persistent = 0; +#ifdef LIBPQ_HAS_PIPELINING + link->synced = 1; +#endif /* add it to the hash */ zend_hash_update(&PGG(connections), str.s, return_value); @@ -988,17 +1020,23 @@ PHP_FUNCTION(pg_query) pgsql = link->conn; - if (PQsetnonblocking(pgsql, 0)) { - php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); - RETURN_FALSE; - } - while ((pgsql_result = PQgetResult(pgsql))) { - PQclear(pgsql_result); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); +#ifdef LIBPQ_HAS_PIPELINING + if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) { +#endif + if (PQsetnonblocking(pgsql, 0)) { + php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); + RETURN_FALSE; + } + while ((pgsql_result = PQgetResult(pgsql))) { + PQclear(pgsql_result); + leftover = 1; + } + if (leftover) { + php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif pgsql_result = PQexec(pgsql, query); if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { PQclear(pgsql_result); @@ -1082,17 +1120,23 @@ PHP_FUNCTION(pg_query_params) pgsql = link->conn; - if (PQsetnonblocking(pgsql, 0)) { - php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); - RETURN_FALSE; - } - while ((pgsql_result = PQgetResult(pgsql))) { - PQclear(pgsql_result); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); +#ifdef LIBPQ_HAS_PIPELINING + if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) { +#endif + if (PQsetnonblocking(pgsql, 0)) { + php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); + RETURN_FALSE; + } + while ((pgsql_result = PQgetResult(pgsql))) { + PQclear(pgsql_result); + leftover = 1; + } + if (leftover) { + php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr)); if (num_params > 0) { @@ -1187,17 +1231,23 @@ PHP_FUNCTION(pg_prepare) pgsql = link->conn; - if (PQsetnonblocking(pgsql, 0)) { - php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); - RETURN_FALSE; - } - while ((pgsql_result = PQgetResult(pgsql))) { - PQclear(pgsql_result); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); +#ifdef LIBPQ_HAS_PIPELINING + if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) { +#endif + if (PQsetnonblocking(pgsql, 0)) { + php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); + RETURN_FALSE; + } + while ((pgsql_result = PQgetResult(pgsql))) { + PQclear(pgsql_result); + leftover = 1; + } + if (leftover) { + php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif pgsql_result = PQprepare(pgsql, stmtname, query, 0, NULL); if ((PGG(auto_reset_persistent) & 2) && PQstatus(pgsql) != CONNECTION_OK) { PQclear(pgsql_result); @@ -1268,17 +1318,23 @@ PHP_FUNCTION(pg_execute) pgsql = link->conn; - if (PQsetnonblocking(pgsql, 0)) { - php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); - RETURN_FALSE; - } - while ((pgsql_result = PQgetResult(pgsql))) { - PQclear(pgsql_result); - leftover = 1; - } - if (leftover) { - php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); +#ifdef LIBPQ_HAS_PIPELINING + if (PQpipelineStatus(pgsql) == PQ_PIPELINE_OFF) { +#endif + if (PQsetnonblocking(pgsql, 0)) { + php_error_docref(NULL, E_NOTICE,"Cannot set connection to blocking mode"); + RETURN_FALSE; + } + while ((pgsql_result = PQgetResult(pgsql))) { + PQclear(pgsql_result); + leftover = 1; + } + if (leftover) { + php_error_docref(NULL, E_NOTICE, "Found results on this connection. Use pg_get_result() to get these results first"); + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif num_params = zend_hash_num_elements(Z_ARRVAL_P(pv_param_arr)); if (num_params > 0) { @@ -3519,6 +3575,9 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type) pgsql_link_handle *link; PGconn *pgsql; PGresult *pgsql_result; +#ifdef LIBPQ_HAS_PIPELINING + bool is_pipeline_mode; +#endif if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) { RETURN_THROWS(); @@ -3528,10 +3587,17 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type) CHECK_PGSQL_LINK(link); pgsql = link->conn; - if (PQsetnonblocking(pgsql, 1)) { - php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode"); - RETURN_FALSE; +#ifdef LIBPQ_HAS_PIPELINING + is_pipeline_mode = (PQpipelineStatus(pgsql) == PQ_PIPELINE_ON); + if (!is_pipeline_mode) { +#endif + if (PQsetnonblocking(pgsql, 1)) { + php_error_docref(NULL, E_NOTICE, "Cannot set connection to nonblocking mode"); + RETURN_FALSE; + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif switch(entry_type) { case PHP_PG_ASYNC_IS_BUSY: PQconsumeInput(pgsql); @@ -3547,17 +3613,31 @@ static void php_pgsql_do_async(INTERNAL_FUNCTION_PARAMETERS, int entry_type) if (rc < 0) { zend_error(E_WARNING, "cannot cancel the query: %s", err); } - while ((pgsql_result = PQgetResult(pgsql))) { - PQclear(pgsql_result); +#ifdef LIBPQ_HAS_PIPELINING + if (!is_pipeline_mode) { +#endif + while ((pgsql_result = PQgetResult(pgsql))) { + PQclear(pgsql_result); + } +#ifdef LIBPQ_HAS_PIPELINING + } else { + link->synced = 1; } +#endif PQfreeCancel(c); break; } EMPTY_SWITCH_DEFAULT_CASE() } - if (PQsetnonblocking(pgsql, 0)) { - php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode"); +#ifdef LIBPQ_HAS_PIPELINING + if (!is_pipeline_mode) { +#endif + if (PQsetnonblocking(pgsql, 0)) { + php_error_docref(NULL, E_NOTICE, "Cannot set connection to blocking mode"); + } +#ifdef LIBPQ_HAS_PIPELINING } +#endif convert_to_boolean(return_value); } /* }}} */ @@ -3636,6 +3716,7 @@ PHP_FUNCTION(pg_send_query) } #ifdef LIBPQ_HAS_PIPELINING if (is_pipeline_mode) { + link->synced = 0; ret = 0; } else { #endif @@ -3761,6 +3842,7 @@ PHP_FUNCTION(pg_send_query_params) if (is_non_blocking) { #ifdef LIBPQ_HAS_PIPELINING if (is_pipeline_mode) { + link->synced = 0; ret = 0; } else { #endif @@ -3852,6 +3934,7 @@ PHP_FUNCTION(pg_send_prepare) if (is_non_blocking) { #ifdef LIBPQ_HAS_PIPELINING if (is_pipeline_mode) { + link->synced = 0; ret = 0; } else { #endif @@ -3970,6 +4053,7 @@ PHP_FUNCTION(pg_send_execute) if (is_non_blocking) { #ifdef LIBPQ_HAS_PIPELINING if (is_pipeline_mode) { + link->synced = 0; ret = 0; } else { #endif @@ -4017,6 +4101,12 @@ PHP_FUNCTION(pg_get_result) link = Z_PGSQL_LINK_P(pgsql_link); CHECK_PGSQL_LINK(link); pgsql = link->conn; +#ifdef LIBPQ_HAS_PIPELINING + if (!link->synced) { + php_error_docref(NULL, E_NOTICE, + "The connection is in pipeline mode. A synchronization message must be sent before results can be received. Call pg_pipeline_sync()"); + } +#endif pgsql_result = PQgetResult(pgsql); if (!pgsql_result) { @@ -5963,6 +6053,7 @@ PHP_FUNCTION(pg_enter_pipeline_mode) { zval *pgsql_link; pgsql_link_handle *pgsql_handle; + int ret; if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) { RETURN_THROWS(); @@ -5973,13 +6064,19 @@ PHP_FUNCTION(pg_enter_pipeline_mode) PQsetnonblocking(pgsql_handle->conn, 1); - RETURN_BOOL(PQenterPipelineMode(pgsql_handle->conn)); + ret = PQenterPipelineMode(pgsql_handle->conn); + if (ret) { + pgsql_handle->synced = 0; + } + + RETURN_BOOL(ret); } PHP_FUNCTION(pg_exit_pipeline_mode) { zval *pgsql_link; pgsql_link_handle *pgsql_handle; + int ret; if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) { RETURN_THROWS(); @@ -5990,7 +6087,12 @@ PHP_FUNCTION(pg_exit_pipeline_mode) PQsetnonblocking(pgsql_handle->conn, 0); - RETURN_BOOL(PQexitPipelineMode(pgsql_handle->conn)); + ret = PQexitPipelineMode(pgsql_handle->conn); + if (ret) { + pgsql_handle->synced = 1; + } + + RETURN_BOOL(ret); } PHP_FUNCTION(pg_send_flush_request) @@ -6012,6 +6114,7 @@ PHP_FUNCTION(pg_pipeline_sync) { zval *pgsql_link; pgsql_link_handle *pgsql_handle; + int ret; if (zend_parse_parameters(ZEND_NUM_ARGS(), "O", &pgsql_link, pgsql_link_ce) == FAILURE) { RETURN_THROWS(); @@ -6020,7 +6123,12 @@ PHP_FUNCTION(pg_pipeline_sync) pgsql_handle = Z_PGSQL_LINK_P(pgsql_link); CHECK_PGSQL_LINK(pgsql_handle); - RETURN_BOOL(PQpipelineSync(pgsql_handle->conn)); + ret = PQpipelineSync(pgsql_handle->conn); + if (ret) { + pgsql_handle->synced = 1; + } + + RETURN_BOOL(ret); } PHP_FUNCTION(pg_pipeline_status) diff --git a/ext/pgsql/php_pgsql.h b/ext/pgsql/php_pgsql.h index 5286ccec636b4..6fc2bc9b6dda5 100644 --- a/ext/pgsql/php_pgsql.h +++ b/ext/pgsql/php_pgsql.h @@ -148,6 +148,9 @@ typedef struct pgsql_link_handle { zend_string *hash; HashTable *notices; bool persistent; +#ifdef LIBPQ_HAS_PIPELINING + bool synced; +#endif zend_object std; } pgsql_link_handle; diff --git a/ext/pgsql/tests/pg_pipeline_sync.phpt b/ext/pgsql/tests/pg_pipeline_sync.phpt index f72c8cd87940f..ec54ab4b03cfd 100644 --- a/ext/pgsql/tests/pg_pipeline_sync.phpt +++ b/ext/pgsql/tests/pg_pipeline_sync.phpt @@ -117,14 +117,158 @@ if (($result = pg_get_result($db)) !== false) { } } +for ($i = 99; $i < 199; ++$i) { + if (!pg_send_query_params($db, "select $1 as index, now() + ($1||' day')::interval as time, pg_sleep(0.001)", array($i))) { + die('pg_send_query_params failed'); + } +} + +if (!pg_pipeline_sync($db)) { + die('pg_pipeline_sync failed'); +} + +usleep(10000); + +pg_cancel_query($db); + +if (pg_pipeline_status($db) !== PGSQL_PIPELINE_ON) { + die('pg_pipeline_status failed'); +} + +if (pg_connection_busy($db)) { + $read = [$stream]; $write = $ex = []; + while (!stream_select($read, $write, $ex, null, null)) { } +} + +$canceled_count = 0; +for ($i = 99; $i < 199; ++$i) { + if (!($result = pg_get_result($db))) { + die('pg_get_result'); + } + + $result_status = pg_result_status($result); + if ($result_status === PGSQL_FATAL_ERROR) { + if (pg_connection_status($db) !== PGSQL_CONNECTION_OK) { + die('pg_cancel_query failed'); + } + if (strpos(pg_last_error($db), 'canceling statement') === false) { + die('pg_cancel_query failed'); + } + pg_free_result($result); + if ($result = pg_get_result($db)) { + die('pg_get_result'); + } + continue; + } + if ($result_status === 11/*PGSQL_STATUS_PIPELINE_ABORTED*/) { + ++$canceled_count; + pg_free_result($result); + if ($result = pg_get_result($db)) { + die('pg_get_result'); + } + continue; + } + if ($result_status !== PGSQL_TUPLES_OK) { + die('pg_result_status failed'); + } + + if (pg_num_rows($result) == -1) { + die('pg_num_rows failed'); + } + + if (!($row = pg_fetch_row($result, null))) { + die('pg_fetch_row failed'); + } + + pg_free_result($result); + + if (pg_get_result($db) !== false) { + die('pg_get_result failed'); + } +} + +if ($canceled_count < 1) { + die('pg_cancel_query failed'); +} + +if (($result = pg_get_result($db)) !== false) { + if (pg_result_status($result) !== PGSQL_PIPELINE_SYNC) { + die('pg_result_status failed'); + } +} + if (!pg_exit_pipeline_mode($db)) { die('pg_exit_pipeline_mode failed'); } -echo "OK"; +if (!pg_enter_pipeline_mode($db)) { + die('pg_exit_pipeline_mode failed'); +} + +if (!pg_send_query_params($db, "create table if not exists __test__pg_pipeline_sync__test__(f1 integer, f2 character varying)", array())) { + die('pg_send_query_params failed'); +} + +if (pg_exit_pipeline_mode($db)) { + die('pg_exit_pipeline_mode failed'); +} pg_close($db); +if (!$db = pg_connect($conn_str)) { + die("pg_connect() error"); +} + +if (!pg_send_query_params($db, "select * from __test__pg_pipeline_sync__test__", array())) { + die('pg_send_query_params failed'); +} + +if (!($stream = pg_socket($db))) { + die('pg_socket'); +} + +if (pg_connection_busy($db)) { + $read = [$stream]; $write = $ex = []; + while (!stream_select($read, $write, $ex, null, null)) { } +} + +if (!($result = pg_get_result($db))) { + die('pg_get_result'); +} + +if (pg_result_status($result) !== PGSQL_FATAL_ERROR) { + die('pg_result_status failed'); +} + +pg_free_result($result); + +if (pg_get_result($db) !== false) { + die('pg_get_result failed'); +} + +pg_close($db); + +if (!$db = pg_connect($conn_str)) { + die("pg_connect() error"); +} + +if (!pg_enter_pipeline_mode($db)) { + die('pg_enter_pipeline_mode'); +} + +if (!pg_send_query_params($db, "select $1 as index, now() + ($1||' day')::interval as time", array(199))) { + die('pg_send_query_params failed'); +} + +@pg_query_params($db, "select $1 as index, now() + ($1||' day')::interval as time", array(200)); +if (strpos(pg_last_error($db), 'synchronous command execution functions are not allowed in pipeline mode') === false) { + die('pg_query_params failed'); +} + +pg_close($db); + +echo "OK"; + ?> --EXPECT-- OK