Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add execStartStream() API endpoint #37

Merged
merged 2 commits into from
Apr 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ execute arbitrary commands within isolated containers, stop running containers a
* [Commands](#commands)
* [Promises](#promises)
* [Blocking](#blocking)
* [Command streaming](#command-streaming)
* [TAR streaming](#tar-streaming)
* [JSON streaming](#json-streaming)
* [JsonProgressException](#jsonprogressexception)
Expand Down Expand Up @@ -183,6 +184,52 @@ $inspections = Block\awaitAll($promises, $loop);

Please refer to [clue/block-react](https://github.com/clue/php-block-react#readme) for more details.

#### Command streaming

The following API endpoint resolves with a buffered string of the command output
(STDOUT and/or STDERR):

```php
$client->execStart($exec);
```

Keep in mind that this means the whole string has to be kept in memory.
If you want to access the individual output chunks as they happen or
for bigger command outputs, it's usually a better idea to use a streaming
approach.

This works for (any number of) commands of arbitrary sizes.
The following API endpoint complements the default Promise-based API and returns
a [`Stream`](https://github.com/reactphp/stream) instance instead:

```php
$stream = $client->execStartStream($exec);
```

The resulting stream is a well-behaving readable stream that will emit
the normal stream events:

```php
$stream = $client->execStartStream($exec, $config);
$stream->on('data', function ($data) {
// data will be emitted in multiple chunk
echo $data;
});
$stream->on('close', function () {
// the stream just ended, this could(?) be a good thing
echo 'Ended' . PHP_EOL;
});
```

See also the [streaming exec example](examples/exec-stream.php) and the [exec benchmark example](examples/benchmark-exec.php).

Running this benchmark on my personal (rather mediocre) VM setup reveals that
the benchmark achieves a throughput of ~300 MiB/s while the (totally unfair)
comparison script using the plain Docker client only yields ~100 MiB/s.
Instead of me posting more details here, I encourage you to re-run the benchmark
yourself and adjust it to better suite your problem domain.
The key takeway here is: *PHP is faster than you probably thought*.

#### TAR streaming

The following API endpoints resolve with a string in the [TAR file format](https://en.wikipedia.org/wiki/Tar_%28computing%29):
Expand Down
45 changes: 45 additions & 0 deletions examples/benchmark-exec.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php
// this simple example executes a command within the given running container and
// displays how fast it can receive its output.
// expect this to be significantly faster than the (totally unfair) equivalent:
// $ docker exec asd dd if=/dev/zero bs=1M count=1000 | dd of=/dev/null

require __DIR__ . '/../vendor/autoload.php';

use React\EventLoop\Factory as LoopFactory;
use Clue\React\Docker\Factory;
use React\Stream\Stream;

$container = 'asd';
$cmd = array('dd', 'if=/dev/zero', 'bs=1M', 'count=1000');

if (isset($argv[1])) {
$container = $argv[1];
$cmd = array_slice($argv, 2);
}

$loop = LoopFactory::create();

$factory = new Factory($loop);
$client = $factory->createClient();

$client->execCreate($container, array('Cmd' => $cmd, 'AttachStdout' => true, 'AttachStderr' => true))->then(function ($info) use ($client) {
$stream = $client->execStartStream($info['Id'], array('Tty' => true));

$start = microtime(true);
$bytes = 0;
$stream->on('data', function ($chunk) use (&$bytes) {
$bytes += strlen($chunk);
});

$stream->on('error', 'printf');

// show stats when stream ends
$stream->on('close', function () use ($client, &$bytes, $start) {
$time = microtime(true) - $start;

echo 'Received ' . $bytes . ' bytes in ' . round($time, 1) . 's => ' . round($bytes / $time / 1024 / 1024, 1) . ' MiB/s' . PHP_EOL;
});
}, 'printf');

$loop->run();
44 changes: 44 additions & 0 deletions examples/exec-stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
// this example executes some commands within the given running container and
// displays the streaming output as it happens.

require __DIR__ . '/../vendor/autoload.php';

use React\EventLoop\Factory as LoopFactory;
use Clue\React\Docker\Factory;
use React\Stream\Stream;

$container = 'asd';
//$cmd = array('echo', 'hello world');
//$cmd = array('sleep', '2');
$cmd = array('sh', '-c', 'echo -n hello && sleep 1 && echo world && sleep 1 && env');
//$cmd = array('cat', 'invalid-path');

if (isset($argv[1])) {
$container = $argv[1];
$cmd = array_slice($argv, 2);
}

$loop = LoopFactory::create();

$factory = new Factory($loop);
$client = $factory->createClient();

$out = new Stream(STDOUT, $loop);
$out->pause();

$client->execCreate($container, array('Cmd' => $cmd, 'AttachStdout' => true, 'AttachStderr' => true, 'Tty' => true))->then(function ($info) use ($client, $out) {
$stream = $client->execStartStream($info['Id'], array('Tty' => true));
$stream->pipe($out);

$stream->on('error', 'printf');

// exit with error code of executed command once it closes
$stream->on('close', function () use ($client, $info) {
$client->execInspect($info['Id'])->then(function ($info) {
exit($info['ExitCode']);
}, 'printf');
});
}, 'printf');

$loop->run();
52 changes: 44 additions & 8 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,9 @@ public function execCreate($container, $config)
* as set up in the `execCreate()` call.
*
* Keep in mind that this means the whole string has to be kept in memory.
* If you want to access the individual output chunks as they happen or
* for bigger command outputs, it's usually a better idea to use a streaming
* approach, see `execStartStream()` for more details.
*
* If detach is true, this API returns after starting the exec command.
* Otherwise, this API sets up an interactive session with the exec command.
Expand All @@ -915,18 +918,51 @@ public function execCreate($container, $config)
* @param array $config (see link)
* @return PromiseInterface Promise<string> buffered exec data
* @link https://docs.docker.com/reference/api/docker_remote_api_v1.15/#exec-start
* @uses self::execStartStream()
* @see self::execStartStream()
*/
public function execStart($exec, $config = array())
{
return $this->postJson(
$this->uri->expand(
'/exec/{exec}/start',
return $this->streamingParser->bufferedStream(
$this->execStartStream($exec, $config)
);
}

/**
* Starts a previously set up exec instance id.
*
* This is a streaming API endpoint that returns a readable stream instance
* containing the command output, i.e. STDOUT and STDERR as set up in the
* `execCreate()` call.
*
* This works for command output of any size as only small chunks have to
* be kept in memory.
*
* If detach is true, this API returns after starting the exec command.
* Otherwise, this API sets up an interactive session with the exec command.
*
* @param string $exec exec ID
* @param array $config (see link)
* @return ReadableStreamInterface stream of exec data
* @link https://docs.docker.com/reference/api/docker_remote_api_v1.15/#exec-start
* @see self::execStart()
*/
public function execStartStream($exec, $config = array())
{
return $this->streamingParser->parsePlainStream(
$this->browser->withOptions(array('streaming' => true))->post(
$this->uri->expand(
'/exec/{exec}/start',
array(
'exec' => $exec
)
),
array(
'exec' => $exec
)
),
$config
)->then(array($this->parser, 'expectPlain'));
'Content-Type' => 'application/json'
),
$this->json($config)
)
);
}

/**
Expand Down
11 changes: 11 additions & 0 deletions src/Io/StreamingParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ public function parsePlainStream(PromiseInterface $promise)
}));
}

/**
* Returns a promise which resolves with the buffered stream contents of the given stream
*
* @param ReadableStreamInterface $stream
* @return PromiseInterface Promise<string, Exception>
*/
public function bufferedStream(ReadableStreamInterface $stream)
{
return Stream\buffer($stream);
}

/**
* Returns a promise which resolves with an array of all "progress" events
*
Expand Down
18 changes: 17 additions & 1 deletion tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use RingCentral\Psr7\Response;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\RequestInterface;
use React\Promise;

class ClientTest extends TestCase
{
Expand Down Expand Up @@ -388,11 +389,26 @@ public function testExecStart()
{
$data = 'hello world';
$config = array();
$this->expectRequestFlow('post', '/exec/123/start', $this->createResponse($data), 'expectPlain');
$stream = $this->getMock('React\Stream\ReadableStreamInterface');

$this->expectRequest('POST', '/exec/123/start', $this->createResponse($data));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data));

$this->expectPromiseResolveWith($data, $this->client->execStart(123, $config));
}

public function testExecStartStream()
{
$config = array();
$stream = $this->getMock('React\Stream\ReadableStreamInterface');

$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));

$this->assertSame($stream, $this->client->execStartStream(123, $config));
}

public function testExecResize()
{
$this->expectRequestFlow('POST', '/exec/123/resize?w=800&h=600', $this->createResponse(), 'expectEmpty');
Expand Down
51 changes: 51 additions & 0 deletions tests/FunctionalClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use React\EventLoop\Factory as LoopFactory;
use Clue\React\Docker\Factory;
use Clue\React\Block;
use Clue\React\Promise\Stream;

class FunctionalClientTest extends TestCase
{
Expand Down Expand Up @@ -157,6 +158,56 @@ public function testExecInspectAfterRunning($exec)
$this->assertEquals(0, $info['ExitCode']);
}

/**
* @depends testStartRunning
* @param string $container
*/
public function testExecStreamEmptyOutputWhileRunning($container)
{
$promise = $this->client->execCreate($container, array(
'Cmd' => array('true'),
'AttachStdout' => true,
'AttachStderr' => true,
'Tty' => true
));
$exec = Block\await($promise, $this->loop);

$this->assertTrue(is_array($exec));
$this->assertTrue(is_string($exec['Id']));

$stream = $this->client->execStartStream($exec['Id'], array('Tty' => true));
$stream->on('end', $this->expectCallableOnce());

$output = Block\await(Stream\buffer($stream), $this->loop);

$this->assertEquals('', $output);
}

/**
* @depends testStartRunning
* @param string $container
*/
public function testExecStreamEmptyOutputBecauseOfDetachWhileRunning($container)
{
$promise = $this->client->execCreate($container, array(
'Cmd' => array('sleep', '10'),
'AttachStdout' => true,
'AttachStderr' => true,
'Tty' => true
));
$exec = Block\await($promise, $this->loop);

$this->assertTrue(is_array($exec));
$this->assertTrue(is_string($exec['Id']));

$stream = $this->client->execStartStream($exec['Id'], array('Tty' => true, 'Detach' => true));
$stream->on('end', $this->expectCallableOnce());

$output = Block\await(Stream\buffer($stream), $this->loop);

$this->assertEquals('', $output);
}

/**
* @depends testStartRunning
* @param string $container
Expand Down