Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Socket::open() to asynchronously connect #20

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions src/main/php/peer/BSDSocket.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class BSDSocket extends Socket {
$options = [];

protected
$rq = '';
$rq = '',
$state = 0;

/** @return peer.Sockets */
public function kind() { return Sockets::$BSD; }
Expand Down Expand Up @@ -50,7 +51,7 @@ public function localEndpoint() {
* @throws lang.IllegalStateException if socket is already connected
*/
public function setDomain($domain) {
if ($this->isConnected()) {
if ($this->_sock) {
throw new \lang\IllegalStateException('Cannot set domain on connected socket');
}
$this->domain= $domain;
Expand All @@ -72,7 +73,7 @@ public function getDomain() {
* @throws lang.IllegalStateException if socket is already connected
*/
public function setType($type) {
if ($this->isConnected()) {
if ($this->_sock) {
throw new \lang\IllegalStateException('Cannot set type on connected socket');
}
$this->type= $type;
Expand All @@ -95,7 +96,7 @@ public function getType() {
* @throws lang.IllegalStateException if socket is already connected
*/
public function setProtocol($protocol) {
if ($this->isConnected()) {
if ($this->_sock) {
throw new \lang\IllegalStateException('Cannot set protocol on connected socket');
}
$this->protocol= $protocol;
Expand Down Expand Up @@ -130,19 +131,43 @@ public function getLastError() {
public function setOption($level, $name, $value) {
$this->options[$level][$name]= $value;

if ($this->isConnected()) {
if ($this->_sock) {
socket_set_option($this->_sock, $level, $name, $value);
}
}

/**
* Connect
* Returns whether a connection has been established
*
* @param float timeout default 2.0
* @return bool success
* @throws peer.ConnectException
* @return bool
*/
public function isConnected() {
if (null === $this->_sock) return false;

// For asynchronously connected sockets, check whether we can get the
// peer name. Once we get one, we've successfully established a connection!
if (1 === $this->state) {
if (false === socket_getpeername($this->_sock, $_, $_)) {
\xp::gc(__FILE__, __LINE__ - 1);
return false;
}

socket_set_block($this->_sock);
$this->state= 2;
}

return true;
}

/**
* Establish a connection
*
* @param float $timeout
* @param int $flags
* @return bool
* @throws peer.ConnectException
*/
public function connect($timeout= 2.0) {
protected function establish($timeout, $flags) {
static $domains= [
AF_INET => 'AF_INET',
AF_INET6 => 'AF_INET6',
Expand All @@ -155,9 +180,7 @@ public function connect($timeout= 2.0) {
SOCK_SEQPACKET => 'SOCK_SEQPACKET',
SOCK_RDM => 'SOCK_RDM'
];

if ($this->isConnected()) return true; // Short-cuircuit this


// Create socket...
if (!($this->_sock= socket_create($this->domain, $this->type, $this->protocol))) {
$this->_sock= null;
Expand Down Expand Up @@ -185,22 +208,24 @@ public function connect($timeout= 2.0) {
$this->setTimeout($timeout);

// ...and connect it
$host= (string)$this->host;
switch ($this->domain) {
case AF_INET:
case AF_INET6: {
$host= null;
if ($this->host instanceof \InetAddress) {
$host= $this->host->asString();
if ($flags & STREAM_CLIENT_ASYNC_CONNECT) {
socket_set_nonblock($this->_sock);
socket_connect($this->_sock, $host, $this->port);
$this->state= 1;
$r= null;
} else {
// TBD: Refactor
$host= gethostbyname($this->host);
$r= socket_connect($this->_sock, $host, $this->port);
$this->state= 2;
}
$r= socket_connect($this->_sock, $host, $this->port);
break;
}

case AF_UNIX: {
$r= socket_connect($this->_sock, $this->host);
$r= socket_connect($this->_sock, $host);
break;
}
}
Expand All @@ -210,6 +235,7 @@ public function connect($timeout= 2.0) {

// Check return status
if (false === $r) {
$this->state= 0;
$this->_sock= null;
$e= new ConnectException(sprintf(
'Connect to %s:%d failed: %s',
Expand All @@ -222,7 +248,7 @@ public function connect($timeout= 2.0) {
}
return true;
}

/**
* Close socket
*
Expand All @@ -234,6 +260,7 @@ public function close() {
socket_close($this->_sock);
$this->_sock= null;
$this->_eof= false;
$this->state= 0;
return true;
}
/**
Expand Down
84 changes: 62 additions & 22 deletions src/main/php/peer/Socket.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
* @see php://network
*/
class Socket implements Channel, Value {
public
$_eof = false,
$host = '',
$port = 0;
public $host, $port;

public
$_eof = false,
$_sock = null,
$_prefix = 'tcp://',
$_timeout = 60;
Expand Down Expand Up @@ -101,7 +99,7 @@ protected function getSocketOption($wrapper, $option) {
* any PHP error/warning is returned - but since there's no function like
* flasterror() we must rely on this
*
* @return string error
* @return string
*/
public function getLastError() {
return isset(\xp::$errors[__FILE__]) ? trim(key(end(\xp::$errors[__FILE__]))) : 'unknown error';
Expand All @@ -110,41 +108,51 @@ public function getLastError() {
/**
* Returns whether a connection has been established
*
* @return bool connected
* @return bool
*/
public function isConnected() {
return null !== $this->_sock;
if (null === $this->_sock) return false;

// For asynchronously connected sockets, the only way we seem to be able to
// reliably determine connectivity is to look up the remote socket name.
// See https://github.com/reactphp/socket/blob/master/src/TcpConnector.php#L105
if ((stream_context_get_options($this->context)['socket']['flags'] ?? 0) & STREAM_CLIENT_ASYNC_CONNECT) {
if (false === stream_socket_get_name($this->_sock, true)) return false;
}
return true;
}

/**
* Clone method. Ensure reconnect
* Ensures reconnect on clone
*
* @return void
*/
public function __clone() {
if (!$this->isConnected()) return;
if (null === $this->_sock) return;

$options= stream_context_get_options($this->context)['socket'];
$this->close();
$this->connect();
$this->establish($options['timeout'] ?? 2.0, $options['flags'] ?? STREAM_CLIENT_CONNECT);
}

/**
* Connect
* Establish a connection
*
* @param float timeout default 2.0
* @see php://fsockopen
* @return bool success
* @throws peer.ConnectException
* @see https://github.com/xp-framework/networking/issues/2
* @see php://stream_socket_client
* @param float $timeout
* @param int $flags
* @return bool success
* @throws peer.ConnectException
*/
public function connect($timeout= 2.0) {
if ($this->isConnected()) return true;

// Force IPv4 for localhost, see https://github.com/xp-framework/networking/issues/2
protected function establish($timeout, $flags) {
$host= (string)$this->host;
if (!$this->_sock= stream_socket_client(
$this->_prefix.('localhost' === $host ? '127.0.0.1' : $host).':'.$this->port,
$errno,
$errstr,
$timeout,
STREAM_CLIENT_CONNECT,
$flags,
$this->context
)) {
$this->_sock= null;
Expand All @@ -160,15 +168,47 @@ public function connect($timeout= 2.0) {
throw $e;
}

// Remember connection attributes
stream_context_set_option($this->context, 'socket', 'timeout', $timeout);
stream_context_set_option($this->context, 'socket', 'flags', $flags);

$s= (int)$this->_timeout;
stream_set_timeout($this->_sock, $s, (int)(1000 * ($this->_timeout - $s)));
return true;
}

/**
* Open a connection asynchronously. Unlike `connect()`, returns immediately.
*
* @param float $timeout default 2.0
* @return bool
* @throws peer.ConnectException
*/
public function open($timeout= 2.0) {
return null === $this->_sock
? $this->establish($timeout, STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT)
: true
;
}

/**
* Open a connection. Waits for it to be completely established before returning.
*
* @param float $timeout default 2.0
* @return bool
* @throws peer.ConnectException
*/
public function connect($timeout= 2.0) {
return null === $this->_sock
? $this->establish($timeout, STREAM_CLIENT_CONNECT)
: true
;
}

/**
* Close socket
*
* @return bool success
* @return bool
*/
public function close() {
if (null === $this->_sock) return false;
Expand Down
3 changes: 2 additions & 1 deletion src/main/php/peer/Sockets.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ static function __static() { }

public function select0(&$r, &$w, &$e, $timeout= null) {
if (null === $timeout) {
$tv_sec= $tv_usec= null;
$tv_sec= null;
$tv_usec= 0;
} else {
$tv_sec= (int)floor($timeout);
$tv_usec= (int)(($timeout - $tv_sec) * 1000000);
Expand Down
23 changes: 23 additions & 0 deletions src/test/php/peer/unittest/sockets/AbstractSocketTest.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,27 @@ public function select_keyed_array() {

$this->assertEquals(['fixture' => $this->fixture], $r);
}

#[Test]
public function open_connection_asynchronously() {
$this->fixture->open();

$read= $write= $error= [$this->fixture];
$this->fixture->kind()->select($read, $write, $error);

$this->assertTrue($this->fixture->isConnected());
}

#[Test]
public function open_connection_to_unbound_port() {

// Use port 4 which is unassigned and thus VERY unlikely to be bound, see
// https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers
$fixture= $this->newSocket(self::$bindAddress[0], 4);
$fixture->open();
$read= $write= $error= [$fixture];
$fixture->kind()->select($read, $write, $error);

$this->assertFalse($fixture->isConnected());
}
}
5 changes: 0 additions & 5 deletions src/test/php/peer/unittest/sockets/SocketTest.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
use peer\unittest\StartServer;
use unittest\actions\VerifyThat;

/**
* TestCase
*
* @see xp://peer.Socket
*/
#[Action(eval: 'new StartServer(TestingServer::class, "connected", "shutdown")')]
class SocketTest extends AbstractSocketTest {

Expand Down