Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Feature logger #22

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions src/Connection/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@
use Bunny\Exception\BunnyException;
use Bunny\Exception\ClientException;
use Bunny\Protocol;
use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException;
use Nette\Utils\Strings;
use Psr\Log\LoggerInterface;
use function time;

/**
* @codeCoverageIgnore
*/
class Client extends BunnyClient
{
protected ?LoggerInterface $logger;

/**
* Constructor.
*
* @param array<string|mixed> $options
* @param LoggerInterface|null $logger
*/
public function __construct(array $options = [])
public function __construct(array $options = [], ?LoggerInterface $logger = null)
{
parent::__construct($options);

Expand All @@ -32,6 +38,8 @@ public function __construct(array $options = [])
$this->options['heartbeat_callback'] = is_callable($options['heartbeat_callback'])
? $options['heartbeat_callback']
: null;

$this->logger = $logger;
}

/**
Expand All @@ -43,6 +51,7 @@ public function sendHeartbeat(): void
$this->flushWriteBuffer();

$this->options['heartbeat_callback'] && $this->options['heartbeat_callback']();
$this->logger?->debug('Bunny: heartbeat called');
}

public function syncDisconnect(int $replyCode = 0, string $replyText = ""): bool
Expand Down Expand Up @@ -76,6 +85,7 @@ protected function write(): void
{
if ($this->stream && feof($this->stream)) {
$this->syncDisconnect(Constants::STATUS_RESOURCE_ERROR, "Connection closed by server unexpectedly");
$this->logger?->debug('Bunny: Broken pipe detected, server closed stream');
throw new ClientException("Broken pipe or closed connection.");
}

Expand All @@ -87,6 +97,9 @@ protected function write(): void
}

error_clear_last();
$this->logger?->debug('Bunny: Broken pipe detected, send of bytes failed', [
'last_error' => $last,
]);
throw new ClientException('Broken pipe or closed connection.');
}
}
Expand Down Expand Up @@ -172,15 +185,26 @@ public function run($maxSeconds = null): void
} while ($this->running);
}

public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
public function waitForConfirm(int $channel, ?int $timeout = null): Protocol\MethodBasicAckFrame|Protocol\MethodBasicNackFrame
{
$frame = null; // psalm bug
$time = time();

$checkTimeout = static function () use ($time, $timeout): void {
if ($time + $timeout < time()) {
throw new WaitTimeoutException('Timeout reached');
}
};

while (true) {
$timeout && $checkTimeout();

/**
* @phpstan-ignore-next-line
*/
while (($frame = $this->getReader()->consumeFrame($this->getReadBuffer())) === null) {
$this->feedReadBuffer();
$timeout && $checkTimeout();
}

if ($frame->channel === $channel && ($frame instanceof Protocol\MethodBasicNackFrame || $frame instanceof Protocol\MethodBasicAckFrame)) {
Expand All @@ -200,4 +224,10 @@ public function waitForConfirm(int $channel): Protocol\MethodBasicAckFrame|Proto
$this->enqueue($frame);
}
}

protected function feedReadBuffer(): bool
{
$this->logger?->debug('Bunny: read buffer called');
return parent::feedReadBuffer();
}
}
21 changes: 17 additions & 4 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Bunny\Channel;
use Bunny\Exception\ClientException;
use Contributte\RabbitMQ\Connection\Exception\ConnectionException;
use Psr\Log\LoggerInterface;
use function in_array;
use function max;
use function time;
Expand All @@ -15,6 +16,7 @@ final class Connection implements IConnection
{

private const HEARTBEAT_INTERVAL = 1;
private const CONFIRM_INTERVAL = 3;

private Client $bunnyClient;

Expand All @@ -23,6 +25,7 @@ final class Connection implements IConnection
*/
private array $connectionParams;
private float $heartbeat;
private int $publishConfirm;
private int $lastBeat = 0;
private ?Channel $channel = null;

Expand All @@ -45,7 +48,8 @@ public function __construct(
?array $ssl = null,
?callable $cycleCallback = null,
?callable $heartbeatCallback = null,
private bool $publishConfirm = false,
bool|int $publishConfirm = false,
?LoggerInterface $logger = null,
) {
$this->connectionParams = [
'host' => $host,
Expand All @@ -64,13 +68,17 @@ public function __construct(
'heartbeat_callback' => $heartbeatCallback,
];

$this->bunnyClient = $this->createNewConnection();
$this->bunnyClient = $this->createNewConnection($logger);
$this->heartbeat = max($heartbeat, self::HEARTBEAT_INTERVAL);

if (!$lazy) {
$this->lastBeat = time();
$this->bunnyClient->connect();
}

$this->publishConfirm = $publishConfirm === true
? self::CONFIRM_INTERVAL
: (int) $publishConfirm;
}


Expand Down Expand Up @@ -143,6 +151,11 @@ public function connectIfNeeded(): void
}

public function isPublishConfirm(): bool
{
return $this->publishConfirm > 0;
}

public function getPublishConfirm(): int
{
return $this->publishConfirm;
}
Expand All @@ -161,9 +174,9 @@ public function getVhost(): string
return $this->connectionParams['vhost'];
}

private function createNewConnection(): Client
private function createNewConnection(?LoggerInterface $logger = null): Client
{
return new Client($this->connectionParams);
return new Client($this->connectionParams, $logger);
}

public function isConnected(): bool
Expand Down
4 changes: 3 additions & 1 deletion src/Connection/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Contributte\RabbitMQ\Connection;

use Contributte\RabbitMQ\Connection\Exception\ConnectionFactoryException;
use Psr\Log\LoggerInterface;

final class ConnectionFactory
{
Expand All @@ -20,7 +21,7 @@ final class ConnectionFactory
private array $requests = [];


public function __construct(private ConnectionsDataBag $connectionsDataBag)
public function __construct(private ConnectionsDataBag $connectionsDataBag, private ?LoggerInterface $logger = null)
{
}

Expand Down Expand Up @@ -120,6 +121,7 @@ private function create(string $name): IConnection
cycleCallback: fn () => $this->sendHeartbeat(),
heartbeatCallback: $connectionData['heartbeatCallback'] ?? null,
publishConfirm: $connectionData['publishConfirm'],
logger: $this->logger,
);
}
}
11 changes: 11 additions & 0 deletions src/Connection/Exception/WaitTimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Connection\Exception;

use Bunny\Exception\ClientException;

class WaitTimeoutException extends ClientException
{
}
1 change: 1 addition & 0 deletions src/Connection/IConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public function sendHeartbeat(): void;
public function isConnected(): bool;
public function getVhost(): string;
public function isPublishConfirm(): bool;
public function getPublishConfirm(): int;

/** @internal */
public function resetChannel(): void;
Expand Down
9 changes: 7 additions & 2 deletions src/DI/Helpers/ConnectionsHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Nette\DI\Definitions\ServiceDefinition;
use Nette\Schema\Expect;
use Nette\Schema\Schema;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;

final class ConnectionsHelper extends AbstractHelper
{
Expand All @@ -30,7 +32,10 @@ public function getConfigSchema(): Schema
'lazy' => Expect::bool(true),
'ssl' => Expect::array(null)->required(false),
'heartbeatCallback' => Expect::array(null)->required(false),
'publishConfirm' => Expect::bool(false),
'publishConfirm' => Expect::anyOf(
Expect::bool(),
Expect::int(),
)->default(false),
'admin' => Expect::structure([
'port' => Expect::int(15672),
'secure' => Expect::bool(false),
Expand All @@ -53,6 +58,6 @@ public function setup(ContainerBuilder $builder, array $config = []): ServiceDef

return $builder->addDefinition($this->extension->prefix('connectionFactory'))
->setFactory(ConnectionFactory::class)
->setArguments([$connectionsDataBag]);
->setArguments([$connectionsDataBag, $builder->getByType(LoggerInterface::class)]);
}
}
8 changes: 7 additions & 1 deletion src/Producer/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Bunny\Protocol\MethodBasicNackFrame;
use Contributte\RabbitMQ\Connection\Client;
use Contributte\RabbitMQ\Connection\Exception\PublishException;
use Contributte\RabbitMQ\Connection\Exception\WaitTimeoutException;
use Contributte\RabbitMQ\Exchange\IExchange;
use Contributte\RabbitMQ\LazyDeclarator;
use Contributte\RabbitMQ\Queue\IQueue;
Expand Down Expand Up @@ -86,11 +87,16 @@ private function tryPublish(IQueue|IExchange $target, string $message, array $he
return;
}

$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId());
$frame = $client->waitForConfirm($target->getConnection()->getChannel()->getChannelId(), $target->getConnection()->getPublishConfirm());
if ($frame instanceof MethodBasicNackFrame && $frame->deliveryTag === $deliveryTag) {
throw new PublishException("Publish of message failed.\nExchange:{$exchange}\nRoutingKey:{$routingKey}");
}
}
} catch (WaitTimeoutException $e) {
throw new PublishException(
"Confirm message timeout.\nExchange:{$exchange}\nRoutingKey:{$routingKey}\n",
previous: $e
);
} catch (ClientException $e) {
if ($try >= 2) {
throw $e;
Expand Down