Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous server with finer-grained I/O control #22

Merged
merged 14 commits into from
Oct 29, 2022
Merged
276 changes: 155 additions & 121 deletions src/main/php/peer/server/AsyncServer.class.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php namespace peer\server;

use lang\{IllegalStateException, Throwable};
use Throwable;
use lang\IllegalStateException;
use peer\server\protocol\SocketAcceptHandler;
use peer\{ServerSocket, SocketException, SocketTimeoutException};

Expand All @@ -19,7 +20,17 @@
* @test xp://peer.unittest.server.AsyncServerTest
*/
class AsyncServer extends Server {
private $select= [], $handle= [], $tasks= [];
private $select= [], $tasks= [], $continuation= [];

static function __static() {

// For PHP < 7.3.0
if (!function_exists('array_key_last')) {
function array_key_last(&$array) {
return key(array_slice($array, -1, 1, true));
}
}
}

/**
* Adds server socket to listen on, associating protocol handler with it
Expand All @@ -36,22 +47,24 @@ public function listen($socket, ServerProtocol $protocol) {
$protocol->server= $this;
$protocol->initialize();

$this->select($socket, function($socket) use($protocol) {
$connection= $socket->accept();
if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) {
$connection->close();
return;
}
$i= $this->select ? array_key_last($this->select) + 1 : 1;
$this->select[$i]= $socket;
$this->continuation[$i]= new Continuation(function($socket) use($protocol) {
do {
$connection= $socket->accept();
if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) {
$connection->close();
return;
}

$this->tcpnodelay && $connection->useNoDelay();
$protocol->handleConnect($connection);
$this->select($connection, [
[$protocol, 'handleData'],
[$protocol, 'handleDisconnect'],
[$protocol, 'handleError']
]);
$this->tcpnodelay && $connection->useNoDelay();
$protocol->handleConnect($connection);
yield 'accept' => $this->select($connection, $protocol);
} while (!$this->terminate);
});

// Never time out sockets we listen on
$this->continuation[$i]->next= null;
return $this;
}

Expand All @@ -68,42 +81,64 @@ public function shutdown() {
* Adds socket to select, associating a function to call for data
*
* @param peer.Socket|peer.BSDSocket $socket
* @param callable[]|function(peer.Socket|peer.BSDSocket): void $function
* @param peer.Protocol|function(peer.Socket|peer.BSDSocket): void $handler
* @return peer.Socket|peer.BSDSocket
*/
public function select($socket, $function) {
if ($this->select) {
end($this->select);
$i= key($this->select) + 1;
public function select($socket, $handler) {
$i= $this->select ? array_key_last($this->select) + 1 : 1;
$this->select[$i]= $socket;
if ($handler instanceof ServerProtocol) {
$this->continuation[$i]= new Continuation(function($socket) use($handler) {
try {

// Check for readability, then handle incoming data
while ($socket->isConnected() && !$socket->eof()) {
yield 'read' => $socket;
yield from $handler->handleData($socket) ?? [];
}

// Handle disconnnect gracefully, ensure socket is closed
$handler->handleDisconnect($socket);
$socket->close();
} catch (Throwable $t) {

// Handle any errors, then close socket
$handler->handleError($socket, $t);
$socket->close();
}
});
} else {
$i= 0;
$this->continuation[$i]= new Continuation($handler);
}

$this->select[$i]= $socket;
$this->handle[$i]= is_array($function) ? $function : [$function];
return $socket;
}

/**
* Schedule a given task to execute every given seconds. The task
* function can return an integer to indicate in how many seconds
* its next invocation should occur, overwriting the default value
* given here. If this integer is negative, the task stops running.
* Returns the added task's ID.
* function can return how many seconds its next invocation should
* occur, overwriting the default value given here. If this number
* is negative, the task stops running. Returns the added task's ID.
*
* Note: If the task function raises any exception the task stops
* running. To continue executing, exceptions must be caught and
* handled within the function!
*
* @param int $seconds
* @param function(): ?int
* @param int|float $seconds
* @param function(): ?int|float
* @return int
*/
public function schedule($seconds, $function) {
if ($this->tasks) {
end($this->tasks);
$i= key($this->tasks) - 1;
} else {
$i= -1;
}

$this->tasks[$i]= [$seconds, $function];
$i= $this->tasks ? array_key_last($this->tasks) - 1 : -1;
$this->tasks[$i]= $function;
$this->continuation[$i]= new Continuation(function($function) use($seconds) {
try {
while (($seconds= $function() ?? $seconds) >= 0) {
yield 'delay' => $seconds * 1000;
}
} catch (Throwable $t) {
// Not displayed, simply stops execution
}
});
return $i;
}

Expand All @@ -114,105 +149,104 @@ public function schedule($seconds, $function) {
* @throws lang.IllegalStateException
*/
public function service() {
if (empty($this->select)) {
throw new IllegalStateException('No sockets to select on');
if (empty($this->select) && empty($this->tasks)) {
throw new IllegalStateException('No sockets or tasks to execute');
}

// Set up scheduled tasks
$time= time();
$next= $continuation= [];
foreach ($this->tasks as $i => $task) {
$next[$i]= $time + $task[0];
}

$null= null;
$sockets= $this->select[0]->kind();
$readable= $writeable= $waitable= $write= [];
$sockets= $errors= null;
do {

// Build array of sockets that we want to check for data. If one of them
// has disconnected in the meantime, notify the listeners (socket will be
// already invalid at that time) and remove it from the clients list.
$read= [];
foreach ($this->select as $i => $socket) {
if (!$socket->isConnected() || $socket->eof()) {
if ($f= $this->handle[$i][1] ?? null) $f($socket);
unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]);
$time= microtime(true);
$wait= [];
foreach ($this->continuation as $i => $continuation) {
if (null !== $continuation->next && $continuation->next >= $time) {
$wait[]= $continuation->next - $time;
continue;
}

// Do not re-enter handler as long as we have a continuation
if (isset($continuation[$i])) continue;
} else if (isset($this->tasks[$i])) {
$execute= $continuation->continue($this->tasks[$i]);
unset($waitable[$i]);
} else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) {
$execute= $continuation->continue($this->select[$i]);
if (null !== $continuation->next) $continuation->next= $time;
unset($readable[$i], $writeable[$i], $waitable[$i]);
} else {
isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i];
if (null === $continuation->next) continue;

// Handle timeouts manually instead of leaving this up to the sockets
// themselves - the latter has proven not to be 100% reliable.
if (isset($next[$i]) && $next[$i] <= $time) {
if ($f= $this->handle[$i][2] ?? null) {
$f($socket, new SocketTimeoutException('Timed out', $socket->getTimeout()));
$socket->close();
unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]);
// Check if the socket has timed out...
$idle= $time - $continuation->next;
$timeout= $this->select[$i]->getTimeout();
if ($idle < $timeout) {
$wait[]= $timeout - $idle;
continue;
}
$next[$i]= $time + $socket->getTimeout();

// ...and if so, throw an exception, allowing the continuation to handle it.
$execute= $continuation->throw($this->select[$i], new SocketTimeoutException('Timed out', $timeout));
$continuation->next= $time;
unset($readable[$i], $writeable[$i]);
}

$read[$i]= $socket;
}
// echo '* SELECT ([', implode(', ', array_keys($next)), '] -> ', $next ? max(0, min($next) - $time) : null, ")\n";
$sockets->select($read, $null, $null, $next ? max(0, min($next) - $time) : null);

// Run scheduled tasks, recording their next run immediately thereafter
$time= time();
foreach ($this->tasks as $i => $task) {
if ($next[$i] <= $time) {
$n= $task[1]();
if ($n < 0) {
unset($this->tasks[$i], $next[$i]);
} else {
$next[$i]= $time + ($n ?? $task[0]);
}
// Check whether execution has finished
if (null === $execute) {
unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]);
continue;
}
}

// There is data on the server socket (meaning a client connection is
// waiting to be socket), or on any of the other sockets, so we'll call
// into their respective data handler. If a generator is returned,
// schedule its continuation for the next possible point.
foreach ($read as $i => $socket) {
try {
$r= $this->handle[$i][0]($socket);
if ($r instanceof \Generator && $r->valid()) {
$continuation[$i]= $r;
$task= $this->schedule(0, function() use(&$continuation, $i) {
try {
if (($c= $continuation[$i] ?? null) && $c->valid()) {
$c->next();
return;
}
} catch (SocketException $t) {
if ($f= $this->handle[$i][2] ?? null) $f($this->select[$i], $t);
$this->select[$i]->close();
}

unset($continuation[$i]);
return -1;
});
$next[$task]= $time;
}
// `yield 'accept' => $socket`: Check for being able to read from socket
// `yield 'read' => $_`: Continue as soon as the socket becomes readable
// `yield 'write' => $_`: Continue as soon as the socket becomes writeable
// `yield 'delay' => $millis`: Wait a specified number of milliseconds
// `yield`: Continue at the next possible execution slot (`delay => 0`)
switch ($execute->key()) {
case 'accept':
$socket= $execute->current();
$readable[array_key_last($this->select)]= $socket;
$readable[$i]= $this->select[$i];
$wait[]= $socket->getTimeout();
break;

$next[$i]= $time + $socket->getTimeout();
} catch (Throwable $t) {
if ($f= $this->handle[$i][2] ?? null) $f($socket, $t);
case 'write':
$write[$i]= true;
$writeable[$i]= $this->select[$i];
$wait[]= $this->select[$i]->getTimeout();
break;

case 'read':
unset($write[$i]);
$readable[$i]= $this->select[$i];
$wait[]= $this->select[$i]->getTimeout();
break;

case 'delay': default:
$delay= $execute->current() / 1000;
$continuation->next= $time + $delay;
$waitable[$i]= true;
$wait[]= $delay;
break;
}
}

$time= time();
} while (!$this->terminate);
// When asked to terminate, close sockets in reverse order
if ($this->terminate) {
for ($i= array_key_last($this->select); $i > 0; $i--) {
isset($this->select[$i]) && $this->select[$i]->close();
}
break;
}

// Close all accepted sockets first, then the listening sockets
for ($i= sizeof($this->select) - 1; $i >= 0; $i--) {
$this->select[$i]->close();
if ($f= $this->handle[$i][1] ?? null) $f($this->select[$i]);
}
if ($this->select) {
// echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n",
// " R: ", \util\Objects::stringOf($readable), "\n",
// " W: ", \util\Objects::stringOf($writeable), "\n",
// "}\n";
$sockets ?? $sockets= current($this->select)->kind();
$sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null);
} else {
// echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n";
$wait && usleep(1000000 * (int)min($wait));
}
} while ($this->select || $this->tasks);
}

/**
Expand Down
50 changes: 50 additions & 0 deletions src/main/php/peer/server/Continuation.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php namespace peer\server;

/**
* Continuable coroutine. Not a public API!
*
* @see peer.server.AsyncServer
*/
class Continuation {
private $function;
private $continuation= null;
public $next;

/** @param function(var): Generator $function */
public function __construct($function) {
$this->function= $function;
$this->next= microtime(true) - 1;
}

/**
* Continue executing
*
* @param var $arg
* @return ?Generator
*/
public function continue($arg) {
if (null === $this->continuation) {
$this->continuation= ($this->function)($arg);
} else {
$this->continuation->next();
}

return $this->continuation->valid() ? $this->continuation : $this->continuation= null;
}

/**
* Throw an exception into the execution flow
*
* @param var $arg
* @param Throwable $t
* @return ?Generator
*/
public function throw($arg, $t) {
if (null === $this->continuation) {
$this->continuation= ($this->function)($arg);
}
$this->continuation->throw($t);

return $this->continuation->valid() ? $this->continuation : $this->continuation= null;
}
}
Loading