Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Expose DuplexStreamInterface for connections that requires Upgrades #382

Closed
wants to merge 15 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactored some parts so tests can pass
bosunski committed Jul 28, 2020
commit 7f3890fd61cb866c3fda4624de61b30ac96c60f9
62 changes: 12 additions & 50 deletions src/Client/Request.php
Original file line number Diff line number Diff line change
@@ -61,31 +61,6 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe
$streamRef = $stream;

$stream->on('drain', array($that, 'handleDrain'));

$buffer = '';
$headerParser = function ($data) use (&$headerParser, $buffer, $streamRef, $that) {
$buffer .= $data;

static $headerParsed = false;

if ($headerParsed || false == strpos($buffer, "\r\n\r\n")) {
return $that->handleData($data);
}

$headerParsed = true;

$response = gPsr\parse_response($buffer);

if ($that->responseIsAnUpgradeResponse($response)) {
$that->stream->removeListener('data', $headerParser);

$that->emit('upgrade', array($that->stream, $response, $that));
return;
}

return $that->handleData($data);
};

$stream->on('data', array($that, 'handleData'));
$stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($that, 'handleError'));
@@ -168,32 +143,19 @@ public function handleData($data)
{
$this->buffer .= $data;

static $headerParsed = false;

if ($headerParsed || false == strpos($this->buffer, "\r\n\r\n")) {
return $this->handleEx();
}

$headerParsed = true;

$response = gPsr\parse_response($this->buffer);

if ($this->responseIsAnUpgradeResponse($response)) {
$this->stream->removeListener('data', array($this, 'handleData'));

$this->emit('upgrade', array($this->stream, $response, $this));
return;
}

// buffer until double CRLF (or double LF for compatibility with legacy servers)
return $this->handleEx();
}

protected function handleEx()
{
if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) {
try {
list($response, $bodyChunk) = $this->parseResponse($this->buffer);
$psrResponse = gPsr\parse_response($this->buffer);

if ($this->responseIsAnUpgradeResponse($psrResponse)) {
$this->stream->removeListener('data', array($this, 'handleData'));

$this->emit('upgrade', array($this->stream, $psrResponse, $this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like exposing the React\Http\Client\Request which is currently marked as @internal only. There's ongoing effort to remove all classes from this namespace as there's some duplication with the Io namespace.

Do we really need to expose the response and request objects?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is the right Request object to expose, but it's kind of a good idea to expose a request object because there could be cases where you want to verify some things about the request after upgrade and then close the connection if that verification fails. That's my thought about it. But sure, I might be wrong here.

I actually read a piece of Node docs to see how upgrade is being handled by the consumer side. I followed that in adding the upgrade event also. Here is a sample piece of how node handles the upgrade.

Sure, If there's a better you think this could be or should be done, I'll be willing to make it work.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the whole upgrade event. How does this interfere with the existing request/response semantics?

I like how this is exposed on the server side via https://github.com/reactphp/http#streaming-outgoing-response, perhaps we can find a similar solution here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually followed a Node piece I read here as I also mentioned above.

I will check out the sample you sent, I kind of like it, knowing that a ThroughStream also implements DuplexStreamInterface.

return;
}

list($response, $bodyChunk) = $this->parseResponse($psrResponse);
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}
@@ -277,9 +239,9 @@ public function close()
$this->removeAllListeners();
}

protected function parseResponse($data)
protected function parseResponse($psrResponse)
{
$psrResponse = gPsr\parse_response($data);
// $psrResponse = gPsr\parse_response($data);
$headers = array_map(function($val) {
if (1 === count($val)) {
$val = $val[0];
5 changes: 2 additions & 3 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
@@ -79,9 +79,8 @@ public function __construct(HttpClient $http)
public function send(RequestInterface $request, $upgrade = false)
{
$body = $request->getBody();
$size = $body->getSize();

$requestStream = $this->createRequestStream($request);
list($size, $requestStream) = $this->createRequestStream($request);

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is cancelled
@@ -182,6 +181,6 @@ protected function createRequestStream(RequestInterface $request)
$headers[$name] = implode(', ', $values);
}

return $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion());
return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()));
}
}