diff --git a/README.md b/README.md index 5875d33..447ceb8 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,37 @@ $client->listen('some_channel') $client->query("NOTIFY some_channel, 'Hello World'")->subscribe(); ``` +## Example - Connecting over TLS with CA cert +```php + +$client = new PgAsync\Client([ + "host" => "127.0.0.1", + "port" => "5432", + "user" => "matt", + "database" => "matt", + "tls" => true, +], Loop::get(), new Connector([ + 'tls' => [ + 'starttls' => true, + 'cafile' => '/path/to/ca.crt', + ], +])); + +$client->query('SELECT * FROM channel')->subscribe( + function ($row) { + var_dump($row); + }, + function ($e) { + echo "Failed.\n"; + }, + function () { + echo "Complete.\n"; + } +); + + +``` + ## Install With [composer](https://getcomposer.org/) install into you project with: diff --git a/composer.json b/composer.json index 309a457..13dcbda 100644 --- a/composer.json +++ b/composer.json @@ -36,11 +36,18 @@ "php": ">=7.0.0", "voryx/event-loop": "^3.0 || ^2.0.2", "reactivex/rxphp": "^2.0", - "react/socket": "^1.0 || ^0.8 || ^0.7", + "react/promise-stream": "^1.5", + "react/socket": "dev-1.x-opportunistic-tls-connection as 1.999.999", "evenement/evenement": "^2.0 | ^3.0" }, "require-dev": { "phpunit/phpunit": ">=8.5.23 || ^6.5.5", "react/dns": "^1.0" - } + }, + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/WyriHaximus-labs/socket" + } + ] } diff --git a/src/PgAsync/Connection.php b/src/PgAsync/Connection.php index 1f896f7..739c190 100644 --- a/src/PgAsync/Connection.php +++ b/src/PgAsync/Connection.php @@ -10,6 +10,7 @@ use PgAsync\Command\Execute; use PgAsync\Command\Parse; use PgAsync\Command\PasswordMessage; +use PgAsync\Command\SSLRequest; use PgAsync\Command\Sync; use PgAsync\Command\Terminate; use PgAsync\Message\Authentication; @@ -30,9 +31,15 @@ use PgAsync\Message\ReadyForQuery; use PgAsync\Message\RowDescription; use PgAsync\Command\StartupMessage; +use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Promise\Promise; +use React\Socket\ConnectionInterface; use React\Socket\Connector; use React\Socket\ConnectorInterface; +use React\Socket\DelayedSpecialStreamEncryption; +use React\Socket\OpportunisticTlsConnectionInterface; +use React\Socket\StartTlsConnectionInterface; use React\Stream\DuplexStreamInterface; use Rx\Disposable\CallbackDisposable; use Rx\Disposable\EmptyDisposable; @@ -41,6 +48,8 @@ use Rx\ObserverInterface; use Rx\SchedulerInterface; use Rx\Subject\Subject; +use function React\Promise\resolve; +use function React\Promise\Stream\first; class Connection extends EventEmitter { @@ -129,6 +138,7 @@ class Connection extends EventEmitter /** @var bool */ private $auto_disconnect = false; + private $tls = false; private $password; public function __construct(array $parameters, LoopInterface $loop, ConnectorInterface $connector = null) @@ -153,6 +163,11 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt unset($parameters['password']); } + if (array_key_exists('tls', $parameters)) { + $this->tls = $parameters['tls']; + unset($parameters['tls']); + } + if (isset($parameters['auto_disconnect'])) { $this->auto_disconnect = $parameters['auto_disconnect']; unset($parameters['auto_disconnect']); @@ -168,7 +183,7 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt $this->queryType = static::QUERY_SIMPLE; $this->connStatus = static::CONNECTION_NEEDED; $this->socket = $connector ?: new Connector($loop); - $this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port']; + $this->uri = 'opportunistic+tls://' . $parameters['host'] . ':' . $parameters['port']; $this->notificationSubject = new Subject(); $this->cancelPending = false; $this->cancelRequested = false; @@ -185,23 +200,39 @@ private function start() $this->connStatus = static::CONNECTION_STARTED; $this->socket->connect($this->uri)->then( - function (DuplexStreamInterface $stream) { - $this->stream = $stream; - $this->connStatus = static::CONNECTION_MADE; + function (OpportunisticTlsConnectionInterface $stream) { + (new Promise(function (callable $resolve, callable $reject) use ($stream) { + if ($this->tls) { + first($stream)->then(function ($data) use ($resolve, $reject, $stream) { + if (trim($data) === 'S') { + $stream->enableEncryption()->then($resolve, $reject); + return; + } + + $resolve($stream); + }, $reject); + + $ssl = new SSLRequest(); + $stream->write($ssl->encodedMessage()); + return; + } - $stream->on('close', [$this, 'onClose']); + $resolve($stream); + }))->then(function (DuplexStreamInterface $stream) { + $this->stream = $stream; + $this->connStatus = static::CONNECTION_MADE; - $stream->on('data', [$this, 'onData']); + $stream->on('close', [$this, 'onClose']); - // $ssl = new SSLRequest(); - // $stream->write($ssl->encodedMessage()); + $stream->on('data', [$this, 'onData']); - $startupParameters = $this->parameters; - unset($startupParameters['host'], $startupParameters['port']); + $startupParameters = $this->parameters; + unset($startupParameters['host'], $startupParameters['port']); - $startup = new StartupMessage(); - $startup->setParameters($startupParameters); - $stream->write($startup->encodedMessage()); + $startup = new StartupMessage(); + $startup->setParameters($startupParameters); + $stream->write($startup->encodedMessage()); + })->done(); }, function ($e) { // connection error @@ -566,11 +597,11 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use $this->processQueue(); return new CallbackDisposable(function () use ($q) { - if ($this->currentCommand === $q && $q->isActive()) { - $this->cancelRequested = true; - } - $q->cancel(); - }); + if ($this->currentCommand === $q && $q->isActive()) { + $this->cancelRequested = true; + } + $q->cancel(); + }); } );