Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend async I/O API by being able to pass other sockets #28

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/main/php/peer/server/AsyncServer.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public function select($socket, $handler) {

// Check for readability, then handle incoming data
while ($socket->isConnected() && !$socket->eof()) {
yield 'read' => $socket;
yield 'read' => null;
yield from $handler->handleData($socket) ?? [];
}

Expand Down Expand Up @@ -140,7 +140,21 @@ public function schedule($seconds, $function) {
}
});
return $i;
}
}

/**
* Returns a slot to watch for a given socket
*
* @param peer.Socket|peer.BSDSocket $socket
* @param int $signal the slot to signal
* @return int
*/
private function watch($socket, $signal) {
$slot= $this->select ? array_key_last($this->select) + 1 : 1;
$this->select[$slot]= $socket;
$this->continuation[$slot]= new Continuation(function() use($signal) { yield 'signal' => $signal; });
return $slot;
}

/**
* Runs service until shutdown() is called.
Expand Down Expand Up @@ -193,11 +207,17 @@ public function service() {
continue;
}

// `yield 'accept' => $socket`: Check for being able to read from socket
// `yield 'read' => $_`: Continue as soon as the socket becomes readable
// `yield 'write' => $_`: Continue as soon as the socket becomes writeable
// `yield 'delay' => $millis`: Wait a specified number of milliseconds
// `yield`: Continue at the next possible execution slot (`delay => 0`)
// Internal use:
// * `yield 'accept' => $socket`: Check for being able to read from socket
// * `yield 'signal' => $n`: Finish signalling task, continue slot #n immediately
//
// Public use:
// * `yield 'read' => null`: Continue once this socket becomes readable
// * `yield 'write' => null`: Continue once this socket becomes writeable
// * `yield 'read' => $socket`: Continue as soon as the socket becomes readable
// * `yield 'write' => $socket`: Continue as soon as the socket becomes writeable
// * `yield 'delay' => $millis`: Wait a specified number of milliseconds
// * `yield`: Continue at the next possible execution slot (`delay => 0`)
switch ($execute->key()) {
case 'accept':
$socket= $execute->current();
Expand All @@ -206,24 +226,38 @@ public function service() {
$wait[]= $socket->getTimeout();
break;

case 'signal':
unset($this->tasks[$i], $this->select[$i], $this->continuation[$i], $write[$i]);
$waitable[$execute->current()]= true;
$wait[]= 0;
break;

case 'write':
if ($s= $execute->current()) $i= $this->watch($s, $i);
$write[$i]= true;
$writeable[$i]= $this->select[$i];
$wait[]= $this->select[$i]->getTimeout();
break;

case 'read':
if ($s= $execute->current()) $i= $this->watch($s, $i);
unset($write[$i]);
$readable[$i]= $this->select[$i];
$wait[]= $this->select[$i]->getTimeout();
break;

case 'delay': default:
case 'delay':
$delay= $execute->current() / 1000;
$continuation->next= $time + $delay;
$waitable[$i]= true;
$wait[]= $delay;
break;

default:
$continuation->next= $time;
$waitable[$i]= true;
$wait[]= 0;
break;
}
}

Expand Down