Skip to content

Commit b9587a0

Browse files
committedFeb 2, 2016
Add request/response info to MessageSubject, subprotocol support, fixed onComplete not passing through
1 parent 2a8c361 commit b9587a0

5 files changed

+85
-10
lines changed
 

‎src/Websocket/Client.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ private function startConnection()
6565
// TODO: Should validate response
6666
//$cNegotiator->validateResponse($response);
6767

68+
$subprotoHeader = "";
69+
70+
$psr7Response = new \GuzzleHttp\Psr7\Response(
71+
$response->getCode(),
72+
$response->getHeaders(),
73+
null,
74+
$response->getVersion()
75+
);
76+
77+
if (count($psr7Response->getHeader('Sec-WebSocket-Protocol')) == 1) {
78+
$subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol')[0];
79+
}
80+
6881
parent::onNext(new MessageSubject(
6982
new AnonymousObservable(function (ObserverInterface $observer) use ($response) {
7083
$response->on('data', function ($data) use ($observer) {
@@ -106,7 +119,10 @@ function () use ($request) {
106119
}
107120
),
108121
true,
109-
$this->useMessageObject
122+
$this->useMessageObject,
123+
$subprotoHeader,
124+
$cNegotiator->getRequest(),
125+
$psr7Response
110126
));
111127
});
112128

‎src/Websocket/MessageSubject.php

+38-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace Rx\Websocket;
44

5+
use Psr\Http\Message\RequestInterface;
6+
use Psr\Http\Message\ResponseInterface;
57
use Rx\Websocket\RFC6455\Messaging\Protocol\Frame;
68
use Rx\Observable;
79
use Rx\Observable\AnonymousObservable;
@@ -27,21 +29,34 @@ class MessageSubject extends Subject
2729
/** @var string */
2830
private $subProtocol;
2931

32+
/** @var RequestInterface */
33+
private $request;
34+
35+
/** @var ResponseInterface */
36+
private $response;
37+
3038
/**
3139
* ConnectionSubject constructor.
3240
* @param ObservableInterface $rawDataIn
3341
* @param ObserverInterface $rawDataOut
3442
* @param bool $mask
3543
* @param bool $useMessageObject
3644
* @param string $subProtocol
45+
* @param RequestInterface $request
46+
* @param ResponseInterface $response
3747
*/
3848
public function __construct(
3949
ObservableInterface $rawDataIn,
4050
ObserverInterface $rawDataOut,
4151
$mask = false,
4252
$useMessageObject = false,
43-
$subProtocol = ""
53+
$subProtocol = "",
54+
RequestInterface $request,
55+
ResponseInterface $response
4456
) {
57+
$this->request = $request;
58+
$this->response = $response;
59+
4560
$this->rawDataIn = new AnonymousObservable(function ($observer) use ($rawDataIn) {
4661
return $rawDataIn->subscribe($observer);
4762
});
@@ -92,10 +107,12 @@ function () use ($frames) {
92107
->filter(function (Frame $frame) {
93108
return $frame->getOpcode() === $frame::OP_PING;
94109
})
95-
->subscribe(new CallbackObserver(function (Frame $frame) {
96-
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
97-
$this->sendFrame($pong);
98-
}));
110+
->subscribe(new CallbackObserver(
111+
function (Frame $frame) {
112+
$pong = new Frame($frame->getPayload(), true, Frame::OP_PONG);
113+
$this->sendFrame($pong);
114+
}
115+
));
99116

100117
$frames
101118
->filter(function (Frame $frame) {
@@ -177,4 +194,20 @@ public function getSubProtocol()
177194
{
178195
return $this->subProtocol;
179196
}
197+
198+
/**
199+
* @return RequestInterface
200+
*/
201+
public function getRequest()
202+
{
203+
return $this->request;
204+
}
205+
206+
/**
207+
* @return ResponseInterface
208+
*/
209+
public function getResponse()
210+
{
211+
return $this->response;
212+
}
180213
}

‎src/Websocket/Server.php

+24-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace Rx\Websocket;
44

5+
use Guzzle\Http\Message\RequestInterface;
6+
use GuzzleHttp\Psr7\Uri;
57
use Rx\Websocket\RFC6455\Encoding\Validator;
68
use Rx\Websocket\RFC6455\Handshake\Negotiator;
79
use React\Http\Request;
@@ -56,12 +58,21 @@ private function startServer()
5658

5759
$http = new \React\Http\Server($socket);
5860
$http->on('request', function (Request $request, Response $response) use ($negotiator) {
61+
$uri = new Uri($request->getPath());
62+
if (count($request->getQuery()) > 0) {
63+
$uri = $uri->withQuery(\GuzzleHttp\Psr7\build_query($request->getQuery()));
64+
}
65+
5966
$psrRequest = new \GuzzleHttp\Psr7\Request(
6067
$request->getMethod(),
61-
$request->getPath(),
68+
$uri,
6269
$request->getHeaders()
6370
);
6471

72+
// cram the remote address into the header in out own X- header so
73+
// the user will have access to it
74+
$psrRequest = $psrRequest->withAddedHeader("X-RxWebsocket-Remote-Address", $request->remoteAddress);
75+
6576
$negotiatorResponse = $negotiator->handshake($psrRequest);
6677

6778
$response->writeHead(
@@ -77,6 +88,11 @@ private function startServer()
7788
return;
7889
}
7990

91+
$subProtocol = "";
92+
if (count($negotiatorResponse->getHeader('Sec-WebSocket-Protocol')) > 0) {
93+
$subProtocol = $negotiatorResponse->getHeader('Sec-WebSocket-Protocol')[0];
94+
}
95+
8096
$connection = new MessageSubject(
8197
new AnonymousObservable(
8298
function (ObserverInterface $observer) use ($request) {
@@ -89,6 +105,9 @@ function (ObserverInterface $observer) use ($request) {
89105
$request->on('close', function () use ($observer) {
90106
$observer->onCompleted();
91107
});
108+
$request->on('end', function () use ($observer) {
109+
$observer->onCompleted();
110+
});
92111

93112
return new CallbackDisposable(
94113
function () use ($request) {
@@ -109,7 +128,10 @@ function () use ($response) {
109128
}
110129
),
111130
false,
112-
$this->useMessageObject
131+
$this->useMessageObject,
132+
$subProtocol,
133+
$psrRequest,
134+
$negotiatorResponse
113135
);
114136

115137
$this->connectionSubject->onNext($connection);

‎src/Websocket/WebsocketFrameOperator.php

+3-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ function ($data) use ($observer) {
7676
$this->frame = new Frame();
7777
}
7878
}
79-
}
79+
},
80+
[$observer, 'onError'],
81+
[$observer, 'onCompleted']
8082
));
8183
}
8284
}

‎src/Websocket/WebsocketMessageOperator.php

+3-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ function ($frame) use ($observer) {
5959
}
6060
$this->message = new Message();
6161
}
62-
}
62+
},
63+
[$observer, 'onError'],
64+
[$observer, 'onCompleted']
6365
));
6466
}
6567
}

0 commit comments

Comments
 (0)
Please sign in to comment.