forked from clue/reactphp-docker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamingParser.php
148 lines (127 loc) · 4.67 KB
/
StreamingParser.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
<?php
namespace Clue\React\Docker\Io;
use React\Promise\PromiseInterface;
use Clue\JsonStream\StreamingJsonParser;
use React\Promise\Deferred;
use React\Stream\ReadableStream;
use React\Stream\ReadableStreamInterface;
use RuntimeException;
use React\Promise\CancellablePromiseInterface;
use Clue\React\Promise\Stream;
use Psr\Http\Message\ResponseInterface;
/**
* StreamingParser is a simple helper to work with the streaming body of HTTP response objects
*
* @internal
* @see ResponseParser for working with buffered bodies
*/
class StreamingParser
{
/**
* Returns a readable JSON stream for the given ResponseInterface
*
* @param PromiseInterface $promise Promise<ResponseInterface>
* @return ReadableStreamInterface
* @uses self::parsePlainSream()
*/
public function parseJsonStream(PromiseInterface $promise)
{
// application/json
$in = $this->parsePlainStream($promise);
$out = new ReadableStream();
// invalid/closing input stream => return closed output stream
if (!$in->isReadable()) {
$out->close();
return $out;
}
// forward each data chunk to the streaming JSON parser
$parser = new StreamingJsonParser();
$in->on('data', function ($data) use ($parser, $out) {
$objects = $parser->push($data);
foreach ($objects as $object) {
if (isset($object['error'])) {
$out->emit('error', array(new JsonProgressException($object), $out));
$out->close();
return;
}
$out->emit('progress', array($object, $out));
}
});
// forward error and make sure stream closes
$in->on('error', function ($error) use ($out) {
$out->emit('error', array($error, $out));
$out->close();
});
// closing either stream closes the other one
$in->on('close', array($out, 'close'));
$out->on('close', array($in, 'close'));
return $out;
}
/**
* Returns a readable plain text stream for the given ResponseInterface
*
* @param PromiseInterface $promise Promise<ResponseInterface>
* @return ReadableStreamInterface
*/
public function parsePlainStream(PromiseInterface $promise)
{
// text/plain
return Stream\unwrapReadable($promise->then(function (ResponseInterface $response) {
return $response->getBody();
}));
}
/**
* Returns a readable plain text stream for the given multiplexed stream using Docker's "attach multiplexing protocol"
*
* @param ReadableStreamInterface $input
* @param string $stderrEvent
* @return ReadableStreamInterface
*/
public function demultiplexStream(ReadableStreamInterface $input, $stderrEvent = null)
{
return new ReadableDemultiplexStream($input, $stderrEvent);
}
/**
* Returns a promise which resolves with the buffered stream contents of the given stream
*
* @param ReadableStreamInterface $stream
* @return PromiseInterface Promise<string, Exception>
*/
public function bufferedStream(ReadableStreamInterface $stream)
{
return Stream\buffer($stream);
}
/**
* Returns a promise which resolves with an array of all "progress" events
*
* @param ReadableStreamInterface $stream
* @param string $progressEventName the name of the event to collect
* @return PromiseInterface Promise<array, Exception>
*/
public function deferredStream(ReadableStreamInterface $stream, $progressEventName)
{
// cancelling the deferred will (try to) close the stream
$deferred = new Deferred(function () use ($stream) {
$stream->close();
throw new RuntimeException('Cancelled');
});
if ($stream->isReadable()) {
// buffer all data events for deferred resolving
$buffered = array();
$stream->on($progressEventName, function ($data) use (&$buffered) {
$buffered []= $data;
});
// error event rejects
$stream->on('error', function ($error) use ($deferred) {
$deferred->reject($error);
});
// close event resolves with buffered events (unless already error'ed)
$stream->on('close', function () use ($deferred, &$buffered) {
$deferred->resolve($buffered);
});
} else {
$deferred->reject(new RuntimeException('Stream already ended, looks like it could not be opened'));
}
return $deferred->promise();
}
}