Skip to content

Commit de523b6

Browse files
authored
Updates for RxPHP v2 and PHP 7 #7
2 parents dce6a05 + 0f3f1d4 commit de523b6

9 files changed

+57
-133
lines changed

.travis.yml

+1-6
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
language: php
22

33
php:
4-
- 5.6
54
- 7
6-
- hhvm
7-
8-
matrix:
9-
allow_failures:
10-
- php: hhvm
5+
- 7.1
116

127
before_install:
138
- export PATH=$HOME/.local/bin:$PATH

README.md

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ Rx\Websocket is a PHP Websocket library.
88
```php
99
$client = new \Rx\Websocket\Client('ws://127.0.0.1:9191/');
1010

11-
$client->subscribeCallback(
11+
$client->subscribe(
1212
function (\Rx\Websocket\MessageSubject $ms) {
13-
$ms->subscribeCallback(
13+
$ms->subscribe(
1414
function ($message) {
1515
echo $message . "\n";
1616
}
@@ -36,7 +36,7 @@ $client->subscribeCallback(
3636
```php
3737
$server = new \Rx\Websocket\Server('127.0.0.1', 9191);
3838

39-
$server->subscribeCallback(function (\Rx\Websocket\MessageSubject $cs) {
39+
$server->subscribe(function (\Rx\Websocket\MessageSubject $cs) {
4040
$cs->subscribe($cs);
4141
});
4242
```
@@ -45,8 +45,8 @@ $server->subscribeCallback(function (\Rx\Websocket\MessageSubject $cs) {
4545
```php
4646
$server = new \Rx\Websocket\Server('127.0.0.1', 9191);
4747

48-
$server->subscribeCallback(function (\Rx\Websocket\MessageSubject $cs) {
49-
$cs->subscribeCallback(function ($message) {
48+
$server->subscribe(function (\Rx\Websocket\MessageSubject $cs) {
49+
$cs->subscribe(function ($message) {
5050
echo $message;
5151
});
5252
});

composer.json

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "rx/websocket",
33
"type": "library",
4-
"description": "Async Websockets for PHP using Rx",
4+
"description": "Websockets for PHP using Rx",
55
"keywords": [
66
"websocket",
77
"websockets",
@@ -30,11 +30,11 @@
3030
"require": {
3131
"react/http": "^0.4.1",
3232
"react/http-client": "^0.4.10",
33-
"voryx/event-loop": "^0.2.1",
34-
"reactivex/rxphp": "^1.5.1",
35-
"ratchet/rfc6455": "^0.2.2"
33+
"voryx/event-loop": "^2.0",
34+
"ratchet/rfc6455": "^0.2.2",
35+
"reactivex/rxphp": "^2.0"
3636
},
3737
"require-dev": {
38-
"phpunit/phpunit": "^5.7.0"
38+
"phpunit/phpunit": "~5.7.0"
3939
}
4040
}

src/Client.php

+11-23
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,38 @@
55
use GuzzleHttp\Psr7\Uri;
66
use Ratchet\RFC6455\Handshake\ClientNegotiator;
77
use React\Dns\Resolver\Factory;
8+
use React\Dns\Resolver\Resolver;
9+
use React\EventLoop\LoopInterface;
810
use React\HttpClient\Request;
911
use React\HttpClient\Response;
1012
use Rx\Disposable\CallbackDisposable;
13+
use Rx\DisposableInterface;
1114
use Rx\Observable;
1215
use Rx\Observable\AnonymousObservable;
1316
use Rx\Observer\CallbackObserver;
1417
use Rx\ObserverInterface;
15-
use Rx\SchedulerInterface;
1618

1719
class Client extends Observable
1820
{
19-
/** @var string */
2021
protected $url;
21-
22-
/** @var bool */
2322
private $useMessageObject;
24-
25-
/** @var array */
2623
private $subProtocols;
24+
private $loop;
25+
private $dnsResolver;
2726

28-
/**
29-
* Websocket constructor.
30-
* @param $url
31-
* @param bool $useMessageObject
32-
* @param array $subProtocols
33-
*/
34-
public function __construct($url, $useMessageObject = false, array $subProtocols = [])
27+
public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, Resolver $dnsResolver = null)
3528
{
3629
$this->url = $url;
3730
$this->useMessageObject = $useMessageObject;
3831
$this->subProtocols = $subProtocols;
32+
$this->loop = $loop ?: \EventLoop\getLoop();
33+
$this->dnsResolver = $dnsResolver ?: (new Factory())->createCached('8.8.8.8', $this->loop);
3934
}
4035

41-
public function subscribe(ObserverInterface $clientObserver, $scheduler = null)
36+
public function _subscribe(ObserverInterface $clientObserver): DisposableInterface
4237
{
43-
$loop = \EventLoop\getLoop();
44-
45-
$dnsResolverFactory = new Factory();
46-
47-
$dnsResolver = $dnsResolverFactory->createCached('8.8.8.8', $loop);
48-
4938
$factory = new \React\HttpClient\Factory();
50-
$client = $factory->create($loop, $dnsResolver);
39+
$client = $factory->create($this->loop, $this->dnsResolver);
5140

5241
$cNegotiator = new ClientNegotiator();
5342

@@ -90,7 +79,7 @@ public function subscribe(ObserverInterface $clientObserver, $scheduler = null)
9079
$subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol');
9180

9281
$clientObserver->onNext(new MessageSubject(
93-
new AnonymousObservable(function (ObserverInterface $observer, SchedulerInterface $scheduler) use ($response, $clientObserver) {
82+
new AnonymousObservable(function (ObserverInterface $observer) use ($response, $clientObserver) {
9483

9584
$response->on('data', function ($data) use ($observer) {
9685
$observer->onNext($data);
@@ -111,7 +100,6 @@ public function subscribe(ObserverInterface $clientObserver, $scheduler = null)
111100
$clientObserver->onCompleted();
112101
});
113102

114-
115103
return new CallbackDisposable(function () use ($response) {
116104
// commented this out because disposal was causing the other
117105
// end (the request) to close also - which causes the pending messages

src/MessageSubject.php

+18-59
Original file line numberDiff line numberDiff line change
@@ -10,53 +10,26 @@
1010
use Ratchet\RFC6455\Messaging\Message;
1111
use Ratchet\RFC6455\Messaging\MessageBuffer;
1212
use Ratchet\RFC6455\Messaging\MessageInterface;
13-
use Rx\DisposableInterface;
1413
use Rx\Observable;
15-
use Rx\Observer\CallbackObserver;
1614
use Rx\ObserverInterface;
1715
use Rx\Subject\Subject;
1816

1917
class MessageSubject extends Subject
2018
{
21-
/** @var Observable */
2219
protected $rawDataIn;
23-
24-
/** @var ObserverInterface */
2520
protected $rawDataOut;
26-
27-
/** @var bool */
2821
protected $mask;
29-
30-
/** @var Observable */
3122
protected $controlFrames;
32-
33-
/** @var string */
3423
private $subProtocol;
35-
36-
/** @var RequestInterface */
3724
private $request;
38-
39-
/** @var ResponseInterface */
4025
private $response;
41-
42-
/** @var DisposableInterface */
4326
private $rawDataDisp;
4427

45-
/**
46-
* ConnectionSubject constructor.
47-
* @param Observable $rawDataIn
48-
* @param ObserverInterface $rawDataOut
49-
* @param bool $mask
50-
* @param bool $useMessageObject
51-
* @param string $subProtocol
52-
* @param RequestInterface $request
53-
* @param ResponseInterface $response
54-
*/
5528
public function __construct(
5629
Observable $rawDataIn,
5730
ObserverInterface $rawDataOut,
58-
$mask = false,
59-
$useMessageObject = false,
31+
bool $mask = false,
32+
bool $useMessageObject = false,
6033
$subProtocol = "",
6134
RequestInterface $request,
6235
ResponseInterface $response
@@ -101,23 +74,21 @@ function (FrameInterface $frame) use ($rawDataOut) {
10174
!$this->mask
10275
);
10376

104-
$this->rawDataDisp = $this->rawDataIn
105-
->subscribe(new CallbackObserver(
106-
function ($data) use ($messageBuffer) {
107-
$messageBuffer->onData($data);
108-
},
109-
function (\Exception $exception) {
110-
parent::onError($exception);
111-
},
112-
function () {
113-
parent::onCompleted();
114-
}
115-
));
77+
$this->rawDataDisp = $this->rawDataIn->subscribe(
78+
function ($data) use ($messageBuffer) {
79+
$messageBuffer->onData($data);
80+
},
81+
function (\Exception $exception) {
82+
parent::onError($exception);
83+
},
84+
function () {
85+
parent::onCompleted();
86+
});
11687

11788
$this->subProtocol = $subProtocol;
11889
}
11990

120-
private function createCloseFrame($closeCode = Frame::CLOSE_NORMAL)
91+
private function createCloseFrame(int $closeCode = Frame::CLOSE_NORMAL): Frame
12192
{
12293
$frame = new Frame(pack('n', $closeCode), true, Frame::OP_CLOSE);
12394
if ($this->mask) {
@@ -141,10 +112,7 @@ public function sendFrame(Frame $frame)
141112
$this->rawDataOut->onNext($frame->getContents());
142113
}
143114

144-
/**
145-
* @return Observable
146-
*/
147-
public function getControlFrames()
115+
public function getControlFrames(): Observable
148116
{
149117
return $this->controlFrames;
150118
}
@@ -160,7 +128,7 @@ public function onNext($value)
160128
$this->sendFrame(new Frame($value));
161129
}
162130

163-
public function onError(\Exception $exception)
131+
public function onError(\Throwable $exception)
164132
{
165133
$this->rawDataDisp->dispose();
166134

@@ -174,26 +142,17 @@ public function onCompleted()
174142
parent::onCompleted();
175143
}
176144

177-
/**
178-
* @return string
179-
*/
180-
public function getSubProtocol()
145+
public function getSubProtocol(): string
181146
{
182147
return $this->subProtocol;
183148
}
184149

185-
/**
186-
* @return RequestInterface
187-
*/
188-
public function getRequest()
150+
public function getRequest(): RequestInterface
189151
{
190152
return $this->request;
191153
}
192154

193-
/**
194-
* @return ResponseInterface
195-
*/
196-
public function getResponse()
155+
public function getResponse(): ResponseInterface
197156
{
198157
return $this->response;
199158
}

src/Server.php

+7-22
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
use GuzzleHttp\Psr7\Uri;
66
use Ratchet\RFC6455\Handshake\RequestVerifier;
77
use Ratchet\RFC6455\Handshake\ServerNegotiator;
8+
use React\EventLoop\LoopInterface;
89
use React\Http\Request;
910
use React\Http\Response;
1011
use Rx\Disposable\CallbackDisposable;
12+
use Rx\DisposableInterface;
1113
use Rx\Observable;
1214
use Rx\Observable\AnonymousObservable;
1315
use Rx\Observer\CallbackObserver;
@@ -16,33 +18,23 @@
1618
class Server extends Observable
1719
{
1820
protected $bindAddress;
19-
2021
protected $port;
21-
22-
/** @var bool */
2322
private $useMessageObject;
24-
25-
/** @var array */
2623
private $subProtocols;
24+
private $loop;
2725

28-
/**
29-
* Server constructor.
30-
* @param $bindAddress
31-
* @param $port
32-
* @param bool $useMessageObject
33-
* @param array $subProtocols
34-
*/
35-
public function __construct($bindAddress, $port, $useMessageObject = false, array $subProtocols = [])
26+
public function __construct(string $bindAddress, int $port, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null)
3627
{
3728
$this->bindAddress = $bindAddress;
3829
$this->port = $port;
3930
$this->useMessageObject = $useMessageObject;
4031
$this->subProtocols = $subProtocols;
32+
$this->loop = $loop ?: \EventLoop\getLoop();
4133
}
4234

43-
public function subscribe(ObserverInterface $observer, $scheduler = null)
35+
public function _subscribe(ObserverInterface $observer): DisposableInterface
4436
{
45-
$socket = new \React\Socket\Server(\EventLoop\getLoop());
37+
$socket = new \React\Socket\Server($this->loop);
4638

4739
$negotiator = new ServerNegotiator(new RequestVerifier());
4840
if (!empty($this->subProtocols)) {
@@ -132,13 +124,6 @@ function () use ($response) {
132124

133125
$socket->listen($this->port, $this->bindAddress);
134126

135-
// $http->on('end', function () {});
136-
// $http->on('data', function () {});
137-
// $http->on('pause', function () {});
138-
// $http->on('resume', function () {});
139-
140-
$this->started = true;
141-
142127
return new CallbackDisposable(function () use ($socket) {
143128
$socket->shutdown();
144129
});

test/ABResultsTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ private function verifyAutobahnResults($fileName)
1717
if ($result->behavior === "INFORMATIONAL") {
1818
continue;
1919
}
20-
$this->assertTrue("OK" === $result->behavior || "NON-STRICT" === $result->behavior, "Autobahn test case " . $name . " in " . $fileName);
20+
$this->assertContains($result->behavior, ['OK', 'NON-STRICT'], "Autobahn test case " . $name . " in " . $fileName);
2121
}
2222
}
2323

0 commit comments

Comments
 (0)