From 4ad0c6ca01598b1f38437dffcdf9fc64a1ffd1af Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:01:37 +0200 Subject: [PATCH 01/12] Asynchronous server with finer-grained I/O control --- .../php/peer/server/AsyncServer.class.php | 246 ++++++++++-------- .../php/peer/server/Continuation.class.php | 29 +++ 2 files changed, 160 insertions(+), 115 deletions(-) create mode 100755 src/main/php/peer/server/Continuation.class.php diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index c729b43..85c463e 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -1,6 +1,7 @@ server= $this; $protocol->initialize(); - $this->select($socket, function($socket) use($protocol) { - $connection= $socket->accept(); - if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { - $connection->close(); - return; - } + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + $this->continuation[$i]= new Continuation(function($socket) use($protocol) { + do { + $connection= $socket->accept(); + if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { + $connection->close(); + return; + } - $this->tcpnodelay && $connection->useNoDelay(); - $protocol->handleConnect($connection); - $this->select($connection, [ - [$protocol, 'handleData'], - [$protocol, 'handleDisconnect'], - [$protocol, 'handleError'] - ]); + $this->tcpnodelay && $connection->useNoDelay(); + yield 'accept' => $this->select($connection, $protocol); + } while (!$this->terminate); }); return $this; @@ -68,19 +78,36 @@ public function shutdown() { * Adds socket to select, associating a function to call for data * * @param peer.Socket|peer.BSDSocket $socket - * @param callable[]|function(peer.Socket|peer.BSDSocket): void $function + * @param peer.Protocol|function(peer.Socket|peer.BSDSocket): void $handler * @return peer.Socket|peer.BSDSocket */ - public function select($socket, $function) { - if ($this->select) { - end($this->select); - $i= key($this->select) + 1; + public function select($socket, $handler) { + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + if ($handler instanceof ServerProtocol) { + $this->continuation[$i]= new Continuation(function($socket) use($handler) { + try { + $handler->handleConnect($socket); + + // Check for readability, then handle incoming data + while ($socket->isConnected() && !$socket->eof()) { + yield 'read' => $socket; + yield from $handler->handleData($socket) ?? []; + } + + // Handle disconnnect gracefully, ensure socket is closed + $handler->handleDisconnect($socket); + $socket->close(); + } catch (Throwable $t) { + + // Handle any errors, then close socket + $handler->handleError($socket, $t); + $socket->close(); + } + }); } else { - $i= 0; + $this->continuation[$i]= new Continuation($handler); } - - $this->select[$i]= $socket; - $this->handle[$i]= is_array($function) ? $function : [$function]; return $socket; } @@ -90,20 +117,27 @@ public function select($socket, $function) { * its next invocation should occur, overwriting the default value * given here. If this integer is negative, the task stops running. * Returns the added task's ID. + * + * Note: If the task function raises any exception the task stops + * running. To continue executing, exceptions must be caught and + * handled within the function! * * @param int $seconds * @param function(): ?int * @return int */ public function schedule($seconds, $function) { - if ($this->tasks) { - end($this->tasks); - $i= key($this->tasks) - 1; - } else { - $i= -1; - } - - $this->tasks[$i]= [$seconds, $function]; + $i= $this->last ? array_key_last($this->last) - 1 : -1; + $this->tasks[$i]= $function; + $this->continuation[$i]= new Continuation(function($function) use($seconds) { + try { + while (($seconds= $function() ?? $seconds) >= 0) { + yield 'delay' => $seconds * 1000; + } + } catch (Throwable $t) { + // Not displayed, simply stops execution + } + }); return $i; } @@ -118,101 +152,83 @@ public function service() { throw new IllegalStateException('No sockets to select on'); } - // Set up scheduled tasks - $time= time(); - $next= $continuation= []; - foreach ($this->tasks as $i => $task) { - $next[$i]= $time + $task[0]; - } - - $null= null; - $sockets= $this->select[0]->kind(); + $sockets= current($this->select)->kind(); + $readable= $writeable= $waitable= $write= []; + $errors= null; + $time= microtime(true); do { - - // Build array of sockets that we want to check for data. If one of them - // has disconnected in the meantime, notify the listeners (socket will be - // already invalid at that time) and remove it from the clients list. - $read= []; - foreach ($this->select as $i => $socket) { - if (!$socket->isConnected() || $socket->eof()) { - if ($f= $this->handle[$i][1] ?? null) $f($socket); - unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]); + $wait= []; + foreach ($this->continuation as $i => $continuation) { + if ($continuation->next >= $time) { + $wait[]= $continuation->next - $time; + continue; + } else if (isset($this->tasks[$i])) { + $execute= $continuation->continue($this->tasks[$i]); + unset($waitable[$i]); + } else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) { + $execute= $continuation->continue($this->select[$i]); + unset($readable[$i], $writeable[$i], $waitable[$i]); + } else { + isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i]; continue; } - // Do not re-enter handler as long as we have a continuation - if (isset($continuation[$i])) continue; - - // Handle timeouts manually instead of leaving this up to the sockets - // themselves - the latter has proven not to be 100% reliable. - if (isset($next[$i]) && $next[$i] <= $time) { - if ($f= $this->handle[$i][2] ?? null) { - $f($socket, new SocketTimeoutException('Timed out', $socket->getTimeout())); - $socket->close(); - unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]); - continue; - } - $next[$i]= $time + $socket->getTimeout(); + // Check whether execution has finished + if (null === $execute) { + unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]); + continue; } - $read[$i]= $socket; - } - // echo '* SELECT ([', implode(', ', array_keys($next)), '] -> ', $next ? max(0, min($next) - $time) : null, ")\n"; - $sockets->select($read, $null, $null, $next ? max(0, min($next) - $time) : null); - - // Run scheduled tasks, recording their next run immediately thereafter - $time= time(); - foreach ($this->tasks as $i => $task) { - if ($next[$i] <= $time) { - $n= $task[1](); - if ($n < 0) { - unset($this->tasks[$i], $next[$i]); - } else { - $next[$i]= $time + ($n ?? $task[0]); - } + // `yield 'accept' => $socket`: Check for being able to read from socket + // `yield 'read' => $_`: Continue as soon as the socket becomes readable + // `yield 'write' => $_`: Continue as soon as the socket becomes writeable + // `yield 'delay' => $millis`: Wait a specified number of milliseconds + // `yield`: Continue at the next possible execution slot (`delay => 0`) + switch ($execute->key()) { + case 'accept': + $socket= $execute->current(); + $readable[array_key_last($this->select)]= $socket; + $readable[$i]= $this->select[$i]; + $wait[]= $socket->getTimeout(); + break; + + case 'write': + $write[$i]= true; + $writeable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'read': + unset($write[$i]); + $readable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'delay': default: + $delay= $execute->current() / 1000; + $continuation->next= $time + $delay; + $waitable[$i]= true; + $wait[]= $delay; + break; } } - // There is data on the server socket (meaning a client connection is - // waiting to be socket), or on any of the other sockets, so we'll call - // into their respective data handler. If a generator is returned, - // schedule its continuation for the next possible point. - foreach ($read as $i => $socket) { - try { - $r= $this->handle[$i][0]($socket); - if ($r instanceof \Generator && $r->valid()) { - $continuation[$i]= $r; - $task= $this->schedule(0, function() use(&$continuation, $i) { - try { - if (($c= $continuation[$i] ?? null) && $c->valid()) { - $c->next(); - return; - } - } catch (SocketException $t) { - if ($f= $this->handle[$i][2] ?? null) $f($this->select[$i], $t); - $this->select[$i]->close(); - } - - unset($continuation[$i]); - return -1; - }); - $next[$task]= $time; - } - - $next[$i]= $time + $socket->getTimeout(); - } catch (Throwable $t) { - if ($f= $this->handle[$i][2] ?? null) $f($socket, $t); + // When asked to terminate, close sockets in reverse order + if ($this->terminate) { + for ($i= sizeof($this->select) - 1; $i >= 0; $i--) { + $this->select[$i]->close(); + // FIXME $continuation->throw() or ->send(INTERRUPTED)? or nothing? } + break; } - $time= time(); - } while (!$this->terminate); - - // Close all accepted sockets first, then the listening sockets - for ($i= sizeof($this->select) - 1; $i >= 0; $i--) { - $this->select[$i]->close(); - if ($f= $this->handle[$i][1] ?? null) $f($this->select[$i]); - } + // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", + // " R: ", \util\Objects::stringOf($readable), "\n", + // " W: ", \util\Objects::stringOf($writeable), "\n", + // "}\n"; + $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); + $time= microtime(true); + } while ($this->select); } /** diff --git a/src/main/php/peer/server/Continuation.class.php b/src/main/php/peer/server/Continuation.class.php new file mode 100755 index 0000000..a5b07ce --- /dev/null +++ b/src/main/php/peer/server/Continuation.class.php @@ -0,0 +1,29 @@ +function= $function; + $this->next= microtime(true) - 1; + } + + /** + * Continue executing + * + * @param var $arg + * @return ?Generator + */ + public function continue($arg) { + if (null === $this->continuation) { + $this->continuation= ($this->function)($arg); + } else { + $this->continuation->next(); + } + + return $this->continuation->valid() ? $this->continuation : $this->continuation= null; + } +} From d182d9ab294e4098f74f89eee1364cc0179c148a Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:21:01 +0200 Subject: [PATCH 02/12] Prevent "undefined index" errors when terminating --- src/main/php/peer/server/AsyncServer.class.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index 85c463e..0218663 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -123,7 +123,7 @@ public function select($socket, $handler) { * handled within the function! * * @param int $seconds - * @param function(): ?int + * @param function(): ?int|float * @return int */ public function schedule($seconds, $function) { @@ -215,8 +215,8 @@ public function service() { // When asked to terminate, close sockets in reverse order if ($this->terminate) { - for ($i= sizeof($this->select) - 1; $i >= 0; $i--) { - $this->select[$i]->close(); + for ($i= array_key_last($this->select); $i > 0; $i--) { + isset($this->select[$i]) && $this->select[$i]->close(); // FIXME $continuation->throw() or ->send(INTERRUPTED)? or nothing? } break; From 12fb0483b211bb742cea5a812912453c0555dba5 Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:23:01 +0200 Subject: [PATCH 03/12] Wait for writeability --- src/test/php/peer/unittest/server/TestingProtocol.class.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/php/peer/unittest/server/TestingProtocol.class.php b/src/test/php/peer/unittest/server/TestingProtocol.class.php index 5b97623..a45f477 100755 --- a/src/test/php/peer/unittest/server/TestingProtocol.class.php +++ b/src/test/php/peer/unittest/server/TestingProtocol.class.php @@ -98,7 +98,8 @@ public function handleData($socket) { case 'ASNC': { for ($i= 0, $s= (int)substr($cmd, 5); $i < $s; $i++) { - yield $socket->write(($i + 1)."\n"); + yield 'write' => $socket; + $socket->write(($i + 1)."\n"); } $socket->write(".\n"); break; From 28ba87ca9efca52b60ad4004a4275511e26b58d3 Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:23:15 +0200 Subject: [PATCH 04/12] Prevent endless loops --- src/test/php/peer/unittest/server/AsyncServerTest.class.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/php/peer/unittest/server/AsyncServerTest.class.php b/src/test/php/peer/unittest/server/AsyncServerTest.class.php index 192b6d9..7a15b54 100755 --- a/src/test/php/peer/unittest/server/AsyncServerTest.class.php +++ b/src/test/php/peer/unittest/server/AsyncServerTest.class.php @@ -32,7 +32,7 @@ public function read_synchronously_written_data() { $this->connect(); $this->conn->write("SYNC 3\n"); $read= []; - while ('.' !== ($line= $this->conn->readLine())) { + while ('.' !== ($line= $this->conn->readLine()) && !$this->conn->eof()) { $read[]= $line; } $this->conn->close(); @@ -45,7 +45,7 @@ public function read_asynchronously_written_data() { $this->connect(); $this->conn->write("ASNC 3\n"); $read= []; - while ('.' !== ($line= $this->conn->readLine())) { + while ('.' !== ($line= $this->conn->readLine()) && !$this->conn->eof()) { $read[]= $line; } $this->conn->close(); From 4d82b3c09b9cfc5a94ce8aafd2fb55f8427b56dc Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:27:16 +0200 Subject: [PATCH 05/12] QA: WS --- src/main/php/peer/server/AsyncServer.class.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index 0218663..d28845b 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -94,7 +94,7 @@ public function select($socket, $handler) { yield 'read' => $socket; yield from $handler->handleData($socket) ?? []; } - + // Handle disconnnect gracefully, ensure socket is closed $handler->handleDisconnect($socket); $socket->close(); @@ -117,7 +117,7 @@ public function select($socket, $handler) { * its next invocation should occur, overwriting the default value * given here. If this integer is negative, the task stops running. * Returns the added task's ID. - * + * * Note: If the task function raises any exception the task stops * running. To continue executing, exceptions must be caught and * handled within the function! @@ -222,7 +222,7 @@ public function service() { break; } - // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", + // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", // " R: ", \util\Objects::stringOf($readable), "\n", // " W: ", \util\Objects::stringOf($writeable), "\n", // "}\n"; From 64e15874401cd5fd193b5400fe3d7fc2abb8bda5 Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Sun, 21 Aug 2022 18:34:09 +0200 Subject: [PATCH 06/12] Fix PHP < 7.3.0 Need to pass preserve_keys = true to array_slice() --- src/main/php/peer/server/AsyncServer.class.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index d28845b..51fdc56 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -27,7 +27,7 @@ static function __static() { // For PHP < 7.3.0 if (!function_exists('array_key_last')) { function array_key_last(&$array) { - return key(array_slice($array, -1)); + return key(array_slice($array, -1, 1, true)); } } } From 6a2787f8157ed3bb7cf07f19aa9ea264c803e8ef Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 10:39:44 +0200 Subject: [PATCH 07/12] Allow AsyncServer to be started without sockets --- .../php/peer/server/AsyncServer.class.php | 30 +++++++++++-------- .../unittest/server/AsyncServerTest.class.php | 16 ++++++++++ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index 51fdc56..faaefd6 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -127,7 +127,7 @@ public function select($socket, $handler) { * @return int */ public function schedule($seconds, $function) { - $i= $this->last ? array_key_last($this->last) - 1 : -1; + $i= $this->tasks ? array_key_last($this->tasks) - 1 : -1; $this->tasks[$i]= $function; $this->continuation[$i]= new Continuation(function($function) use($seconds) { try { @@ -148,15 +148,14 @@ public function schedule($seconds, $function) { * @throws lang.IllegalStateException */ public function service() { - if (empty($this->select)) { - throw new IllegalStateException('No sockets to select on'); + if (empty($this->select) && empty($this->tasks)) { + throw new IllegalStateException('No sockets or tasks to execute'); } - $sockets= current($this->select)->kind(); $readable= $writeable= $waitable= $write= []; - $errors= null; - $time= microtime(true); + $sockets= $errors= null; do { + $time= microtime(true); $wait= []; foreach ($this->continuation as $i => $continuation) { if ($continuation->next >= $time) { @@ -222,13 +221,18 @@ public function service() { break; } - // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", - // " R: ", \util\Objects::stringOf($readable), "\n", - // " W: ", \util\Objects::stringOf($writeable), "\n", - // "}\n"; - $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); - $time= microtime(true); - } while ($this->select); + if ($this->select) { + // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", + // " R: ", \util\Objects::stringOf($readable), "\n", + // " W: ", \util\Objects::stringOf($writeable), "\n", + // "}\n"; + $sockets ?? $sockets= current($this->select)->kind(); + $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); + } else { + // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n"; + $wait && usleep(min($wait) * 1000000); + } + } while (!$this->terminate); } /** diff --git a/src/test/php/peer/unittest/server/AsyncServerTest.class.php b/src/test/php/peer/unittest/server/AsyncServerTest.class.php index 7a15b54..6227342 100755 --- a/src/test/php/peer/unittest/server/AsyncServerTest.class.php +++ b/src/test/php/peer/unittest/server/AsyncServerTest.class.php @@ -1,5 +1,6 @@ schedule(1, function() use($s, &$invoked) { + $invoked++; + $s->shutdown(); + }); + + $this->assertEquals(0, $invoked, 'before service()'); + $s->service(); + $this->assertEquals(1, $invoked, 'after service()'); + } + #[Test] public function connected() { $this->connect(); From 46b536ca14acd7989862b5cc4b65ea7695cfdc76 Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 10:54:59 +0200 Subject: [PATCH 08/12] Add tests for repeated execution of a scheduled function --- .../php/peer/server/AsyncServer.class.php | 2 +- .../unittest/server/AsyncServerTest.class.php | 25 +++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index faaefd6..ae703e2 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -122,7 +122,7 @@ public function select($socket, $handler) { * running. To continue executing, exceptions must be caught and * handled within the function! * - * @param int $seconds + * @param int|float $seconds * @param function(): ?int|float * @return int */ diff --git a/src/test/php/peer/unittest/server/AsyncServerTest.class.php b/src/test/php/peer/unittest/server/AsyncServerTest.class.php index 6227342..0eb9f6e 100755 --- a/src/test/php/peer/unittest/server/AsyncServerTest.class.php +++ b/src/test/php/peer/unittest/server/AsyncServerTest.class.php @@ -18,18 +18,39 @@ public static function startServer() { #[Test] public function scheduled_function_immediately_invoked() { $invoked= 0; - $s= new AsyncServer(); $s->schedule(1, function() use($s, &$invoked) { $invoked++; $s->shutdown(); }); - $this->assertEquals(0, $invoked, 'before service()'); + $before= $invoked; $s->service(); + + $this->assertEquals(0, $before, 'before service()'); $this->assertEquals(1, $invoked, 'after service()'); } + #[Test, Values([1, 2, 3])] + public function scheduled_function_invoked_after_delay($executions) { + $delay= 0.05; // 50 ms + + $invoked= 0; + $s= new AsyncServer(); + $s->schedule($delay, function() use($s, $executions, &$invoked) { + $invoked++; + if ($invoked >= $executions) $s->shutdown(); + }); + + $start= microtime(true); + $s->service(); + $time= microtime(true) - $start; + $expected= $delay * ($executions - 1); + + $this->assertEquals($executions, $invoked); + $this->assertTrue($time >= $expected, $time.' >= '.$expected); + } + #[Test] public function connected() { $this->connect(); From e0bca4d9c2de05ddd5fbd9a8088bbd09730898cf Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 11:03:14 +0200 Subject: [PATCH 09/12] Stop if there are neither sockets to select nor tasks to run --- src/main/php/peer/server/AsyncServer.class.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index ae703e2..ef324e9 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -113,10 +113,9 @@ public function select($socket, $handler) { /** * Schedule a given task to execute every given seconds. The task - * function can return an integer to indicate in how many seconds - * its next invocation should occur, overwriting the default value - * given here. If this integer is negative, the task stops running. - * Returns the added task's ID. + * function can return how many seconds its next invocation should + * occur, overwriting the default value given here. If this number + * is negative, the task stops running. Returns the added task's ID. * * Note: If the task function raises any exception the task stops * running. To continue executing, exceptions must be caught and @@ -216,7 +215,6 @@ public function service() { if ($this->terminate) { for ($i= array_key_last($this->select); $i > 0; $i--) { isset($this->select[$i]) && $this->select[$i]->close(); - // FIXME $continuation->throw() or ->send(INTERRUPTED)? or nothing? } break; } @@ -232,7 +230,7 @@ public function service() { // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n"; $wait && usleep(min($wait) * 1000000); } - } while (!$this->terminate); + } while ($this->select || $this->tasks); } /** From 33e7a92a0295df71dfd836b12146ab8d507b3087 Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 11:09:00 +0200 Subject: [PATCH 10/12] Fix "Implicit conversion from float 1625.7762908935547 to int loses precision" in ::usleep() --- src/main/php/peer/server/AsyncServer.class.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index ef324e9..f5e93a5 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -228,7 +228,7 @@ public function service() { $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); } else { // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n"; - $wait && usleep(min($wait) * 1000000); + $wait && usleep(1000000 * (int)min($wait)); } } while ($this->select || $this->tasks); } From dde51581d74b8af2e75d887c58e610e5493e918a Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 12:46:04 +0200 Subject: [PATCH 11/12] Add timeout handling This will ensure kept-alive HTTP sockets are closed after a while --- .../php/peer/server/AsyncServer.class.php | 22 ++++++++++++++++--- .../php/peer/server/Continuation.class.php | 16 ++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index f5e93a5..13a55f5 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -58,10 +58,13 @@ public function listen($socket, ServerProtocol $protocol) { } $this->tcpnodelay && $connection->useNoDelay(); + $protocol->handleConnect($connection); yield 'accept' => $this->select($connection, $protocol); } while (!$this->terminate); }); + // Never time out sockets we listen on + $this->continuation[$i]->next= null; return $this; } @@ -87,7 +90,6 @@ public function select($socket, $handler) { if ($handler instanceof ServerProtocol) { $this->continuation[$i]= new Continuation(function($socket) use($handler) { try { - $handler->handleConnect($socket); // Check for readability, then handle incoming data while ($socket->isConnected() && !$socket->eof()) { @@ -157,7 +159,7 @@ public function service() { $time= microtime(true); $wait= []; foreach ($this->continuation as $i => $continuation) { - if ($continuation->next >= $time) { + if (null !== $continuation->next && $continuation->next >= $time) { $wait[]= $continuation->next - $time; continue; } else if (isset($this->tasks[$i])) { @@ -165,10 +167,24 @@ public function service() { unset($waitable[$i]); } else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) { $execute= $continuation->continue($this->select[$i]); + if (null !== $continuation->next) $continuation->next= $time; unset($readable[$i], $writeable[$i], $waitable[$i]); } else { isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i]; - continue; + if (null === $continuation->next) continue; + + // Check if the socket has timed out... + $idle= $time - $continuation->next; + $timeout= $this->select[$i]->getTimeout(); + if ($idle < $timeout) { + $wait[]= $timeout - $idle; + continue; + } + + // ...and if so, throw an exception, allowing the continuation to handle it. + $execute= $continuation->throw($this->select[$i], new SocketTimeoutException('Timed out', $timeout)); + $continuation->next= $time; + unset($readable[$i], $writeable[$i]); } // Check whether execution has finished diff --git a/src/main/php/peer/server/Continuation.class.php b/src/main/php/peer/server/Continuation.class.php index a5b07ce..95909e1 100755 --- a/src/main/php/peer/server/Continuation.class.php +++ b/src/main/php/peer/server/Continuation.class.php @@ -26,4 +26,20 @@ public function continue($arg) { return $this->continuation->valid() ? $this->continuation : $this->continuation= null; } + + /** + * Throw an exception into the execution flow + * + * @param var $arg + * @param Throwable $t + * @return ?Generator + */ + public function throw($arg, $t) { + if (null === $this->continuation) { + $this->continuation= ($this->function)($arg); + } + $this->continuation->throw($t); + + return $this->continuation->valid() ? $this->continuation : $this->continuation= null; + } } From 81bc50c6d1dcce314f9d5d687fc48e7a1543b53d Mon Sep 17 00:00:00 2001 From: Timm Friebe Date: Mon, 22 Aug 2022 14:34:07 +0200 Subject: [PATCH 12/12] Document Continuation as non-public --- src/main/php/peer/server/Continuation.class.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/php/peer/server/Continuation.class.php b/src/main/php/peer/server/Continuation.class.php index 95909e1..0dd26b1 100755 --- a/src/main/php/peer/server/Continuation.class.php +++ b/src/main/php/peer/server/Continuation.class.php @@ -1,5 +1,10 @@