diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index c729b43..13a55f5 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -1,6 +1,7 @@ server= $this; $protocol->initialize(); - $this->select($socket, function($socket) use($protocol) { - $connection= $socket->accept(); - if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { - $connection->close(); - return; - } + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + $this->continuation[$i]= new Continuation(function($socket) use($protocol) { + do { + $connection= $socket->accept(); + if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { + $connection->close(); + return; + } - $this->tcpnodelay && $connection->useNoDelay(); - $protocol->handleConnect($connection); - $this->select($connection, [ - [$protocol, 'handleData'], - [$protocol, 'handleDisconnect'], - [$protocol, 'handleError'] - ]); + $this->tcpnodelay && $connection->useNoDelay(); + $protocol->handleConnect($connection); + yield 'accept' => $this->select($connection, $protocol); + } while (!$this->terminate); }); + // Never time out sockets we listen on + $this->continuation[$i]->next= null; return $this; } @@ -68,42 +81,64 @@ public function shutdown() { * Adds socket to select, associating a function to call for data * * @param peer.Socket|peer.BSDSocket $socket - * @param callable[]|function(peer.Socket|peer.BSDSocket): void $function + * @param peer.Protocol|function(peer.Socket|peer.BSDSocket): void $handler * @return peer.Socket|peer.BSDSocket */ - public function select($socket, $function) { - if ($this->select) { - end($this->select); - $i= key($this->select) + 1; + public function select($socket, $handler) { + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + if ($handler instanceof ServerProtocol) { + $this->continuation[$i]= new Continuation(function($socket) use($handler) { + try { + + // Check for readability, then handle incoming data + while ($socket->isConnected() && !$socket->eof()) { + yield 'read' => $socket; + yield from $handler->handleData($socket) ?? []; + } + + // Handle disconnnect gracefully, ensure socket is closed + $handler->handleDisconnect($socket); + $socket->close(); + } catch (Throwable $t) { + + // Handle any errors, then close socket + $handler->handleError($socket, $t); + $socket->close(); + } + }); } else { - $i= 0; + $this->continuation[$i]= new Continuation($handler); } - - $this->select[$i]= $socket; - $this->handle[$i]= is_array($function) ? $function : [$function]; return $socket; } /** * Schedule a given task to execute every given seconds. The task - * function can return an integer to indicate in how many seconds - * its next invocation should occur, overwriting the default value - * given here. If this integer is negative, the task stops running. - * Returns the added task's ID. + * function can return how many seconds its next invocation should + * occur, overwriting the default value given here. If this number + * is negative, the task stops running. Returns the added task's ID. + * + * Note: If the task function raises any exception the task stops + * running. To continue executing, exceptions must be caught and + * handled within the function! * - * @param int $seconds - * @param function(): ?int + * @param int|float $seconds + * @param function(): ?int|float * @return int */ public function schedule($seconds, $function) { - if ($this->tasks) { - end($this->tasks); - $i= key($this->tasks) - 1; - } else { - $i= -1; - } - - $this->tasks[$i]= [$seconds, $function]; + $i= $this->tasks ? array_key_last($this->tasks) - 1 : -1; + $this->tasks[$i]= $function; + $this->continuation[$i]= new Continuation(function($function) use($seconds) { + try { + while (($seconds= $function() ?? $seconds) >= 0) { + yield 'delay' => $seconds * 1000; + } + } catch (Throwable $t) { + // Not displayed, simply stops execution + } + }); return $i; } @@ -114,105 +149,104 @@ public function schedule($seconds, $function) { * @throws lang.IllegalStateException */ public function service() { - if (empty($this->select)) { - throw new IllegalStateException('No sockets to select on'); + if (empty($this->select) && empty($this->tasks)) { + throw new IllegalStateException('No sockets or tasks to execute'); } - // Set up scheduled tasks - $time= time(); - $next= $continuation= []; - foreach ($this->tasks as $i => $task) { - $next[$i]= $time + $task[0]; - } - - $null= null; - $sockets= $this->select[0]->kind(); + $readable= $writeable= $waitable= $write= []; + $sockets= $errors= null; do { - - // Build array of sockets that we want to check for data. If one of them - // has disconnected in the meantime, notify the listeners (socket will be - // already invalid at that time) and remove it from the clients list. - $read= []; - foreach ($this->select as $i => $socket) { - if (!$socket->isConnected() || $socket->eof()) { - if ($f= $this->handle[$i][1] ?? null) $f($socket); - unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]); + $time= microtime(true); + $wait= []; + foreach ($this->continuation as $i => $continuation) { + if (null !== $continuation->next && $continuation->next >= $time) { + $wait[]= $continuation->next - $time; continue; - } - - // Do not re-enter handler as long as we have a continuation - if (isset($continuation[$i])) continue; + } else if (isset($this->tasks[$i])) { + $execute= $continuation->continue($this->tasks[$i]); + unset($waitable[$i]); + } else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) { + $execute= $continuation->continue($this->select[$i]); + if (null !== $continuation->next) $continuation->next= $time; + unset($readable[$i], $writeable[$i], $waitable[$i]); + } else { + isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i]; + if (null === $continuation->next) continue; - // Handle timeouts manually instead of leaving this up to the sockets - // themselves - the latter has proven not to be 100% reliable. - if (isset($next[$i]) && $next[$i] <= $time) { - if ($f= $this->handle[$i][2] ?? null) { - $f($socket, new SocketTimeoutException('Timed out', $socket->getTimeout())); - $socket->close(); - unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]); + // Check if the socket has timed out... + $idle= $time - $continuation->next; + $timeout= $this->select[$i]->getTimeout(); + if ($idle < $timeout) { + $wait[]= $timeout - $idle; continue; } - $next[$i]= $time + $socket->getTimeout(); + + // ...and if so, throw an exception, allowing the continuation to handle it. + $execute= $continuation->throw($this->select[$i], new SocketTimeoutException('Timed out', $timeout)); + $continuation->next= $time; + unset($readable[$i], $writeable[$i]); } - $read[$i]= $socket; - } - // echo '* SELECT ([', implode(', ', array_keys($next)), '] -> ', $next ? max(0, min($next) - $time) : null, ")\n"; - $sockets->select($read, $null, $null, $next ? max(0, min($next) - $time) : null); - - // Run scheduled tasks, recording their next run immediately thereafter - $time= time(); - foreach ($this->tasks as $i => $task) { - if ($next[$i] <= $time) { - $n= $task[1](); - if ($n < 0) { - unset($this->tasks[$i], $next[$i]); - } else { - $next[$i]= $time + ($n ?? $task[0]); - } + // Check whether execution has finished + if (null === $execute) { + unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]); + continue; } - } - // There is data on the server socket (meaning a client connection is - // waiting to be socket), or on any of the other sockets, so we'll call - // into their respective data handler. If a generator is returned, - // schedule its continuation for the next possible point. - foreach ($read as $i => $socket) { - try { - $r= $this->handle[$i][0]($socket); - if ($r instanceof \Generator && $r->valid()) { - $continuation[$i]= $r; - $task= $this->schedule(0, function() use(&$continuation, $i) { - try { - if (($c= $continuation[$i] ?? null) && $c->valid()) { - $c->next(); - return; - } - } catch (SocketException $t) { - if ($f= $this->handle[$i][2] ?? null) $f($this->select[$i], $t); - $this->select[$i]->close(); - } - - unset($continuation[$i]); - return -1; - }); - $next[$task]= $time; - } + // `yield 'accept' => $socket`: Check for being able to read from socket + // `yield 'read' => $_`: Continue as soon as the socket becomes readable + // `yield 'write' => $_`: Continue as soon as the socket becomes writeable + // `yield 'delay' => $millis`: Wait a specified number of milliseconds + // `yield`: Continue at the next possible execution slot (`delay => 0`) + switch ($execute->key()) { + case 'accept': + $socket= $execute->current(); + $readable[array_key_last($this->select)]= $socket; + $readable[$i]= $this->select[$i]; + $wait[]= $socket->getTimeout(); + break; - $next[$i]= $time + $socket->getTimeout(); - } catch (Throwable $t) { - if ($f= $this->handle[$i][2] ?? null) $f($socket, $t); + case 'write': + $write[$i]= true; + $writeable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'read': + unset($write[$i]); + $readable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'delay': default: + $delay= $execute->current() / 1000; + $continuation->next= $time + $delay; + $waitable[$i]= true; + $wait[]= $delay; + break; } } - $time= time(); - } while (!$this->terminate); + // When asked to terminate, close sockets in reverse order + if ($this->terminate) { + for ($i= array_key_last($this->select); $i > 0; $i--) { + isset($this->select[$i]) && $this->select[$i]->close(); + } + break; + } - // Close all accepted sockets first, then the listening sockets - for ($i= sizeof($this->select) - 1; $i >= 0; $i--) { - $this->select[$i]->close(); - if ($f= $this->handle[$i][1] ?? null) $f($this->select[$i]); - } + if ($this->select) { + // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", + // " R: ", \util\Objects::stringOf($readable), "\n", + // " W: ", \util\Objects::stringOf($writeable), "\n", + // "}\n"; + $sockets ?? $sockets= current($this->select)->kind(); + $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); + } else { + // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n"; + $wait && usleep(1000000 * (int)min($wait)); + } + } while ($this->select || $this->tasks); } /** diff --git a/src/main/php/peer/server/Continuation.class.php b/src/main/php/peer/server/Continuation.class.php new file mode 100755 index 0000000..0dd26b1 --- /dev/null +++ b/src/main/php/peer/server/Continuation.class.php @@ -0,0 +1,50 @@ +function= $function; + $this->next= microtime(true) - 1; + } + + /** + * Continue executing + * + * @param var $arg + * @return ?Generator + */ + public function continue($arg) { + if (null === $this->continuation) { + $this->continuation= ($this->function)($arg); + } else { + $this->continuation->next(); + } + + return $this->continuation->valid() ? $this->continuation : $this->continuation= null; + } + + /** + * Throw an exception into the execution flow + * + * @param var $arg + * @param Throwable $t + * @return ?Generator + */ + public function throw($arg, $t) { + if (null === $this->continuation) { + $this->continuation= ($this->function)($arg); + } + $this->continuation->throw($t); + + return $this->continuation->valid() ? $this->continuation : $this->continuation= null; + } +} diff --git a/src/test/php/peer/unittest/server/AsyncServerTest.class.php b/src/test/php/peer/unittest/server/AsyncServerTest.class.php index 192b6d9..0eb9f6e 100755 --- a/src/test/php/peer/unittest/server/AsyncServerTest.class.php +++ b/src/test/php/peer/unittest/server/AsyncServerTest.class.php @@ -1,5 +1,6 @@ schedule(1, function() use($s, &$invoked) { + $invoked++; + $s->shutdown(); + }); + + $before= $invoked; + $s->service(); + + $this->assertEquals(0, $before, 'before service()'); + $this->assertEquals(1, $invoked, 'after service()'); + } + + #[Test, Values([1, 2, 3])] + public function scheduled_function_invoked_after_delay($executions) { + $delay= 0.05; // 50 ms + + $invoked= 0; + $s= new AsyncServer(); + $s->schedule($delay, function() use($s, $executions, &$invoked) { + $invoked++; + if ($invoked >= $executions) $s->shutdown(); + }); + + $start= microtime(true); + $s->service(); + $time= microtime(true) - $start; + $expected= $delay * ($executions - 1); + + $this->assertEquals($executions, $invoked); + $this->assertTrue($time >= $expected, $time.' >= '.$expected); + } + #[Test] public function connected() { $this->connect(); @@ -32,7 +69,7 @@ public function read_synchronously_written_data() { $this->connect(); $this->conn->write("SYNC 3\n"); $read= []; - while ('.' !== ($line= $this->conn->readLine())) { + while ('.' !== ($line= $this->conn->readLine()) && !$this->conn->eof()) { $read[]= $line; } $this->conn->close(); @@ -45,7 +82,7 @@ public function read_asynchronously_written_data() { $this->connect(); $this->conn->write("ASNC 3\n"); $read= []; - while ('.' !== ($line= $this->conn->readLine())) { + while ('.' !== ($line= $this->conn->readLine()) && !$this->conn->eof()) { $read[]= $line; } $this->conn->close(); diff --git a/src/test/php/peer/unittest/server/TestingProtocol.class.php b/src/test/php/peer/unittest/server/TestingProtocol.class.php index 5b97623..a45f477 100755 --- a/src/test/php/peer/unittest/server/TestingProtocol.class.php +++ b/src/test/php/peer/unittest/server/TestingProtocol.class.php @@ -98,7 +98,8 @@ public function handleData($socket) { case 'ASNC': { for ($i= 0, $s= (int)substr($cmd, 5); $i < $s; $i++) { - yield $socket->write(($i + 1)."\n"); + yield 'write' => $socket; + $socket->write(($i + 1)."\n"); } $socket->write(".\n"); break;