diff --git a/src/main/php/peer/server/AsyncServer.class.php b/src/main/php/peer/server/AsyncServer.class.php index b176412..470d038 100755 --- a/src/main/php/peer/server/AsyncServer.class.php +++ b/src/main/php/peer/server/AsyncServer.class.php @@ -18,6 +18,7 @@ * * @see peer.ServerSocket * @test peer.unittest.server.AsyncServerTest + * @deprecated In favor of AsynchronousServer */ class AsyncServer extends Server { private $select= [], $tasks= [], $continuation= []; @@ -26,9 +27,7 @@ static function __static() { // For PHP < 7.3.0 if (!function_exists('array_key_last')) { - function array_key_last(&$array) { - return key(array_slice($array, -1, 1, true)); - } + eval('function array_key_last($array) { end($array); return key($array); }'); } } diff --git a/src/main/php/peer/server/AsynchronousServer.class.php b/src/main/php/peer/server/AsynchronousServer.class.php new file mode 100755 index 0000000..5ec1f79 --- /dev/null +++ b/src/main/php/peer/server/AsynchronousServer.class.php @@ -0,0 +1,248 @@ +listen(new ServerSocket('127.0.0.1', 6100), new MyProtocol()); + * $server->service(); + * ``` + * + * @see peer.ServerSocket + * @test peer.unittest.server.AsynchronousServerTest + */ +class AsynchronousServer extends ServerImplementation { + private $terminate= false; + private $select= [], $tasks= [], $continuation= []; + + static function __static() { + + // For PHP < 7.3.0 + if (!function_exists('array_key_last')) { + eval('function array_key_last($array) { end($array); return key($array); }'); + } + } + + /** + * Adds server socket to listen on, associating protocol handler with it + * + * @param peer.ServerSocket|peer.BSDServerSocket $socket + * @param peer.server.ServerProtocol $protocol + * @return self + */ + public function listen($socket, ServerProtocol $protocol) { + $socket->create(); + $socket->bind(true); + $socket->listen(); + + $protocol->initialize($this); + + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + $this->continuation[$i]= new Continuation(function($socket) use($protocol) { + do { + $connection= $socket->accept(); + if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { + $connection->close(); + return; + } + + $this->tcpnodelay && $connection->useNoDelay(); + $protocol->handleConnect($connection); + $this->select($connection, $protocol); + yield 'accept' => $connection; + } while (!$this->terminate); + }); + + // Never time out sockets we listen on + $this->continuation[$i]->next= null; + return $this; + } + + /** + * Adds socket to select, associating a function to call for data + * + * @param peer.Socket|peer.BSDSocket $socket + * @param peer.ServerProtocol|function(peer.Socket|peer.BSDSocket): void $handler + * @param bool $timeout + * @return self + */ + public function select($socket, $handler, $timeout= false) { + $i= $this->select ? array_key_last($this->select) + 1 : 1; + $this->select[$i]= $socket; + if ($handler instanceof ServerProtocol) { + $this->continuation[$i]= new Continuation(function($socket) use($handler) { + try { + + // Check for readability, then handle incoming data + while ($socket->isConnected() && !$socket->eof()) { + yield 'read' => $socket; + yield from $handler->handleData($socket) ?? []; + } + + // Handle disconnnect gracefully, ensure socket is closed + $handler->handleDisconnect($socket); + $socket->close(); + } catch (Throwable $t) { + + // Handle any errors, then close socket + $handler->handleError($socket, $t); + $socket->close(); + } + }); + } else { + $this->continuation[$i]= new Continuation($handler); + } + + // Unless explicitely given, ensure sockets we select on never timeout + $timeout || $this->continuation[$i]->next= null; + return $this; + } + + /** + * Schedule a given task to execute every given interval. + * + * @param int|float $interval + * @param function(): ?int|float + * @return self + */ + public function schedule($interval, $function) { + $i= $this->tasks ? array_key_last($this->tasks) - 1 : -1; + $this->tasks[$i]= $function; + $this->continuation[$i]= new Continuation(function($function) use($interval) { + try { + while (($interval= $function() ?? $interval) >= 0) { + yield 'delay' => $interval * 1000; + } + } catch (Throwable $t) { + // Not displayed, simply stops execution + } + }); + + return $this; + } + + /** + * Runs service until shutdown() is called. + * + * @return void + * @throws lang.IllegalStateException + */ + public function service() { + if (empty($this->select) && empty($this->tasks)) { + throw new IllegalStateException('No sockets or tasks to execute'); + } + + $readable= $writeable= $waitable= $write= []; + $sockets= $errors= null; + do { + $time= microtime(true); + $wait= []; + foreach ($this->continuation as $i => $continuation) { + if (null !== $continuation->next && $continuation->next >= $time) { + $wait[]= $continuation->next - $time; + continue; + } else if (isset($this->tasks[$i])) { + $execute= $continuation->continue($this->tasks[$i]); + unset($waitable[$i]); + } else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) { + $execute= $continuation->continue($this->select[$i]); + if (null !== $continuation->next) $continuation->next= $time; + unset($readable[$i], $writeable[$i], $waitable[$i]); + } else { + isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i]; + if (null === $continuation->next) continue; + + // Check if the socket has timed out... + $idle= $time - $continuation->next; + $timeout= $this->select[$i]->getTimeout(); + if ($idle < $timeout) { + $wait[]= $timeout - $idle; + continue; + } + + // ...and if so, throw an exception, allowing the continuation to handle it. + $execute= $continuation->throw($this->select[$i], new SocketTimeoutException('Timed out', $timeout)); + $continuation->next= $time; + unset($readable[$i], $writeable[$i]); + } + + // Check whether execution has finished + if (null === $execute) { + unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]); + continue; + } + + // `yield 'accept' => $socket`: Check for being able to read from socket + // `yield 'read' => $_`: Continue as soon as the socket becomes readable + // `yield 'write' => $_`: Continue as soon as the socket becomes writeable + // `yield 'delay' => $millis`: Wait a specified number of milliseconds + // `yield`: Continue at the next possible execution slot (`delay => 0`) + switch ($execute->key()) { + case 'accept': + $socket= $execute->current(); + $readable[array_key_last($this->select)]= $socket; + $readable[$i]= $this->select[$i]; + $wait[]= $socket->getTimeout(); + break; + + case 'write': + $write[$i]= true; + $writeable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'read': + unset($write[$i]); + $readable[$i]= $this->select[$i]; + $wait[]= $this->select[$i]->getTimeout(); + break; + + case 'delay': default: + $delay= $execute->current() / 1000; + $continuation->next= $time + $delay; + $waitable[$i]= true; + $wait[]= $delay; + break; + } + } + + // When asked to terminate, close sockets in reverse order + if ($this->terminate) { + for ($i= array_key_last($this->select); $i > 0; $i--) { + isset($this->select[$i]) && $this->select[$i]->close(); + } + break; + } + + if ($this->select) { + // echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n", + // " R: ", \util\Objects::stringOf($readable), "\n", + // " W: ", \util\Objects::stringOf($writeable), "\n", + // "}\n"; + $sockets ?? $sockets= current($this->select)->kind(); + $sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null); + } else { + // echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n"; + $wait && usleep(1000000 * (int)min($wait)); + } + } while ($this->select || $this->tasks); + } + + /** + * Shutdown the server + * + * @return void + */ + public function shutdown() { + $this->terminate= true; + } +} \ No newline at end of file diff --git a/src/main/php/peer/server/EventServer.class.php b/src/main/php/peer/server/EventServer.class.php index f36eea7..2bcfef2 100755 --- a/src/main/php/peer/server/EventServer.class.php +++ b/src/main/php/peer/server/EventServer.class.php @@ -5,6 +5,7 @@ * * @ext event * @see http://pecl.php.net/package/event + * @deprecated In favor of AsynchronousServer */ class EventServer extends Server { diff --git a/src/main/php/peer/server/ForkedServer.class.php b/src/main/php/peer/server/ForkedServer.class.php new file mode 100755 index 0000000..861d1ae --- /dev/null +++ b/src/main/php/peer/server/ForkedServer.class.php @@ -0,0 +1,331 @@ +listen(new ServerSocket('127.0.0.1', 6100), new MyProtocol()); + * $server->service(); + * ``` + * + * @ext pcntl + * @see peer.server.Server + * @test peer.unittest.server.ForkedServerTest + */ +class ForkedServer extends ServerImplementation { + use Pcntl; + + private $parent, $children, $maxrequests; + private $listen= []; + private $tasks= []; + private $select= []; + + /** + * Constructor + * + * @param int $children default 10 number of children to fork + * @param int $maxrequests default 1000 maxmimum # of requests per child + * @throws lang.IllegalAccessException + */ + public function __construct(int $children= 10, int $maxrequests= 1000) { + self::extension(); + $this->parent= getmypid(); + $this->children= $children; + $this->maxrequests= $maxrequests; + } + + /** + * Adds server socket to listen on, associating protocol handler with it + * + * @param peer.ServerSocket|peer.BSDServerSocket $socket + * @param peer.server.ServerProtocol $protocol + * @return self + */ + public function listen($socket, ServerProtocol $protocol) { + $socket->create(); + $socket->bind(true); + $socket->listen(); + + $protocol->initialize($this); + + $this->listen[]= [$socket, $protocol]; + return $this; + } + + /** + * Adds socket to select, associating a function to call for data + * + * @param peer.Socket|peer.BSDSocket $socket + * @param peer.ServerProtocol|function(peer.Socket|peer.BSDSocket): void $handler + * @param bool $timeout + * @return self + */ + public function select($socket, $handler, $timeout= false) { + $next= $timeout ? microtime(true) + $socket->getTimeout() : null; + if ($handler instanceof ServerProtocol) { + $this->select[]= [$socket, $next, new Continuation(function($socket) use($handler) { + try { + + // Check for readability, then handle incoming data + while ($socket->isConnected() && !$socket->eof()) { + yield 'read' => $socket; + yield from $handler->handleData($socket) ?? []; + } + + // Handle disconnnect gracefully, ensure socket is closed + $handler->handleDisconnect($socket); + $socket->close(); + } catch (Throwable $t) { + + // Handle any errors, then close socket + $handler->handleError($socket, $t); + $socket->close(); + } + })]; + } else { + $this->select[]= [$socket, $next, new Continuation($handler)]; + } + + return $this; + } + + /** + * Schedule a given task to execute every given interval + * + * @param int|float $interval + * @param function(): ?int|float + * @return self + */ + public function schedule($interval, $function) { + $id= -sizeof($this->tasks) - 1; + $this->tasks[$id]= function() use($interval, $function) { + static $signals= [SIGINT, SIGHUP]; + + try { + while (($interval= $function() ?? $interval) >= 0) { + $sec= (int)$interval; + if (pcntl_sigtimedwait($signals, $_, $sec, 1000 * ($sec - $interval)) > 0) return 127; + } + $this->cat && $this->cat->info('Task finished'); + return 0; + } catch (Throwable $t) { + $this->cat && $this->cat->error('Task stopped by ', $t); + return 255; + } + }; + + return $this; + } + + /** + * Spawns a function in a new process and returns the PID + * + * @param function(): int $function + * @return int + * @throws lang.RuntimeError + */ + private function spawn($function) { + $pid= pcntl_fork(); + if (-1 === $pid) { + throw new RuntimeError('Could not fork'); + } else if (0 === $pid) { + exit($function()); + } + return $pid; + } + + /** + * Dispatches a given signal to the children + * + * @param [:int] $children + * @param int $signal + * @return void + */ + private function dispatch($children, $signal) { + foreach ($children as $pid => $i) { + $this->cat && $this->cat->debugf('Dispatching signal %d -> pid %d', $signal, $pid); + posix_kill($pid, $signal); + } + } + + /** + * Service + * + * @return void + * @throws lang.IllegalStateException + */ + public function service() { + $children= []; + $child= function() { + $sockets= current($this->listen)[0]->kind(); + $terminate= false; + + // Gracefully finish current request and exit when restarting or shutting down + $handler= function($sig) use(&$terminate) { + $this->cat && $this->cat->infof('Listener shutting down on signal %d', $sig); + $terminate= true; + }; + pcntl_signal(SIGHUP, $handler); + pcntl_signal(SIGINT, $handler); + pcntl_signal(SIGTERM, SIG_DFL); + + $null= null; + for ($request= 1; $request <= $this->maxrequests; $request++) { + $connection= null; + do { + pcntl_signal_dispatch(); + if ($terminate) return 1; + + $readable= []; + foreach ($this->listen as $i => $listen) { + $readable[$i]= $listen[0]; + } + + // The call to accept() w/o blocking will return null if there is no client available. + // This is the case if select() detects activity but another child has already + // accepted this client. In this case, retry. + if ($sockets->select($readable, $null, $null, null)) { + $selected= key($readable); + $connection= $readable[$selected]->accept(false); + } + } while (null === $connection); + + $protocol= $this->listen[$selected][1]; + $this->cat && $this->cat->debugf('Child handling request #%d with protocol #%d', $request, $selected); + if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) { + $connection->close(); + continue; + } + + $this->tcpnodelay && $connection->useNoDelay(); + $protocol->handleConnect($connection); + + try { + do { + pcntl_signal_dispatch(); + if ($terminate) break; + + foreach ($protocol->handleData($connection) ?? [] as $_) { } + } while ($connection->isConnected() && !$connection->eof()); + + // Handle disconnnect gracefully, ensure connection is closed + $protocol->handleDisconnect($connection); + $connection->close(); + } catch (Throwable $t) { + + // Handle any errors, then close connection + $protocol->handleError($connection, $t); + $connection->close(); + } + } + + // Respawn listeners + return 1; + }; + + // Spawn the specified number of children + for ($i= 0; $i < $this->children; $i++) { + $pid= $this->spawn($child); + $this->cat && $this->cat->debugf('Spawned listener #%d (pid %d)', $i, $pid); + $children[$pid]= $i; + } + + // Spawn any tasks + foreach ($this->tasks as $i => $task) { + $pid= $this->spawn($task); + $this->cat && $this->cat->debugf('Spawned task #%d (pid %d)', -($i + 1), $pid); + $children[$pid]= $i; + } + + $this->cat && $this->cat->info('Parent started with', array_keys($children)); + + // Setup signal handlers for terminating and restarting + $terminate= false; + $restart= function($sig) use(&$children) { + $this->cat && $this->cat->infof('Parent restarting children', $sig); + $this->dispatch($children, SIGHUP); + // Children will be respawned from within the main loop + }; + $shutdown= function($sig) use(&$terminate) { + $this->cat && $this->cat->infof('Parent shutting down on signal %d', $sig); + $terminate= true; + }; + pcntl_signal(SIGHUP, $restart); + pcntl_signal(SIGINT, $shutdown); + pcntl_signal(SIGTERM, $shutdown); + + // Main loop + $null= null; + do { + if ($this->select) { + $sockets ?? $sockets= current($this->listen)[0]->kind(); + $readable= []; + foreach ($this->select as $i => $select) { + $readable[$i]= $select[0]; + } + + // echo date('H:i:s'), " SELECT 1 @ {", \util\Objects::stringOf($readable), "}\n"; + if ($sockets->select($readable, $null, $null, 1)) { + foreach ($readable as $i => $socket) { + if (null === $this->select[$i][2]->continue($socket)) { + unset($this->select[$i]); + } + } + } + } else { + // echo date('H:i:s'), " SLEEP 1\n"; + sleep(1); + } + + pcntl_signal_dispatch(); + if ($terminate) break; + + // Respawn children and tasks if necessary + while (($exited= pcntl_wait($status, WNOHANG)) > 0) { + $i= $children[$exited] ?? null; + unset($children[$exited]); + if (0 === $status || null === $i) continue; + + $pid= $this->spawn($this->tasks[$i] ?? $child); + $this->cat && $this->cat->debugf('Respawned pid %d (exit status %d) -> %d)', $exited, $status, $pid); + $children[$pid]= $i; + } + } while (true); + + // Tell all children to quit and wait 1 second for them to exit + $this->dispatch($children, SIGINT); + for ($i= 0; $i < 10; $i++) { + while (($exited= pcntl_wait($status, WNOHANG)) > 0) { + $this->cat && $this->cat->debugf('Waited for pid %d (exit status %d) after %d ms', $exited, $status, $i * 100); + unset($children[$exited]); + } + + if (empty($children)) break; + usleep(100000); + } + + // Forcefully terminate children which didn't exit yet + $this->dispatch($children, SIGKILL); + + // Close all listening sockets in reverse order + for ($i= array_key_last($this->listen); $i > 0; $i--) { + $this->listen[$i][0]->close(); + } + $this->cat && $this->cat->infof('Shutdown complete'); + } + + /** + * Shutdown the server + * + * @return void + */ + public function shutdown() { + posix_kill($this->parent, SIGTERM); + } +} \ No newline at end of file diff --git a/src/main/php/peer/server/ForkingServer.class.php b/src/main/php/peer/server/ForkingServer.class.php index 8b221ec..d123b67 100755 --- a/src/main/php/peer/server/ForkingServer.class.php +++ b/src/main/php/peer/server/ForkingServer.class.php @@ -7,6 +7,7 @@ * * @ext pcntl * @see peer.server.Server + * @deprecated In favor of ForkedServer */ class ForkingServer extends Server { use Pcntl; diff --git a/src/main/php/peer/server/PreforkingServer.class.php b/src/main/php/peer/server/PreforkingServer.class.php index 935099c..94a1a5f 100755 --- a/src/main/php/peer/server/PreforkingServer.class.php +++ b/src/main/php/peer/server/PreforkingServer.class.php @@ -8,6 +8,7 @@ * * @ext pcntl * @see peer.server.Server + * @deprecated In favor of ForkedServer */ class PreforkingServer extends Server implements Traceable { use Pcntl; diff --git a/src/main/php/peer/server/Server.class.php b/src/main/php/peer/server/Server.class.php index a04afc0..5577dba 100755 --- a/src/main/php/peer/server/Server.class.php +++ b/src/main/php/peer/server/Server.class.php @@ -16,6 +16,7 @@ * @ext sockets * @see peer.ServerSocket * @test peer.unittest.server.ServerTest + * @deprecated In favor of the new ServerImplementation interface */ class Server { public diff --git a/src/main/php/peer/server/ServerImplementation.class.php b/src/main/php/peer/server/ServerImplementation.class.php new file mode 100755 index 0000000..6f63887 --- /dev/null +++ b/src/main/php/peer/server/ServerImplementation.class.php @@ -0,0 +1,85 @@ +cat= $cat; + } + + /** + * Set TCP_NODELAY + * + * @param bool $tcpnodelay + * @return self + */ + public function useNoDelay($tcpnodelay) { + $this->tcpnodelay= $tcpnodelay; + return $this; + } + + /** + * Sets socket to listen on and protocol to implement + * + * @param peer.ServerSocket|peer.BSDServerSocket $socket + * @param peer.ServerProtocol $protocol + * @return self + */ + public abstract function listen($socket, ServerProtocol $protocol); + + /** + * Adds socket to select, associating a function to call for data + * + * @param peer.Socket|peer.BSDSocket $socket + * @param peer.ServerProtocol|function(peer.Socket|peer.BSDSocket): void $handler + * @param bool $timeout + * @return self + */ + public abstract function select($socket, $handler, $timeout= false); + + /** + * Schedule a given task to execute every given interval. The task + * function can return how many seconds its next invocation should + * occur, overwriting the default value given here. If this number + * is negative, the task stops running. Returns the added task's ID. + * + * Note: If the task function raises any exception the task stops + * running. To continue executing, exceptions must be caught and + * handled within the function! + * + * @param int|float $interval + * @param function(): ?int|float + * @return self + */ + public abstract function schedule($interval, $function); + + /** + * Service + * + * @return void + * @throws lang.IllegalStateException + */ + public abstract function service(); + + /** + * Shutdown the server + * + * @return void + */ + public abstract function shutdown(); +} \ No newline at end of file diff --git a/src/test/php/peer/unittest/StartServer.class.php b/src/test/php/peer/unittest/StartServer.class.php index 5aabaf3..0c6b791 100755 --- a/src/test/php/peer/unittest/StartServer.class.php +++ b/src/test/php/peer/unittest/StartServer.class.php @@ -2,14 +2,24 @@ use lang\{Runtime, IllegalStateException}; use peer\SocketEndpoint; +use peer\server\AsynchronousServer; use test\Provider; use test\execution\Context; class StartServer implements Provider { private $process, $endpoint; - public function __construct($implementation, $arguments= []) { - $this->process= Runtime::getInstance()->newInstance(null, 'class', strtr($implementation, '\\', '.'), $arguments); + /** + * Starts a testing server + * + * @param string $protocol Protocol class + * @param string $implementation Server implementation class + */ + public function __construct($protocol, $implementation= AsynchronousServer::class) { + $this->process= Runtime::getInstance()->newInstance(null, 'class', 'peer.unittest.TestingServer', [ + $protocol, + $implementation, + ]); $this->process->in->close(); // Check if startup succeeded @@ -22,6 +32,7 @@ public function __construct($implementation, $arguments= []) { $this->endpoint= SocketEndpoint::valueOf($endpoint); } + /** @return var */ public function values(Context $context) { return [$this->process, $this->endpoint]; } diff --git a/src/test/php/peer/unittest/server/TestingServer.class.php b/src/test/php/peer/unittest/TestingServer.class.php similarity index 75% rename from src/test/php/peer/unittest/server/TestingServer.class.php rename to src/test/php/peer/unittest/TestingServer.class.php index a8efc4d..ea950df 100755 --- a/src/test/php/peer/unittest/server/TestingServer.class.php +++ b/src/test/php/peer/unittest/TestingServer.class.php @@ -1,4 +1,4 @@ -newInstance(); - $socket= new ServerSocket('127.0.0.1', 0); + $protocol= XPClass::forName($args[0])->newInstance(); + $impl= XPClass::forName($args[1])->newInstance(); + $socket= new ServerSocket('127.0.0.1', 0); try { - $s->listen($socket, XPClass::forName($args[0])->newInstance()); + $impl->listen($socket, $protocol); Console::writeLinef('+ Service %s:%d', $socket->host, $socket->port); - $s->service(); + $impl->service(); Console::writeLine('+ Done'); } catch (Throwable $e) { Console::writeLine('- ', $e->getMessage()); diff --git a/src/test/php/peer/unittest/server/AbstractServerTest.class.php b/src/test/php/peer/unittest/server/AbstractServerTest.class.php index f561173..28dfd91 100755 --- a/src/test/php/peer/unittest/server/AbstractServerTest.class.php +++ b/src/test/php/peer/unittest/server/AbstractServerTest.class.php @@ -28,7 +28,6 @@ protected function connectTo(Socket $socket): string { return $socket->readLine(); } - #[After] public function closeSockets() { foreach ($this->sockets as $socket) { @@ -38,6 +37,6 @@ public function closeSockets() { #[After] public function shutdownServer() { - $this->server->terminate(10); + $this->server->terminate(2); } } \ No newline at end of file diff --git a/src/test/php/peer/unittest/server/AcceptTestingProtocol.class.php b/src/test/php/peer/unittest/server/AcceptTestingProtocol.class.php index 825b84d..f25eb81 100755 --- a/src/test/php/peer/unittest/server/AcceptTestingProtocol.class.php +++ b/src/test/php/peer/unittest/server/AcceptTestingProtocol.class.php @@ -12,7 +12,7 @@ class AcceptTestingProtocol extends TestingProtocol implements SocketAcceptHandl * @return bool */ public function handleAccept($socket) { - Console::$err->writeLine('ACCEPT ', $this->hashOf($socket)); + Console::$err->writeLine('ACCEPT ', $socket->hashCode()); return true; } } \ No newline at end of file diff --git a/src/test/php/peer/unittest/server/AcceptingServerTest.class.php b/src/test/php/peer/unittest/server/AcceptingServerTest.class.php index 8cbd35d..4af39b6 100755 --- a/src/test/php/peer/unittest/server/AcceptingServerTest.class.php +++ b/src/test/php/peer/unittest/server/AcceptingServerTest.class.php @@ -3,7 +3,7 @@ use peer\unittest\StartServer; use test\{Assert, Test}; -#[StartServer(TestingServer::class, ['peer.unittest.server.AcceptTestingProtocol', 'peer.server.Server'])] +#[StartServer(protocol: AcceptTestingProtocol::class)] class AcceptingServerTest extends AbstractServerTest { #[Test] diff --git a/src/test/php/peer/unittest/server/AsyncServerTest.class.php b/src/test/php/peer/unittest/server/AsyncServerTest.class.php index 8dfe53b..55d7162 100755 --- a/src/test/php/peer/unittest/server/AsyncServerTest.class.php +++ b/src/test/php/peer/unittest/server/AsyncServerTest.class.php @@ -2,9 +2,10 @@ use peer\server\AsyncServer; use peer\unittest\StartServer; -use test\{Assert, Ignore, Test, Values}; +use test\{Assert, Test, Values}; -#[StartServer(TestingServer::class, ['peer.unittest.server.TestingProtocol', 'peer.server.AsyncServer'])] +/** @deprecated in favor of AsynchronousServerTest */ +#[StartServer(protocol: TestingProtocol::class, implementation: AsyncServer::class)] class AsyncServerTest extends AbstractServerTest { #[Test] diff --git a/src/test/php/peer/unittest/server/AsynchronousServerTest.class.php b/src/test/php/peer/unittest/server/AsynchronousServerTest.class.php new file mode 100755 index 0000000..73c2955 --- /dev/null +++ b/src/test/php/peer/unittest/server/AsynchronousServerTest.class.php @@ -0,0 +1,97 @@ +schedule(1, function() use($s, &$invoked) { + $invoked++; + $s->shutdown(); + }); + + $before= $invoked; + $s->service(); + + Assert::equals(0, $before, 'before service()'); + Assert::equals(1, $invoked, 'after service()'); + } + + #[Test, Values([1, 2, 3])] + public function scheduled_function_invoked_after_delay($executions) { + $delay= 0.05; // 50 ms + + $invoked= 0; + $s= new AsynchronousServer(); + $s->schedule($delay, function() use($s, $executions, &$invoked) { + $invoked++; + if ($invoked >= $executions) $s->shutdown(); + }); + + $start= microtime(true); + $s->service(); + $time= microtime(true) - $start; + $expected= $delay * ($executions - 1); + + Assert::equals($executions, $invoked); + Assert::true($time >= $expected, $time.' >= '.$expected); + } + + #[Test] + public function connected() { + $socket= $this->newSocket(); + $client= $this->connectTo($socket); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + } + + #[Test] + public function disconnected() { + $socket= $this->newSocket(); + $client= $this->connectTo($socket); + $socket->close(); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + Assert::equals('DISCONNECT '.$client, $this->server->err->readLine()); + } + + #[Test] + public function read_synchronously_written_data() { + $socket= $this->newSocket(); + $client= $this->connectTo($socket); + + $socket->write("SYNC 3\n"); + $read= []; + while ('.' !== ($line= $socket->readLine()) && !$socket->eof()) { + $read[]= $line; + } + $socket->close(); + Assert::equals(['1', '2', '3'], $read); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + Assert::equals('DISCONNECT '.$client, $this->server->err->readLine()); + } + + #[Test] + public function read_asynchronously_written_data() { + $socket= $this->newSocket(); + $client= $this->connectTo($socket); + + $socket->write("ASNC 3\n"); + $read= []; + while ('.' !== ($line= $socket->readLine()) && !$socket->eof()) { + $read[]= $line; + } + $socket->close(); + Assert::equals(['1', '2', '3'], $read); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + Assert::equals('DISCONNECT '.$client, $this->server->err->readLine()); + } +} \ No newline at end of file diff --git a/src/test/php/peer/unittest/server/ForkedServerTest.class.php b/src/test/php/peer/unittest/server/ForkedServerTest.class.php new file mode 100755 index 0000000..3c9e0c4 --- /dev/null +++ b/src/test/php/peer/unittest/server/ForkedServerTest.class.php @@ -0,0 +1,28 @@ +newSocket(); + $client= $this->connectTo($socket); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + } + + #[Test] + public function disconnected() { + $socket= $this->newSocket(); + $client= $this->connectTo($socket); + $socket->close(); + + Assert::equals('CONNECT '.$client, $this->server->err->readLine()); + Assert::equals('DISCONNECT '.$client, $this->server->err->readLine()); + } +} \ No newline at end of file diff --git a/src/test/php/peer/unittest/server/ServerTest.class.php b/src/test/php/peer/unittest/server/ServerTest.class.php index e824078..6215ceb 100755 --- a/src/test/php/peer/unittest/server/ServerTest.class.php +++ b/src/test/php/peer/unittest/server/ServerTest.class.php @@ -1,9 +1,11 @@ hashCode(); + public function initialize($server= null) { + $this->server= $server; } /** @@ -38,7 +32,7 @@ protected function hashOf($socket) { * @param peer.Socket socket */ public function handleDisconnect($socket) { - Console::$err->writeLine('DISCONNECT ', $this->hashOf($socket)); + Console::$err->writeLine('DISCONNECT ', $socket->hashCode()); } /** @@ -48,7 +42,7 @@ public function handleDisconnect($socket) { * @param lang.XPException e */ public function handleError($socket, $e) { - Console::$err->writeLine('ERROR ', $this->hashOf($socket)); + Console::$err->writeLine('ERROR ', $socket->hashCode()); } /** @@ -57,7 +51,7 @@ public function handleError($socket, $e) { * @param peer.Socket socket */ public function handleConnect($socket) { - Console::$err->writeLine('CONNECT ', $this->hashOf($socket)); + Console::$err->writeLine('CONNECT ', $socket->hashCode()); } /** @@ -69,7 +63,7 @@ public function handleData($socket) { $cmd= $socket->readLine(); switch (substr($cmd, 0, 4)) { case 'CLNT': { - $socket->write($this->hashOf($socket)."\n"); + $socket->write($socket->hashCode()."\n"); break; } @@ -97,7 +91,7 @@ public function handleData($socket) { case 'HALT': { $socket->write("+HALT\n"); - $this->server->terminate= true; + $this->server->shutdown(); break; } } diff --git a/src/test/php/peer/unittest/sockets/AbstractSocketTest.class.php b/src/test/php/peer/unittest/sockets/AbstractSocketTest.class.php index 5bdf128..f3939be 100755 --- a/src/test/php/peer/unittest/sockets/AbstractSocketTest.class.php +++ b/src/test/php/peer/unittest/sockets/AbstractSocketTest.class.php @@ -5,7 +5,7 @@ use peer\{ConnectException, Socket, SocketEndpoint, SocketException, SocketTimeoutException}; use test\{After, Assert, Expect, Ignore, Test}; -#[StartServer(TestingServer::class)] +#[StartServer(protocol: TestingProtocol::class)] abstract class AbstractSocketTest { protected $server, $endpoint; protected $sockets= []; diff --git a/src/test/php/peer/unittest/sockets/TestingProtocol.class.php b/src/test/php/peer/unittest/sockets/TestingProtocol.class.php new file mode 100755 index 0000000..46dd46c --- /dev/null +++ b/src/test/php/peer/unittest/sockets/TestingProtocol.class.php @@ -0,0 +1,65 @@ +readLine(); + switch (substr($cmd, 0, 4)) { + case "ECHO": { + $socket->write("+ECHO ".substr($cmd, 5)."\n"); + break; + } + case "LINE": { + sscanf(substr($cmd, 5), "%d %s", $l, $sep); + for ($i= 0, $sbytes= urldecode($sep); $i < $l; $i++) { + $socket->write("+LINE ".$i.$sbytes); + } + $socket->write("+LINE .\n"); + break; + } + case "CLOS": { + $socket->close(); + break; + } + case "HALT": { + $socket->write("+HALT\n"); + $this->server->terminate= TRUE; + break; + } + } + } + + public function handleDisconnect($socket) { } + + public function handleError($socket, $e) { } +} \ No newline at end of file diff --git a/src/test/php/peer/unittest/sockets/TestingServer.class.php b/src/test/php/peer/unittest/sockets/TestingServer.class.php deleted file mode 100755 index 28631d1..0000000 --- a/src/test/php/peer/unittest/sockets/TestingServer.class.php +++ /dev/null @@ -1,97 +0,0 @@ - - *