diff --git a/src/Connection/Client.php b/src/Connection/Client.php index 656db37..adc6c66 100644 --- a/src/Connection/Client.php +++ b/src/Connection/Client.php @@ -10,19 +10,25 @@ use Bunny\Exception\BunnyException; use Bunny\Exception\ClientException; use Bunny\Protocol; +use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException; use Nette\Utils\Strings; +use Psr\Log\LoggerInterface; +use function time; /** * @codeCoverageIgnore */ class Client extends BunnyClient { + protected ?LoggerInterface $logger; + /** * Constructor. * * @param array $options + * @param LoggerInterface|null $logger */ - public function __construct(array $options = []) + public function __construct(array $options = [], ?LoggerInterface $logger = null) { parent::__construct($options); @@ -32,6 +38,8 @@ public function __construct(array $options = []) $this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback']) ? $options['heartbeat_callback'] : null; + + $this->logger = $logger; } /** @@ -43,6 +51,7 @@ public function sendHeartbeat(): void $this->flushWriteBuffer(); $this->options['heartbeat_callback'] && $this->options['heartbeat_callback'](); + $this->logger?->debug('Bunny: heartbeat called'); } public function syncDisconnect(int $replyCode = 0, string $replyText = ""): bool @@ -76,6 +85,7 @@ protected function write(): void { if ($this->stream && feof($this->stream)) { $this->syncDisconnect(Constants::STATUS_RESOURCE_ERROR, "Connection closed by server unexpectedly"); + $this->logger?->debug('Bunny: Broken pipe detected, server closed stream'); throw new ClientException("Broken pipe or closed connection."); } @@ -87,6 +97,9 @@ protected function write(): void } error_clear_last(); + $this->logger?->debug('Bunny: Broken pipe detected, send of bytes failed', [ + 'last_error' => $last, + ]); throw new ClientException('Broken pipe or closed connection.'); } } @@ -172,15 +185,26 @@ public function run($maxSeconds = null): void } while ($this->running); } - public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame + public function waitForConfirm(int $channel, ?int $timeout = null): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame { $frame = null; // psalm bug + $time = time(); + + $checkTimeout = static function () use ($time, $timeout): void { + if ($time + $timeout < time()) { + throw new WaitTimeoutException('Timeout reached'); + } + }; + while (true) { + $timeout && $checkTimeout(); + /** * @phpstan-ignore-next-line */ while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) { $this->feedReadBuffer(); + $timeout && $checkTimeout(); } if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) { @@ -200,4 +224,10 @@ public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Proto $this->enqueue($frame); } } + + protected function feedReadBuffer(): bool + { + $this->logger?->debug('Bunny: read buffer called'); + return parent::feedReadBuffer(); + } } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index 3f2f4ab..3952d9e 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -7,6 +7,7 @@ use Bunny\Channel; use Bunny\Exception\ClientException; use Contributte\RabbitMQ\Connection\Exception\ConnectionException; +use Psr\Log\LoggerInterface; use function in_array; use function max; use function time; @@ -15,6 +16,7 @@ final class Connection implements IConnection { private const HEARTBEAT_INTERVAL = 1; + private const CONFIRM_INTERVAL = 3; private Client $bunnyClient; @@ -23,6 +25,7 @@ final class Connection implements IConnection */ private array $connectionParams; private float $heartbeat; + private int $publishConfirm; private int $lastBeat = 0; private ?Channel $channel = null; @@ -45,7 +48,8 @@ public function __construct( ?array $ssl = null, ?callable $cycleCallback = null, ?callable $heartbeatCallback = null, - private bool $publishConfirm = false, + bool|int $publishConfirm = false, + ?LoggerInterface $logger = null, ) { $this->connectionParams = [ 'host' => $host, @@ -64,13 +68,17 @@ public function __construct( 'heartbeat_callback' => $heartbeatCallback, ]; - $this->bunnyClient = $this->createNewConnection(); + $this->bunnyClient = $this->createNewConnection($logger); $this->heartbeat = max($heartbeat, self::HEARTBEAT_INTERVAL); if (!$lazy) { $this->lastBeat = time(); $this->bunnyClient->connect(); } + + $this->publishConfirm = $publishConfirm === true + ? self::CONFIRM_INTERVAL + : (int) $publishConfirm; } @@ -143,6 +151,11 @@ public function connectIfNeeded(): void } public function isPublishConfirm(): bool + { + return $this->publishConfirm > 0; + } + + public function getPublishConfirm(): int { return $this->publishConfirm; } @@ -161,9 +174,9 @@ public function getVhost(): string return $this->connectionParams['vhost']; } - private function createNewConnection(): Client + private function createNewConnection(?LoggerInterface $logger = null): Client { - return new Client($this->connectionParams); + return new Client($this->connectionParams, $logger); } public function isConnected(): bool diff --git a/src/Connection/ConnectionFactory.php b/src/Connection/ConnectionFactory.php index 70e3f37..b693ef4 100644 --- a/src/Connection/ConnectionFactory.php +++ b/src/Connection/ConnectionFactory.php @@ -5,6 +5,7 @@ namespace Contributte\RabbitMQ\Connection; use Contributte\RabbitMQ\Connection\Exception\ConnectionFactoryException; +use Psr\Log\LoggerInterface; final class ConnectionFactory { @@ -20,7 +21,7 @@ final class ConnectionFactory private array $requests = []; - public function __construct(private ConnectionsDataBag $connectionsDataBag) + public function __construct(private ConnectionsDataBag $connectionsDataBag, private ?LoggerInterface $logger = null) { } @@ -120,6 +121,7 @@ private function create(string $name): IConnection cycleCallback: fn () => $this->sendHeartbeat(), heartbeatCallback: $connectionData['heartbeatCallback'] ?? null, publishConfirm: $connectionData['publishConfirm'], + logger: $this->logger, ); } } diff --git a/src/Connection/Exception/WaitTimeoutException.php b/src/Connection/Exception/WaitTimeoutException.php new file mode 100644 index 0000000..a96e808 --- /dev/null +++ b/src/Connection/Exception/WaitTimeoutException.php @@ -0,0 +1,11 @@ + Expect::bool(true), 'ssl' => Expect::array(null)->required(false), 'heartbeatCallback' => Expect::array(null)->required(false), - 'publishConfirm' => Expect::bool(false), + 'publishConfirm' => Expect::anyOf( + Expect::bool(), + Expect::int(), + )->default(false), 'admin' => Expect::structure([ 'port' => Expect::int(15672), 'secure' => Expect::bool(false), @@ -53,6 +58,6 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef return $builder->addDefinition($this->extension->prefix('connectionFactory')) ->setFactory(ConnectionFactory::class) - ->setArguments([$connectionsDataBag]); + ->setArguments([$connectionsDataBag, $builder->getByType(LoggerInterface::class)]); } } diff --git a/src/Producer/Producer.php b/src/Producer/Producer.php index ca8524f..b78d70a 100644 --- a/src/Producer/Producer.php +++ b/src/Producer/Producer.php @@ -8,6 +8,7 @@ use Bunny\Protocol\MethodBasicNackFrame; use Contributte\RabbitMQ\Connection\Client; use Contributte\RabbitMQ\Connection\Exception\PublishException; +use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException; use Contributte\RabbitMQ\Exchange\IExchange; use Contributte\RabbitMQ\LazyDeclarator; use Contributte\RabbitMQ\Queue\IQueue; @@ -86,11 +87,16 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he return; } - $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId()); + $frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId(), $target->getConnection()->getPublishConfirm()); if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) { throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}"); } } + } catch (WaitTimeoutException $e) { + throw new PublishException( + "Confirm message timeout.\nExchange:{$exchange}\nRoutingKey:{$routingKey}\n", + previous: $e + ); } catch (ClientException $e) { if ($try >= 2) { throw $e;