Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Protocol violations #219

Merged
merged 8 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.8'

x-definitions:
x-shared-env:
&common-env
Expand Down
27 changes: 6 additions & 21 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Laudis\Neo4j\Contracts\FormatterInterface;
use Laudis\Neo4j\Databags\BookmarkHolder;
use Laudis\Neo4j\Databags\DatabaseInfo;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Enum\ConnectionProtocol;
use Laudis\Neo4j\Exception\Neo4jException;
Expand Down Expand Up @@ -151,6 +150,12 @@ public function setTimeout(float $timeout): void

public function consumeResults(): void
{
if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol()->serverState !== ServerState::TX_STREAMING) {
$this->subscribedResults = [];

return;
}

foreach ($this->subscribedResults as $result) {
$result = $result->get();
if ($result) {
Expand Down Expand Up @@ -185,10 +190,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
{
$this->consumeResults();

if ($this->protocol()->serverState !== ServerState::READY) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'BEGIN\' cannot be handled by a session which isn\'t in the READY state.')]);
}

$extra = $this->buildRunExtra($database, $timeout, $holder, AccessMode::WRITE());
$response = $this->protocol()
->begin($extra)
Expand All @@ -203,10 +204,6 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
*/
public function discard(?int $qid): void
{
if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'DISCARD\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]);
}

$extra = $this->buildResultExtra(null, $qid);
$response = $this->protocol()
->discard($extra)
Expand All @@ -223,10 +220,6 @@ public function discard(?int $qid): void
*/
public function run(string $text, array $parameters, ?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode): array
{
if (!in_array($this->protocol()->serverState, [ServerState::READY, ServerState::TX_READY, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'RUN\' cannot be handled by a session which isn\'t in the READY|TX_READY|TX_STREAMING state.')]);
}

$extra = $this->buildRunExtra($database, $timeout, $holder, $mode);
$response = $this->protocol()
->run($text, $parameters, $extra)
Expand Down Expand Up @@ -260,10 +253,6 @@ public function rollback(): void
{
$this->consumeResults();

if ($this->protocol()->serverState !== ServerState::TX_READY) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'ROLLBACK\' cannot be handled by a session which isn\'t in the TX_READY state.')]);
}

$response = $this->protocol()
->rollback()
->getResponse();
Expand All @@ -284,10 +273,6 @@ public function protocol(): V4_4|V5|V5_1|V5_2|V5_3|V5_4
*/
public function pull(?int $qid, ?int $fetchSize): array
{
if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Request.Invalid', 'Message \'PULL\' cannot be handled by a session which isn\'t in the STREAMING|TX_STREAMING state.')]);
}

$extra = $this->buildResultExtra($fetchSize, $qid);

$tbr = [];
Expand Down
75 changes: 46 additions & 29 deletions src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

namespace Laudis\Neo4j\Bolt;

use Laudis\Neo4j\Common\TransactionHelper;
use Bolt\enum\ServerState;
use Laudis\Neo4j\Contracts\FormatterInterface;
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
use Laudis\Neo4j\Databags\BookmarkHolder;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Databags\Statement;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Enum\TransactionState;
use Laudis\Neo4j\Exception\ClientException;
use Laudis\Neo4j\ParameterHelper;
use Laudis\Neo4j\Types\AbstractCypherSequence;
use Laudis\Neo4j\Types\CypherList;
Expand All @@ -40,9 +41,7 @@
*/
final class BoltUnmanagedTransaction implements UnmanagedTransactionInterface
{
private bool $isRolledBack = false;

private bool $isCommitted = false;
private TransactionState $state = TransactionState::ACTIVE;

/**
* @param FormatterInterface<T> $formatter
Expand All @@ -61,8 +60,25 @@ public function __construct(
private readonly BookmarkHolder $bookmarkHolder
) {}

/**
* @throws ClientException|Throwable
*/
public function commit(iterable $statements = []): CypherList
{
if ($this->isFinished()) {
if ($this->state === TransactionState::TERMINATED) {
throw new ClientException("Can't commit, transaction has been terminated");
}

if ($this->state === TransactionState::COMMITTED) {
throw new ClientException("Can't commit, transaction has already been committed");
}

if ($this->state === TransactionState::ROLLED_BACK) {
throw new ClientException("Can't commit, transaction has already been rolled back");
}
}

// Force the results to pull all the results.
// After a commit, the connection will be in the ready state, making it impossible to use PULL
$tbr = $this->runStatements($statements)->each(static function ($list) {
Expand All @@ -72,15 +88,29 @@ public function commit(iterable $statements = []): CypherList
});

$this->connection->commit();
$this->isCommitted = true;
$this->state = TransactionState::COMMITTED;

return $tbr;
}

public function rollback(): void
{
if ($this->isFinished()) {
if ($this->state === TransactionState::TERMINATED) {
throw new ClientException("Can't rollback, transaction has been terminated");
}

if ($this->state === TransactionState::COMMITTED) {
throw new ClientException("Can't rollback, transaction has already been committed");
}

if ($this->state === TransactionState::ROLLED_BACK) {
throw new ClientException("Can't rollback, transaction has already been rolled back");
}
}

$this->connection->rollback();
$this->isRolledBack = true;
$this->state = TransactionState::ROLLED_BACK;
}

/**
Expand All @@ -99,6 +129,11 @@ public function runStatement(Statement $statement)
$parameters = ParameterHelper::formatParameters($statement->getParameters(), $this->connection->getProtocol());
$start = microtime(true);

$serverState = $this->connection->protocol()->serverState;
if (in_array($serverState, [ServerState::STREAMING, ServerState::TX_STREAMING])) {
$this->connection->consumeResults();
}

try {
$meta = $this->connection->run(
$statement->getText(),
Expand All @@ -109,7 +144,7 @@ public function runStatement(Statement $statement)
$this->config->getAccessMode()
);
} catch (Throwable $e) {
$this->isRolledBack = true;
$this->state = TransactionState::TERMINATED;
throw $e;
}
$run = microtime(true);
Expand Down Expand Up @@ -139,36 +174,18 @@ public function runStatements(iterable $statements): CypherList
return new CypherList($tbr);
}

/**
* @throws Neo4jException
*
* @return never
*/
private function handleMessageException(Neo4jException $e): void
{
$exception = $e->getErrors()[0];
if (!($exception->getClassification() === 'ClientError' && $exception->getCategory() === 'Request')) {
$this->connection->reset();
}
if (!$this->isFinished() && in_array($exception->getClassification(), TransactionHelper::ROLLBACK_CLASSIFICATIONS)) {
$this->isRolledBack = true;
}

throw $e;
}

public function isRolledBack(): bool
{
return $this->isRolledBack;
return $this->state === TransactionState::ROLLED_BACK || $this->state === TransactionState::TERMINATED;
}

public function isCommitted(): bool
{
return $this->isCommitted;
return $this->state == TransactionState::COMMITTED;
}

public function isFinished(): bool
{
return $this->isRolledBack() || $this->isCommitted();
return $this->state != TransactionState::ACTIVE;
}
}
40 changes: 0 additions & 40 deletions src/Enum/TransactionEffect.php

This file was deleted.

40 changes: 40 additions & 0 deletions src/Enum/TransactionState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Enum;

/**
* The state of a transaction.
*/
enum TransactionState
{
/**
* The transaction is running with no explicit success or failure marked.
*/
case ACTIVE;

/**
* This transaction has been terminated because of a fatal connection error.
*/
case TERMINATED;

/**
* This transaction has successfully committed.
*/
case COMMITTED;

/**
* This transaction has been rolled back.
*/
case ROLLED_BACK;
}
32 changes: 32 additions & 0 deletions src/Exception/ClientException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Exception;

use RuntimeException;
use Throwable;

/**
* Exception when a Client Error occurs.
*
* @psalm-immutable
*
* @psalm-suppress MutableDependency
*/
final class ClientException extends RuntimeException
{
public function __construct(string $message, ?Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}
}
Loading
Loading