Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- `MySQL`
- `Postgres`
* Now possible to define custom input/output value objects for the query builder.
* Added a `EventStream` response sender to simplify sending [sever-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).

--------------------------------------------------------

Expand Down
129 changes: 129 additions & 0 deletions src/mako/http/response/senders/EventStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
<?php

/**
* @copyright Frederic G. Østby
* @license http://www.makoframework.com/license
*/

namespace mako\http\response\senders;

use Closure;
use Generator;
use JsonSerializable;
use mako\http\Request;
use mako\http\Response;
use mako\http\response\senders\stream\event\Event;
use Override;
use Stringable;

use function connection_aborted;
use function flush;
use function is_object;
use function json_encode;
use function ob_end_clean;
use function ob_get_level;

/**
* Event stream response.
*
* @see https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
*/
class EventStream implements ResponseSenderInterface
{
/**
* Constructor.
*/
public function __construct(
protected Closure $stream
) {
}

/**
* Erases and disables output buffers.
*/
protected function eraseAndDisableOutputBuffers(): void
{
while (ob_get_level() > 0) {
ob_end_clean();
}
}

/**
* Stringifies the value.
*/
protected function stringifyValue(null|float|int|JsonSerializable|string|Stringable $value): string
{
if (is_object($value)) {
if ($value instanceof JsonSerializable) {
return json_encode($value, JSON_THROW_ON_ERROR);
}
}

return (string) $value;
}

/**
* Prepares the event for sending.
*/
protected function prepareEvent(Event $event): string
{
$output = '';

foreach ($event->fields as $field) {
$output .= "{$field->type->value}: {$this->stringifyValue($field->value)}\n";
}

$output .= "\n";

return $output;
}

/**
* Sends the event to the client.
*/
protected function sendEvent(string $event): void
{
echo $event;

flush();
}

/**
* Sends the stream to the client.
*/
protected function sendStream(): void
{
foreach ((fn (): Generator => ($this->stream)())() as $event) {
if (connection_aborted()) {
break;
}

$this->sendEvent($this->prepareEvent($event));
}
}

/**
* {@inheritDoc}
*/
#[Override]
public function send(Request $request, Response $response): void
{
$response->setType('text/event-stream', 'UTF-8');

$response->headers->add('Connection', 'keep-alive');
$response->headers->add('Cache-Control', 'no-cache');
$response->headers->add('X-Accel-Buffering', 'no');

// Erase output buffers and disable output buffering

$this->eraseAndDisableOutputBuffers();

// Send headers

$response->sendHeaders();

// Send the stream

$this->sendStream();
}
}
29 changes: 29 additions & 0 deletions src/mako/http/response/senders/stream/event/Event.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

/**
* @copyright Frederic G. Østby
* @license http://www.makoframework.com/license
*/

namespace mako\http\response\senders\stream\event;

/**
* Event stream event.
*/
class Event
{
/**
* Event fields.
*
* @var Field[]
*/
public protected(set) array $fields;

/**
* Constructor.
*/
public function __construct(Field ...$fields)
{
$this->fields = $fields;
}
}
23 changes: 23 additions & 0 deletions src/mako/http/response/senders/stream/event/Field.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

/**
* @copyright Frederic G. Østby
* @license http://www.makoframework.com/license
*/

namespace mako\http\response\senders\stream\event;

use JsonSerializable;
use Stringable;

/**
* Event stream field.
*/
class Field
{
public function __construct(
public protected(set) Type $type,
public protected(set) null|float|int|JsonSerializable|string|Stringable $value
) {
}
}
20 changes: 20 additions & 0 deletions src/mako/http/response/senders/stream/event/Type.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

/**
* @copyright Frederic G. Østby
* @license http://www.makoframework.com/license
*/

namespace mako\http\response\senders\stream\event;

/**
* Event stream field types.
*/
enum Type: string
{
case COMMENT = '';
case DATA = 'data';
case EVENT = 'event';
case ID = 'id';
case RETRY = 'retry';
}
174 changes: 174 additions & 0 deletions tests/unit/http/response/senders/EventStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
<?php

/**
* @copyright Frederic G. Østby
* @license http://www.makoframework.com/license
*/

namespace mako\tests\unit\http\response\senders;

use JsonSerializable;
use mako\http\Request;
use mako\http\Response;
use mako\http\response\Headers;
use mako\http\response\senders\EventStream;
use mako\http\response\senders\stream\event\Event;
use mako\http\response\senders\stream\event\Field;
use mako\http\response\senders\stream\event\Type;
use mako\tests\TestCase;
use Mockery;
use Mockery\MockInterface;
use PHPUnit\Framework\Attributes\Group;
use Stringable;

#[Group('unit')]
class EventStreamTest extends TestCase
{
/**
*
*/
protected function getRequest(): MockInterface&Request
{
return Mockery::mock(Request::class);
}

/**
*
*/
protected function getResponse(): MockInterface&Response
{
$headers = Mockery::mock(Headers::class);

$headers->shouldReceive('add')->once()->with('Connection', 'keep-alive');
$headers->shouldReceive('add')->once()->with('Cache-Control', 'no-cache');
$headers->shouldReceive('add')->once()->with('X-Accel-Buffering', 'no');

$response = Mockery::mock(Response::class);

$response->shouldReceive('setType')->once()->with('text/event-stream', 'UTF-8');
$response->shouldReceive('sendHeaders')->once();

(function () use ($headers): void {
$this->headers = $headers;
})->bindTo($response, Response::class)();

return $response;
}

/**
*
*/
public function testBasicEventStream(): void
{
$eventStream = Mockery::mock(EventStream::class, [function () {
yield new Event(
new Field(Type::DATA, 'hello, world!')
);
}]);

$eventStream->makePartial()->shouldAllowMockingProtectedMethods();

$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();

$eventStream->shouldReceive('sendEvent')->once()->with("data: hello, world!\n\n");

$eventStream->send($this->getRequest(), $this->getResponse());
}

/**
*
*/
public function testEventStreamWithMultipleFields(): void
{
$eventStream = Mockery::mock(EventStream::class, [function () {
yield new Event(
new Field(Type::EVENT, 'greeting'),
new Field(Type::DATA, 'hello, world!')
);
}]);

$eventStream->makePartial()->shouldAllowMockingProtectedMethods();

$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();

$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: hello, world!\n\n");

$eventStream->send($this->getRequest(), $this->getResponse());
}

/**
*
*/
public function testEventStreamWithMultipleEvents(): void
{
$eventStream = Mockery::mock(EventStream::class, [function () {
yield new Event(
new Field(Type::EVENT, 'greeting'),
new Field(Type::DATA, 'first hello')
);
yield new Event(
new Field(Type::EVENT, 'greeting'),
new Field(Type::DATA, 'second hello')
);
}]);

$eventStream->makePartial()->shouldAllowMockingProtectedMethods();

$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();

$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: first hello\n\n");
$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: second hello\n\n");

$eventStream->send($this->getRequest(), $this->getResponse());
}

/**
*
*/
public function testEventStreamWithStringable(): void
{
$eventStream = Mockery::mock(EventStream::class, [function () {
yield new Event(
new Field(Type::DATA, new class implements Stringable {
public function __toString(): string
{
return 'this is a string';
}
})
);
}]);

$eventStream->makePartial()->shouldAllowMockingProtectedMethods();

$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();

$eventStream->shouldReceive('sendEvent')->once()->with("data: this is a string\n\n");

$eventStream->send($this->getRequest(), $this->getResponse());
}

/**
*
*/
public function testEventStreamWithJsonSerializable(): void
{
$eventStream = Mockery::mock(EventStream::class, [function () {
yield new Event(
new Field(Type::DATA, new class implements JsonSerializable {
public function jsonSerialize(): mixed
{
return [1, 2, 3];
}
})
);
}]);

$eventStream->makePartial()->shouldAllowMockingProtectedMethods();

$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();

$eventStream->shouldReceive('sendEvent')->once()->with("data: [1,2,3]\n\n");

$eventStream->send($this->getRequest(), $this->getResponse());
}
}
Loading