Skip to content

Commit 2ab103c

Browse files
authored
Merge pull request #22 from xp-framework/refactor/async-server
Asynchronous server with finer-grained I/O control
2 parents e45481c + a8630de commit 2ab103c

File tree

4 files changed

+246
-124
lines changed

4 files changed

+246
-124
lines changed

src/main/php/peer/server/AsyncServer.class.php

+155-121
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php namespace peer\server;
22

3-
use lang\{IllegalStateException, Throwable};
3+
use Throwable;
4+
use lang\IllegalStateException;
45
use peer\server\protocol\SocketAcceptHandler;
56
use peer\{ServerSocket, SocketException, SocketTimeoutException};
67

@@ -19,7 +20,17 @@
1920
* @test xp://peer.unittest.server.AsyncServerTest
2021
*/
2122
class AsyncServer extends Server {
22-
private $select= [], $handle= [], $tasks= [];
23+
private $select= [], $tasks= [], $continuation= [];
24+
25+
static function __static() {
26+
27+
// For PHP < 7.3.0
28+
if (!function_exists('array_key_last')) {
29+
function array_key_last(&$array) {
30+
return key(array_slice($array, -1, 1, true));
31+
}
32+
}
33+
}
2334

2435
/**
2536
* Adds server socket to listen on, associating protocol handler with it
@@ -36,22 +47,24 @@ public function listen($socket, ServerProtocol $protocol) {
3647
$protocol->server= $this;
3748
$protocol->initialize();
3849

39-
$this->select($socket, function($socket) use($protocol) {
40-
$connection= $socket->accept();
41-
if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) {
42-
$connection->close();
43-
return;
44-
}
50+
$i= $this->select ? array_key_last($this->select) + 1 : 1;
51+
$this->select[$i]= $socket;
52+
$this->continuation[$i]= new Continuation(function($socket) use($protocol) {
53+
do {
54+
$connection= $socket->accept();
55+
if ($protocol instanceof SocketAcceptHandler && !$protocol->handleAccept($connection)) {
56+
$connection->close();
57+
return;
58+
}
4559

46-
$this->tcpnodelay && $connection->useNoDelay();
47-
$protocol->handleConnect($connection);
48-
$this->select($connection, [
49-
[$protocol, 'handleData'],
50-
[$protocol, 'handleDisconnect'],
51-
[$protocol, 'handleError']
52-
]);
60+
$this->tcpnodelay && $connection->useNoDelay();
61+
$protocol->handleConnect($connection);
62+
yield 'accept' => $this->select($connection, $protocol);
63+
} while (!$this->terminate);
5364
});
5465

66+
// Never time out sockets we listen on
67+
$this->continuation[$i]->next= null;
5568
return $this;
5669
}
5770

@@ -68,42 +81,64 @@ public function shutdown() {
6881
* Adds socket to select, associating a function to call for data
6982
*
7083
* @param peer.Socket|peer.BSDSocket $socket
71-
* @param callable[]|function(peer.Socket|peer.BSDSocket): void $function
84+
* @param peer.Protocol|function(peer.Socket|peer.BSDSocket): void $handler
7285
* @return peer.Socket|peer.BSDSocket
7386
*/
74-
public function select($socket, $function) {
75-
if ($this->select) {
76-
end($this->select);
77-
$i= key($this->select) + 1;
87+
public function select($socket, $handler) {
88+
$i= $this->select ? array_key_last($this->select) + 1 : 1;
89+
$this->select[$i]= $socket;
90+
if ($handler instanceof ServerProtocol) {
91+
$this->continuation[$i]= new Continuation(function($socket) use($handler) {
92+
try {
93+
94+
// Check for readability, then handle incoming data
95+
while ($socket->isConnected() && !$socket->eof()) {
96+
yield 'read' => $socket;
97+
yield from $handler->handleData($socket) ?? [];
98+
}
99+
100+
// Handle disconnnect gracefully, ensure socket is closed
101+
$handler->handleDisconnect($socket);
102+
$socket->close();
103+
} catch (Throwable $t) {
104+
105+
// Handle any errors, then close socket
106+
$handler->handleError($socket, $t);
107+
$socket->close();
108+
}
109+
});
78110
} else {
79-
$i= 0;
111+
$this->continuation[$i]= new Continuation($handler);
80112
}
81-
82-
$this->select[$i]= $socket;
83-
$this->handle[$i]= is_array($function) ? $function : [$function];
84113
return $socket;
85114
}
86115

87116
/**
88117
* Schedule a given task to execute every given seconds. The task
89-
* function can return an integer to indicate in how many seconds
90-
* its next invocation should occur, overwriting the default value
91-
* given here. If this integer is negative, the task stops running.
92-
* Returns the added task's ID.
118+
* function can return how many seconds its next invocation should
119+
* occur, overwriting the default value given here. If this number
120+
* is negative, the task stops running. Returns the added task's ID.
121+
*
122+
* Note: If the task function raises any exception the task stops
123+
* running. To continue executing, exceptions must be caught and
124+
* handled within the function!
93125
*
94-
* @param int $seconds
95-
* @param function(): ?int
126+
* @param int|float $seconds
127+
* @param function(): ?int|float
96128
* @return int
97129
*/
98130
public function schedule($seconds, $function) {
99-
if ($this->tasks) {
100-
end($this->tasks);
101-
$i= key($this->tasks) - 1;
102-
} else {
103-
$i= -1;
104-
}
105-
106-
$this->tasks[$i]= [$seconds, $function];
131+
$i= $this->tasks ? array_key_last($this->tasks) - 1 : -1;
132+
$this->tasks[$i]= $function;
133+
$this->continuation[$i]= new Continuation(function($function) use($seconds) {
134+
try {
135+
while (($seconds= $function() ?? $seconds) >= 0) {
136+
yield 'delay' => $seconds * 1000;
137+
}
138+
} catch (Throwable $t) {
139+
// Not displayed, simply stops execution
140+
}
141+
});
107142
return $i;
108143
}
109144

@@ -114,105 +149,104 @@ public function schedule($seconds, $function) {
114149
* @throws lang.IllegalStateException
115150
*/
116151
public function service() {
117-
if (empty($this->select)) {
118-
throw new IllegalStateException('No sockets to select on');
152+
if (empty($this->select) && empty($this->tasks)) {
153+
throw new IllegalStateException('No sockets or tasks to execute');
119154
}
120155

121-
// Set up scheduled tasks
122-
$time= time();
123-
$next= $continuation= [];
124-
foreach ($this->tasks as $i => $task) {
125-
$next[$i]= $time + $task[0];
126-
}
127-
128-
$null= null;
129-
$sockets= $this->select[0]->kind();
156+
$readable= $writeable= $waitable= $write= [];
157+
$sockets= $errors= null;
130158
do {
131-
132-
// Build array of sockets that we want to check for data. If one of them
133-
// has disconnected in the meantime, notify the listeners (socket will be
134-
// already invalid at that time) and remove it from the clients list.
135-
$read= [];
136-
foreach ($this->select as $i => $socket) {
137-
if (!$socket->isConnected() || $socket->eof()) {
138-
if ($f= $this->handle[$i][1] ?? null) $f($socket);
139-
unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]);
159+
$time= microtime(true);
160+
$wait= [];
161+
foreach ($this->continuation as $i => $continuation) {
162+
if (null !== $continuation->next && $continuation->next >= $time) {
163+
$wait[]= $continuation->next - $time;
140164
continue;
141-
}
142-
143-
// Do not re-enter handler as long as we have a continuation
144-
if (isset($continuation[$i])) continue;
165+
} else if (isset($this->tasks[$i])) {
166+
$execute= $continuation->continue($this->tasks[$i]);
167+
unset($waitable[$i]);
168+
} else if (isset($readable[$i]) || isset($writeable[$i]) || isset($waitable[$i])) {
169+
$execute= $continuation->continue($this->select[$i]);
170+
if (null !== $continuation->next) $continuation->next= $time;
171+
unset($readable[$i], $writeable[$i], $waitable[$i]);
172+
} else {
173+
isset($write[$i]) ? $writeable[$i]= $this->select[$i] : $readable[$i]= $this->select[$i];
174+
if (null === $continuation->next) continue;
145175

146-
// Handle timeouts manually instead of leaving this up to the sockets
147-
// themselves - the latter has proven not to be 100% reliable.
148-
if (isset($next[$i]) && $next[$i] <= $time) {
149-
if ($f= $this->handle[$i][2] ?? null) {
150-
$f($socket, new SocketTimeoutException('Timed out', $socket->getTimeout()));
151-
$socket->close();
152-
unset($this->select[$i], $this->handle[$i], $next[$i], $continuation[$i]);
176+
// Check if the socket has timed out...
177+
$idle= $time - $continuation->next;
178+
$timeout= $this->select[$i]->getTimeout();
179+
if ($idle < $timeout) {
180+
$wait[]= $timeout - $idle;
153181
continue;
154182
}
155-
$next[$i]= $time + $socket->getTimeout();
183+
184+
// ...and if so, throw an exception, allowing the continuation to handle it.
185+
$execute= $continuation->throw($this->select[$i], new SocketTimeoutException('Timed out', $timeout));
186+
$continuation->next= $time;
187+
unset($readable[$i], $writeable[$i]);
156188
}
157189

158-
$read[$i]= $socket;
159-
}
160-
// echo '* SELECT ([', implode(', ', array_keys($next)), '] -> ', $next ? max(0, min($next) - $time) : null, ")\n";
161-
$sockets->select($read, $null, $null, $next ? max(0, min($next) - $time) : null);
162-
163-
// Run scheduled tasks, recording their next run immediately thereafter
164-
$time= time();
165-
foreach ($this->tasks as $i => $task) {
166-
if ($next[$i] <= $time) {
167-
$n= $task[1]();
168-
if ($n < 0) {
169-
unset($this->tasks[$i], $next[$i]);
170-
} else {
171-
$next[$i]= $time + ($n ?? $task[0]);
172-
}
190+
// Check whether execution has finished
191+
if (null === $execute) {
192+
unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]);
193+
continue;
173194
}
174-
}
175195

176-
// There is data on the server socket (meaning a client connection is
177-
// waiting to be socket), or on any of the other sockets, so we'll call
178-
// into their respective data handler. If a generator is returned,
179-
// schedule its continuation for the next possible point.
180-
foreach ($read as $i => $socket) {
181-
try {
182-
$r= $this->handle[$i][0]($socket);
183-
if ($r instanceof \Generator && $r->valid()) {
184-
$continuation[$i]= $r;
185-
$task= $this->schedule(0, function() use(&$continuation, $i) {
186-
try {
187-
if (($c= $continuation[$i] ?? null) && $c->valid()) {
188-
$c->next();
189-
return;
190-
}
191-
} catch (SocketException $t) {
192-
if ($f= $this->handle[$i][2] ?? null) $f($this->select[$i], $t);
193-
$this->select[$i]->close();
194-
}
195-
196-
unset($continuation[$i]);
197-
return -1;
198-
});
199-
$next[$task]= $time;
200-
}
196+
// `yield 'accept' => $socket`: Check for being able to read from socket
197+
// `yield 'read' => $_`: Continue as soon as the socket becomes readable
198+
// `yield 'write' => $_`: Continue as soon as the socket becomes writeable
199+
// `yield 'delay' => $millis`: Wait a specified number of milliseconds
200+
// `yield`: Continue at the next possible execution slot (`delay => 0`)
201+
switch ($execute->key()) {
202+
case 'accept':
203+
$socket= $execute->current();
204+
$readable[array_key_last($this->select)]= $socket;
205+
$readable[$i]= $this->select[$i];
206+
$wait[]= $socket->getTimeout();
207+
break;
201208

202-
$next[$i]= $time + $socket->getTimeout();
203-
} catch (Throwable $t) {
204-
if ($f= $this->handle[$i][2] ?? null) $f($socket, $t);
209+
case 'write':
210+
$write[$i]= true;
211+
$writeable[$i]= $this->select[$i];
212+
$wait[]= $this->select[$i]->getTimeout();
213+
break;
214+
215+
case 'read':
216+
unset($write[$i]);
217+
$readable[$i]= $this->select[$i];
218+
$wait[]= $this->select[$i]->getTimeout();
219+
break;
220+
221+
case 'delay': default:
222+
$delay= $execute->current() / 1000;
223+
$continuation->next= $time + $delay;
224+
$waitable[$i]= true;
225+
$wait[]= $delay;
226+
break;
205227
}
206228
}
207229

208-
$time= time();
209-
} while (!$this->terminate);
230+
// When asked to terminate, close sockets in reverse order
231+
if ($this->terminate) {
232+
for ($i= array_key_last($this->select); $i > 0; $i--) {
233+
isset($this->select[$i]) && $this->select[$i]->close();
234+
}
235+
break;
236+
}
210237

211-
// Close all accepted sockets first, then the listening sockets
212-
for ($i= sizeof($this->select) - 1; $i >= 0; $i--) {
213-
$this->select[$i]->close();
214-
if ($f= $this->handle[$i][1] ?? null) $f($this->select[$i]);
215-
}
238+
if ($this->select) {
239+
// echo date('H:i:s'), " SELECT ", \util\Objects::stringOf($wait), " @ {\n",
240+
// " R: ", \util\Objects::stringOf($readable), "\n",
241+
// " W: ", \util\Objects::stringOf($writeable), "\n",
242+
// "}\n";
243+
$sockets ?? $sockets= current($this->select)->kind();
244+
$sockets->select($readable, $writeable, $errors, $wait ? min($wait) : null);
245+
} else {
246+
// echo date('H:i:s'), " SLEEP ", \util\Objects::stringOf($wait), "\n";
247+
$wait && usleep(1000000 * (int)min($wait));
248+
}
249+
} while ($this->select || $this->tasks);
216250
}
217251

218252
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php namespace peer\server;
2+
3+
/**
4+
* Continuable coroutine. Not a public API!
5+
*
6+
* @see peer.server.AsyncServer
7+
*/
8+
class Continuation {
9+
private $function;
10+
private $continuation= null;
11+
public $next;
12+
13+
/** @param function(var): Generator $function */
14+
public function __construct($function) {
15+
$this->function= $function;
16+
$this->next= microtime(true) - 1;
17+
}
18+
19+
/**
20+
* Continue executing
21+
*
22+
* @param var $arg
23+
* @return ?Generator
24+
*/
25+
public function continue($arg) {
26+
if (null === $this->continuation) {
27+
$this->continuation= ($this->function)($arg);
28+
} else {
29+
$this->continuation->next();
30+
}
31+
32+
return $this->continuation->valid() ? $this->continuation : $this->continuation= null;
33+
}
34+
35+
/**
36+
* Throw an exception into the execution flow
37+
*
38+
* @param var $arg
39+
* @param Throwable $t
40+
* @return ?Generator
41+
*/
42+
public function throw($arg, $t) {
43+
if (null === $this->continuation) {
44+
$this->continuation= ($this->function)($arg);
45+
}
46+
$this->continuation->throw($t);
47+
48+
return $this->continuation->valid() ? $this->continuation : $this->continuation= null;
49+
}
50+
}

0 commit comments

Comments
 (0)