diff --git a/README.md b/README.md index d8dd491..28a137e 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,23 @@ $stream->on('close', function () { }); ``` +Note that by default the output of both STDOUT and STDERR will be emitted +as normal `data` events. You can optionally pass a custom event name which +will be used to emit STDERR data so that it can be handled separately. +Note that the normal streaming primitives likely do not know about this +event, so special care may have to be taken. +Also note that this option has no effect if you execute with a TTY. + +```php +$stream = $client->execStartStream($exec, $tty, 'stderr'); +$stream->on('data', function ($data) { + echo 'STDOUT data: ' . $data; +}); +$stream->on('stderr', function ($data) { + echo 'STDERR data: ' . $data; +}); +``` + See also the [streaming exec example](examples/exec-stream.php) and the [exec benchmark example](examples/benchmark-exec.php). The TTY mode should be set depending on whether your command needs a TTY diff --git a/examples/exec-inspect.php b/examples/exec-inspect.php index 9d2f677..d13a33e 100644 --- a/examples/exec-inspect.php +++ b/examples/exec-inspect.php @@ -23,14 +23,14 @@ $factory = new Factory($loop); $client = $factory->createClient(); -$client->execCreate($container, $cmd, true)->then(function ($info) use ($client) { +$client->execCreate($container, $cmd)->then(function ($info) use ($client) { echo 'Created with info: ' . json_encode($info) . PHP_EOL; return $client->execInspect($info['Id']); })->then(function ($info) use ($client) { echo 'Inspected after creation: ' . json_encode($info, JSON_PRETTY_PRINT) . PHP_EOL; - return $client->execStart($info['ID'], true)->then(function ($out) use ($client, $info) { + return $client->execStart($info['ID'])->then(function ($out) use ($client, $info) { echo 'Starting returned: '; var_dump($out); diff --git a/examples/exec-stream.php b/examples/exec-stream.php index 9fb7a55..e288dcf 100644 --- a/examples/exec-stream.php +++ b/examples/exec-stream.php @@ -27,18 +27,36 @@ $out = new Stream(STDOUT, $loop); $out->pause(); -$client->execCreate($container, $cmd, true)->then(function ($info) use ($client, $out) { - $stream = $client->execStartStream($info['Id'], true); +$stderr = new Stream(STDERR, $loop); +$stderr->pause(); + +// unkown exit code by default +$exit = 1; + +$client->execCreate($container, $cmd)->then(function ($info) use ($client, $out, $stderr, &$exit) { + $stream = $client->execStartStream($info['Id'], false, 'stderr'); $stream->pipe($out); + // forward custom stderr event to STDERR stream + $stream->on('stderr', function ($data) use ($stderr, $stream) { + if ($stderr->write($data) === false) { + $stream->pause(); + $stderr->once('drain', function () use ($stream) { + $stream->resume(); + }); + } + }); + $stream->on('error', 'printf'); - // exit with error code of executed command once it closes - $stream->on('close', function () use ($client, $info) { - $client->execInspect($info['Id'])->then(function ($info) { - exit($info['ExitCode']); + // remember exit code of executed command once it closes + $stream->on('close', function () use ($client, $info, &$exit) { + $client->execInspect($info['Id'])->then(function ($info) use (&$exit) { + $exit = $info['ExitCode']; }, 'printf'); }); }, 'printf'); $loop->run(); + +exit($exit); diff --git a/src/Client.php b/src/Client.php index afb41a0..772ad20 100644 --- a/src/Client.php +++ b/src/Client.php @@ -1008,16 +1008,24 @@ public function execStartDetached($exec, $tty = false) * This works for command output of any size as only small chunks have to * be kept in memory. * - * @param string $exec exec ID - * @param boolean $tty tty mode + * Note that by default the output of both STDOUT and STDERR will be emitted + * as normal "data" events. You can optionally pass a custom event name which + * will be used to emit STDERR data so that it can be handled separately. + * Note that the normal streaming primitives likely do not know about this + * event, so special care may have to be taken. + * Also note that this option has no effect if you execute with a TTY. + * + * @param string $exec exec ID + * @param boolean $tty tty mode + * @param string $stderrEvent custom event to emit for STDERR data (otherwise emits as "data") * @return ReadableStreamInterface stream of exec data * @link https://docs.docker.com/reference/api/docker_remote_api_v1.15/#exec-start * @see self::execStart() * @see self::execStartDetached() */ - public function execStartStream($exec, $tty = false) + public function execStartStream($exec, $tty = false, $stderrEvent = null) { - return $this->streamingParser->parsePlainStream( + $stream = $this->streamingParser->parsePlainStream( $this->browser->withOptions(array('streaming' => true))->post( $this->uri->expand( '/exec/{exec}/start', @@ -1033,6 +1041,13 @@ public function execStartStream($exec, $tty = false) )) ) ); + + // this is a multiplexed stream unless this is started with a TTY + if (!$tty) { + $stream = $this->streamingParser->demultiplexStream($stream, $stderrEvent); + } + + return $stream; } /** diff --git a/src/Io/ReadableDemultiplexStream.php b/src/Io/ReadableDemultiplexStream.php new file mode 100644 index 0000000..e684262 --- /dev/null +++ b/src/Io/ReadableDemultiplexStream.php @@ -0,0 +1,136 @@ +multiplexed = $multiplexed; + + if ($stderrEvent === null) { + $stderrEvent = 'data'; + } + + $this->stderrEvent = $stderrEvent; + + $out = $this; + $buffer =& $this->buffer; + $closed =& $this->closed; + + // pass all input data chunks through the parser + $multiplexed->on('data', array($out, 'push')); + + // forward end event to output (unless parsing is still in progress) + $multiplexed->on('end', function () use (&$buffer, $out, &$closed) { + // ignore duplicate end events + if ($closed) { + return; + } + + // buffer must be empty on end, otherwise this is an error situation + if ($buffer === '') { + $out->emit('end', array()); + } else { + $out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk'))); + } + $out->close(); + }); + + // forward error event to output + $multiplexed->on('error', function ($error) use ($out) { + $out->emit('error', array($error)); + $out->close(); + }); + + // forward close event to output + $multiplexed->on('close', function ($error) use ($out) { + $out->close(); + }); + } + + /** + * push the given stream chunk into the parser buffer and try to extract all frames + * + * @internal + * @param string $chunk + */ + public function push($chunk) + { + $this->buffer .= $chunk; + + while ($this->buffer !== '') { + if (!isset($this->buffer[7])) { + // last header byte not set => no complete header in buffer + break; + } + + $header = unpack('Cstream/x/x/x/Nlength', substr($this->buffer, 0, 8)); + + if (!isset($this->buffer[7 + $header['length']])) { + // last payload byte not set => message payload is incomplete + break; + } + + $payload = substr($this->buffer, 8, $header['length']); + $this->buffer = (string)substr($this->buffer, 8 + $header['length']); + + $this->emit( + ($header['stream'] === 2) ? $this->stderrEvent : 'data', + array($payload) + ); + } + } + + public function pause() + { + $this->multiplexed->pause(); + } + + public function resume() + { + $this->multiplexed->resume(); + } + + public function isReadable() + { + return $this->multiplexed->isReadable(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + return Util::pipe($this, $dest, $options); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // closing output stream closes input stream + $this->multiplexed->close(); + + $this->emit('close', array()); + } +} diff --git a/src/Io/StreamingParser.php b/src/Io/StreamingParser.php index 6f91437..69c68be 100644 --- a/src/Io/StreamingParser.php +++ b/src/Io/StreamingParser.php @@ -84,6 +84,18 @@ public function parsePlainStream(PromiseInterface $promise) })); } + /** + * Returns a readable plain text stream for the given multiplexed stream using Docker's "attach multiplexing protocol" + * + * @param ReadableStreamInterface $input + * @param string $stderrEvent + * @return ReadableStreamInterface + */ + public function demultiplexStream(ReadableStreamInterface $input, $stderrEvent = null) + { + return new ReadableDemultiplexStream($input, $stderrEvent); + } + /** * Returns a promise which resolves with the buffered stream contents of the given stream * diff --git a/tests/ClientTest.php b/tests/ClientTest.php index b0761df..2033d97 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -408,22 +408,48 @@ public function testExecStart() $this->expectRequest('POST', '/exec/123/start', $this->createResponse($data)); $this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream)); + $this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream); $this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data)); $this->expectPromiseResolveWith($data, $this->client->execStart(123, $config)); } - public function testExecStartStream() + public function testExecStartStreamWithoutTtyWillDemultiplex() { $config = array(); $stream = $this->getMock('React\Stream\ReadableStreamInterface'); $this->expectRequest('POST', '/exec/123/start', $this->createResponse()); $this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream)); + $this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream); $this->assertSame($stream, $this->client->execStartStream(123, $config)); } + public function testExecStartStreamWithTtyWillNotDemultiplex() + { + $config = array('Tty' => true); + $stream = $this->getMock('React\Stream\ReadableStreamInterface'); + + $this->expectRequest('POST', '/exec/123/start', $this->createResponse()); + $this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream)); + $this->streamingParser->expects($this->never())->method('demultiplexStream'); + + $this->assertSame($stream, $this->client->execStartStream(123, $config)); + } + + public function testExecStartStreamWithCustomStderrEvent() + { + $config = array(); + $stream = $this->getMock('React\Stream\ReadableStreamInterface'); + + $this->expectRequest('POST', '/exec/123/start', $this->createResponse()); + $this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream)); + $this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream, 'stderr')->willReturn($stream); + + $this->assertSame($stream, $this->client->execStartStream(123, $config, 'stderr')); + } + public function testExecResize() { $this->expectRequestFlow('POST', '/exec/123/resize?w=800&h=600', $this->createResponse(), 'expectEmpty'); diff --git a/tests/FunctionalClientTest.php b/tests/FunctionalClientTest.php index 80e14d8..4ea8147 100644 --- a/tests/FunctionalClientTest.php +++ b/tests/FunctionalClientTest.php @@ -133,7 +133,7 @@ public function testExecInspectBeforeRunning($exec) */ public function testExecStartWhileRunning($exec) { - $promise = $this->client->execStart($exec, true); + $promise = $this->client->execStart($exec); $output = Block\await($promise, $this->loop); $this->assertEquals('hello world', $output); @@ -165,25 +165,46 @@ public function testExecStringCommandWithOutputWhileRunning($container) $this->assertTrue(is_array($exec)); $this->assertTrue(is_string($exec['Id'])); - $promise = $this->client->execStart($exec['Id'], true); + $promise = $this->client->execStart($exec['Id']); $output = Block\await($promise, $this->loop); $this->assertEquals('hello world', $output); } + /** + * @depends testStartRunning + * @param string $container + */ + public function testExecStreamOutputInMultipleChunksWhileRunning($container) + { + $promise = $this->client->execCreate($container, 'echo -n hello && sleep 0 && echo -n world'); + $exec = Block\await($promise, $this->loop); + + $this->assertTrue(is_array($exec)); + $this->assertTrue(is_string($exec['Id'])); + + $stream = $this->client->execStartStream($exec['Id']); + $stream->once('data', $this->expectCallableOnceWith('hello')); + $stream->on('end', $this->expectCallableOnce()); + + $output = Block\await(Stream\buffer($stream), $this->loop); + + $this->assertEquals('helloworld', $output); + } + /** * @depends testStartRunning * @param string $container */ public function testExecUserSpecificCommandWithOutputWhileRunning($container) { - $promise = $this->client->execCreate($container, 'whoami', true, false, true, true, 'nobody'); + $promise = $this->client->execCreate($container, 'whoami', false, false, true, true, 'nobody'); $exec = Block\await($promise, $this->loop); $this->assertTrue(is_array($exec)); $this->assertTrue(is_string($exec['Id'])); - $promise = $this->client->execStart($exec['Id'], true); + $promise = $this->client->execStart($exec['Id']); $output = Block\await($promise, $this->loop); $this->assertEquals('nobody', rtrim($output)); @@ -195,18 +216,62 @@ public function testExecUserSpecificCommandWithOutputWhileRunning($container) */ public function testExecStringCommandWithStderrOutputWhileRunning($container) { - $promise = $this->client->execCreate($container, 'echo -n hello world >&2', true); + $promise = $this->client->execCreate($container, 'echo -n hello world >&2'); $exec = Block\await($promise, $this->loop); $this->assertTrue(is_array($exec)); $this->assertTrue(is_string($exec['Id'])); - $promise = $this->client->execStart($exec['Id'], true); + $promise = $this->client->execStart($exec['Id']); $output = Block\await($promise, $this->loop); $this->assertEquals('hello world', $output); } + /** + * @depends testStartRunning + * @param string $container + */ + public function testExecStreamCommandWithTtyAndStderrOutputWhileRunning($container) + { + $promise = $this->client->execCreate($container, 'echo -n hello world >&2', true); + $exec = Block\await($promise, $this->loop); + + $this->assertTrue(is_array($exec)); + $this->assertTrue(is_string($exec['Id'])); + + $stream = $this->client->execStartStream($exec['Id'], true); + $stream->once('data', $this->expectCallableOnce('hello world')); + $stream->on('end', $this->expectCallableOnce()); + + $output = Block\await(Stream\buffer($stream), $this->loop); + + $this->assertEquals('hello world', $output); + } + + /** + * @depends testStartRunning + * @param string $container + */ + public function testExecStreamStderrCustomEventWhileRunning($container) + { + $promise = $this->client->execCreate($container, 'echo -n hello world >&2'); + $exec = Block\await($promise, $this->loop); + + $this->assertTrue(is_array($exec)); + $this->assertTrue(is_string($exec['Id'])); + + $stream = $this->client->execStartStream($exec['Id'], false, 'err'); + $stream->on('err', $this->expectCallableOnceWith('hello world')); + $stream->on('data', $this->expectCallableNever()); + $stream->on('error', $this->expectCallableNever()); + $stream->on('end', $this->expectCallableOnce()); + + $output = Block\await(Stream\buffer($stream), $this->loop); + + $this->assertEquals('', $output); + } + /** * @depends testStartRunning * @param string $container diff --git a/tests/Io/ReadableDemultiplexStreamTest.php b/tests/Io/ReadableDemultiplexStreamTest.php new file mode 100644 index 0000000..e9f549b --- /dev/null +++ b/tests/Io/ReadableDemultiplexStreamTest.php @@ -0,0 +1,107 @@ +stream = new ReadableStream(); + $this->parser = new ReadableDemultiplexStream($this->stream); + } + + public function testStreamWillForwardEndAndClose() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableOnce()); + + $this->stream->emit('end', array()); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillForwardErrorAndClose() + { + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->stream->emit('error', array(new \RuntimeException('Test'))); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillEmitErrorWhenEndingWithinStream() + { + $this->parser->on('data', $this->expectCallableNever()); + $this->parser->on('error', $this->expectCallableOnce()); + $this->parser->on('close', $this->expectCallableOnce()); + $this->parser->on('end', $this->expectCallableNever()); + + $this->stream->emit('data', array('XX')); + $this->stream->emit('end', array()); + + $this->assertFalse($this->parser->isReadable()); + } + + public function testStreamWillEmitDataOnCompleteFrame() + { + $this->parser->on('data', $this->expectCallableOnceWith('test')); + + $this->stream->emit('data', array("\x01\x00\x00\x00" . "\x00\x00\x00\x04" . "test")); + } + + public function testStreamWillNotEmitDataOnIncompleteFrameHeader() + { + $this->parser->on('data', $this->expectCallableNever()); + + $this->stream->emit('data', array("\x01\0\0\0")); + } + + public function testStreamWillNotEmitDataOnIncompleteFramePayload() + { + $this->parser->on('data', $this->expectCallableNever()); + + $this->stream->emit('data', array("\x01\0\0\0" . "\0\0\0\x04" . "te")); + } + + public function testStreamWillEmitDataOnCompleteFrameChunked() + { + $this->parser->on('data', $this->expectCallableOnceWith('test')); + + $this->stream->emit('data', array("\x01\x00\x00\x00" . "\x00\x00\x00\x04" . "te")); + $this->stream->emit('data', array("st")); + } + + public function testPipeWillBeForwardedToTargetStream() + { + $target = new WritableStream(); + $target->on('pipe', $this->expectCallableOnceWith($this->parser)); + + $this->parser->pipe($target); + } + + public function testPauseWillBeForwarded() + { + $this->stream = $this->getMock('React\Stream\ReadableStreamInterface'); + $this->stream->expects($this->once())->method('pause'); + $this->parser = new ReadableDemultiplexStream($this->stream); + + $this->parser->pause(); + } + + public function testResumeWillBeForwarded() + { + $this->stream = $this->getMock('React\Stream\ReadableStreamInterface'); + $this->stream->expects($this->once())->method('resume'); + $this->parser = new ReadableDemultiplexStream($this->stream); + + $this->parser->resume(); + } +} diff --git a/tests/Io/StreamingParserTest.php b/tests/Io/StreamingParserTest.php index 248b66f..5f59d20 100644 --- a/tests/Io/StreamingParserTest.php +++ b/tests/Io/StreamingParserTest.php @@ -121,4 +121,13 @@ public function testDeferredCancelingPromiseWillCloseStream() $stream->expects($this->once())->method('close'); $promise->cancel(); } + + public function testDemultiplexStreamWillReturnReadable() + { + $stream = new ReadableStream(); + + $out = $this->parser->demultiplexStream($stream); + + $this->assertInstanceOf('React\Stream\ReadableStreamInterface', $out); + } }