Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support HTTP keep-alive for HTTP client (reusing persistent connections) #486

Merged
merged 4 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Browser.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Browser
private $baseUrl;
private $protocolVersion = '1.1';
private $defaultHeaders = array(
'Connection' => 'close',
'User-Agent' => 'ReactPHP/1'
);

Expand Down
17 changes: 6 additions & 11 deletions src/Client/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@
namespace React\Http\Client;

use Psr\Http\Message\RequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Io\ClientConnectionManager;
use React\Http\Io\ClientRequestStream;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;

/**
* @internal
*/
class Client
{
private $connector;
/** @var ClientConnectionManager */
private $connectionManager;

public function __construct(LoopInterface $loop, ConnectorInterface $connector = null)
public function __construct(ClientConnectionManager $connectionManager)
{
if ($connector === null) {
$connector = new Connector(array(), $loop);
}

$this->connector = $connector;
$this->connectionManager = $connectionManager;
}

/** @return ClientRequestStream */
public function request(RequestInterface $request)
{
return new ClientRequestStream($this->connector, $request);
return new ClientRequestStream($this->connectionManager, $request);
}
}
137 changes: 137 additions & 0 deletions src/Io/ClientConnectionManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

namespace React\Http\Io;

use Psr\Http\Message\UriInterface;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;

/**
* [Internal] Manages outgoing HTTP connections for the HTTP client
*
* @internal
* @final
*/
class ClientConnectionManager
{
/** @var ConnectorInterface */
private $connector;

/** @var LoopInterface */
private $loop;

/** @var string[] */
private $idleUris = array();

/** @var ConnectionInterface[] */
private $idleConnections = array();

/** @var TimerInterface[] */
private $idleTimers = array();

/** @var \Closure[] */
private $idleStreamHandlers = array();

/** @var float */
private $maximumTimeToKeepAliveIdleConnection = 0.001;

public function __construct(ConnectorInterface $connector, LoopInterface $loop)
{
$this->connector = $connector;
$this->loop = $loop;
}

/**
* @return PromiseInterface<ConnectionInterface>
*/
public function connect(UriInterface $uri)
{
$scheme = $uri->getScheme();
if ($scheme !== 'https' && $scheme !== 'http') {
return \React\Promise\reject(new \InvalidArgumentException(
'Invalid request URL given'
));
}

$port = $uri->getPort();
if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
}
$uri = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;

// Reuse idle connection for same URI if available
foreach ($this->idleConnections as $id => $connection) {
if ($this->idleUris[$id] === $uri) {
assert($this->idleStreamHandlers[$id] instanceof \Closure);
$connection->removeListener('close', $this->idleStreamHandlers[$id]);
$connection->removeListener('data', $this->idleStreamHandlers[$id]);
$connection->removeListener('error', $this->idleStreamHandlers[$id]);

assert($this->idleTimers[$id] instanceof TimerInterface);
$this->loop->cancelTimer($this->idleTimers[$id]);
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);

return \React\Promise\resolve($connection);
}
}

// Create new connection if no idle connection to same URI is available
return $this->connector->connect($uri);
}

/**
* Hands back an idle connection to the connection manager for possible future reuse.
*
* @return void
*/
public function keepAlive(UriInterface $uri, ConnectionInterface $connection)
{
$scheme = $uri->getScheme();
assert($scheme === 'https' || $scheme === 'http');

$port = $uri->getPort();
if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
}

$this->idleUris[] = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;
$this->idleConnections[] = $connection;

$that = $this;
$cleanUp = function () use ($connection, $that) {
// call public method to support legacy PHP 5.3
$that->cleanUpConnection($connection);
};

// clean up and close connection when maximum time to keep-alive idle connection has passed
$this->idleTimers[] = $this->loop->addTimer($this->maximumTimeToKeepAliveIdleConnection, $cleanUp);

// clean up and close connection when unexpected close/data/error event happens during idle time
$this->idleStreamHandlers[] = $cleanUp;
$connection->on('close', $cleanUp);
$connection->on('data', $cleanUp);
$connection->on('error', $cleanUp);
}

/**
* @internal
* @return void
*/
public function cleanUpConnection(ConnectionInterface $connection) // private (PHP 5.4+)
{
$id = \array_search($connection, $this->idleConnections, true);
if ($id === false) {
return;
}

assert(\is_int($id));
assert($this->idleTimers[$id] instanceof TimerInterface);
$this->loop->cancelTimer($this->idleTimers[$id]);
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);

$connection->close();
}
}
63 changes: 37 additions & 26 deletions src/Io/ClientRequestStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
namespace React\Http\Io;

use Evenement\EventEmitter;
use Psr\Http\Message\MessageInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\WritableStreamInterface;
use RingCentral\Psr7 as gPsr;

Expand All @@ -26,8 +24,8 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
const STATE_HEAD_WRITTEN = 2;
const STATE_END = 3;

/** @var ConnectorInterface */
private $connector;
/** @var ClientConnectionManager */
private $connectionManager;

/** @var RequestInterface */
private $request;
Expand All @@ -44,9 +42,9 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac

private $pendingWrites = '';

public function __construct(ConnectorInterface $connector, RequestInterface $request)
public function __construct(ClientConnectionManager $connectionManager, RequestInterface $request)
{
$this->connector = $connector;
$this->connectionManager = $connectionManager;
$this->request = $request;
}

Expand All @@ -65,7 +63,7 @@ private function writeHead()
$pendingWrites = &$this->pendingWrites;
$that = $this;

$promise = $this->connect();
$promise = $this->connectionManager->connect($this->request->getUri());
$promise->then(
function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
$connectionRef = $connection;
Expand Down Expand Up @@ -174,11 +172,20 @@ public function handleData($data)
$this->connection = null;
$this->buffer = '';

// take control over connection handling and close connection once response body closes
// take control over connection handling and check if we can reuse the connection once response body closes
$that = $this;
$request = $this->request;
$connectionManager = $this->connectionManager;
$successfulEndReceived = false;
$input = $body = new CloseProtectionStream($connection);
$input->on('close', function () use ($connection, $that) {
$connection->close();
$input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
// only reuse connection after successful response and both request and response allow keep alive
if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
$connectionManager->keepAlive($request->getUri(), $connection);
} else {
$connection->close();
}

$that->close();
});

Expand All @@ -193,6 +200,9 @@ public function handleData($data)
$length = (int) $response->getHeaderLine('Content-Length');
}
$response = $response->withBody($body = new ReadableBodyStream($body, $length));
$body->on('end', function () use (&$successfulEndReceived) {
$successfulEndReceived = true;
});

// emit response with streaming response body (see `Sender`)
$this->emit('response', array($response, $body));
Expand Down Expand Up @@ -253,27 +263,28 @@ public function close()
$this->removeAllListeners();
}

protected function connect()
/**
* @internal
* @return bool
* @link https://www.rfc-editor.org/rfc/rfc9112#section-9.3
* @link https://www.rfc-editor.org/rfc/rfc7230#section-6.1
*/
public function hasMessageKeepAliveEnabled(MessageInterface $message)
{
$scheme = $this->request->getUri()->getScheme();
if ($scheme !== 'https' && $scheme !== 'http') {
return Promise\reject(
new \InvalidArgumentException('Invalid request URL given')
);
}
$connectionOptions = \RingCentral\Psr7\normalize_header(\strtolower($message->getHeaderLine('Connection')));

$host = $this->request->getUri()->getHost();
$port = $this->request->getUri()->getPort();
if (\in_array('close', $connectionOptions, true)) {
return false;
}

if ($scheme === 'https') {
$host = 'tls://' . $host;
if ($message->getProtocolVersion() === '1.1') {
return true;
}

if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
if (\in_array('keep-alive', $connectionOptions, true)) {
return true;
}

return $this->connector
->connect($host . ':' . $port);
return false;
}
}
14 changes: 6 additions & 8 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\Http\Client\Client as HttpClient;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;

Expand Down Expand Up @@ -49,7 +50,11 @@ class Sender
*/
public static function createFromLoop(LoopInterface $loop, ConnectorInterface $connector = null)
{
return new self(new HttpClient($loop, $connector));
if ($connector === null) {
$connector = new Connector(array(), $loop);
}

return new self(new HttpClient(new ClientConnectionManager($connector, $loop)));
}

private $http;
Expand Down Expand Up @@ -93,13 +98,6 @@ public function send(RequestInterface $request)
$size = 0;
}

// automatically add `Connection: close` request header for HTTP/1.1 requests to avoid connection reuse
if ($request->getProtocolVersion() === '1.1') {
$request = $request->withHeader('Connection', 'close');
} else {
$request = $request->withoutHeader('Connection');
}

// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) {
$request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo()));
Expand Down
2 changes: 1 addition & 1 deletion src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private function makeRedirectRequest(RequestInterface $request, UriInterface $lo
->withMethod($request->getMethod() === 'HEAD' ? 'HEAD' : 'GET')
->withoutHeader('Content-Type')
->withoutHeader('Content-Length')
->withBody(new EmptyBodyStream());
->withBody(new BufferedBody(''));
}

return $request;
Expand Down
Loading