-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathSocketServerBroadcast.php
More file actions
executable file
·120 lines (100 loc) · 3.08 KB
/
SocketServerBroadcast.php
File metadata and controls
executable file
·120 lines (100 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php
namespace Sock;
require_once "SocketServer.php";
require_once "SocketClientBroadcast.php";
/**
 * Socket server that relays messages between the connections it has accepted.
 *
 * Every accepted connection is wrapped in a SocketClientBroadcast and passed to
 * the configured connection handler. The handler's return value is treated as a
 * child pid: a truthy value is stored as the key of the connection, a falsy
 * value makes serverLoop() return (NOTE(review): presumably the handler forks
 * and this is the child side - the fork itself is not visible in this file).
 *
 * broadcast() writes one frame to the FIFO at PIPENAME and sends SIGUSR1 to the
 * pid recorded by the constructor; serverLoop() waits for that signal and calls
 * handleProcess() to read the frame.
 *
 * Frame layout: 4-byte little-endian payload length followed by a serialize()d
 * array with the keys 'type' ('msg' or 'disc'), 'pid' and, for 'msg', 'data'.
 *
 * NOTE(review): SIGUSR1 is a standard signal, so two broadcasts sent close
 * together may be reported as one; the second frame then stays in the FIFO
 * until the next signal arrives. Confirm whether the pipe should be drained.
 */
class SocketServerBroadcast extends SocketServer {
    /** Path of the FIFO used to hand frames to the server (a named pipe, despite the ".pid" suffix). */
    const PIPENAME = '/tmp/broadcastserver.pid';

    /** Pid of the process that constructed this object; broadcast() signals it. */
    protected $pid;

    /** Stream opened on PIPENAME in 'r+' mode, or false when fopen() failed. */
    public $pipe;

    /** Accepted connections, keyed by the pid returned from the connection handler. */
    private $connections = array();

    /**
     * Creates the FIFO if it does not exist yet and opens it.
     *
     * @param int    $port    Forwarded unchanged to the parent constructor.
     * @param string $address Forwarded unchanged to the parent constructor.
     */
    public function __construct( $port = 4444, $address = '127.0.0.1' ) {
        parent::__construct( $port, $address );
        $this->pid = posix_getpid();

        if( ! file_exists( self::PIPENAME ) ) {
            // Clear the umask only while creating the FIFO so the 0666 mode is
            // applied as written, then restore it: leaving it at 0 would make
            // every file created later by this process world-writable as well.
            $previousUmask = umask( 0 );
            $created = posix_mkfifo( self::PIPENAME, 0666 );
            umask( $previousUmask );

            if( ! $created ) {
                die('Cant create a pipe: "' . self::PIPENAME . '"');
            }
        }

        // NOTE(review): 'r+' is presumably used so that opening the FIFO does
        // not wait for a writer to show up - confirm on the target platform.
        $this->pipe = fopen(self::PIPENAME, 'r+');
    }

    /**
     * Reads one frame from the FIFO and acts on it.
     *
     * 'msg'  - formats the payload and sends it to every connection except the
     *          one registered under the sender's pid.
     * 'disc' - forgets the connection registered under the sender's pid.
     *
     * Truncated, empty or malformed frames are logged and dropped. Also
     * registered as the SIGUSR1 handler, so it must stay callable with the
     * extra arguments a signal handler receives.
     */
    public function handleProcess() {
        if( ! is_resource( $this->pipe ) ) {
            $this->log( "ERROR: broadcast pipe is not open\n" );
            return;
        }

        $header = $this->readFromPipe( 4 );
        if( strlen( $header ) < 4 ) {
            $this->log( "ERROR: incomplete broadcast header\n" );
            return;
        }

        $len = $this->bytesToInt( $header );
        if( $len < 1 ) {
            // fread() rejects a length below 1 and an empty payload carries
            // nothing to deliver, so there is nothing more to do.
            $this->log( "ERROR: empty broadcast frame\n" );
            return;
        }

        $payload = $this->readFromPipe( $len );
        if( strlen( $payload ) < $len ) {
            $this->log( "ERROR: truncated broadcast frame\n" );
            return;
        }

        // NOTE(review): the FIFO is created with mode 0666, so any local user
        // can write to it, and unserialize() is not safe on untrusted input.
        // Consider json_encode()/json_decode() on both ends of the pipe.
        $message = unserialize( $payload );
        if( ! is_array( $message )
            || ! isset( $message['type'], $message['pid'] )
            || ! is_scalar( $message['pid'] ) ) {
            $this->log( "ERROR: malformed broadcast message\n" );
            return;
        }

        $senderPid = $message['pid'];

        if( $message['type'] === 'msg' ) {
            if( ! isset( $this->connections[ $senderPid ] ) ) {
                // The sender is unknown or already unregistered; without this
                // check getAddress() would be called on null.
                $this->log( "ERROR: broadcast from unknown pid\n" );
                return;
            }

            $client = $this->connections[ $senderPid ];
            $data = isset( $message['data'] ) ? $message['data'] : '';
            $msg = sprintf('[%s] (%d):%s', $client->getAddress(), $senderPid, $data );
            $this->log( "Broadcast: $msg\n");

            foreach( $this->connections as $pid => $conn ) {
                if( $pid == $senderPid ) {
                    // The sender does not get its own message back.
                    continue;
                }
                $conn->send( $msg );
            }
        }
        else if( $message['type'] === 'disc' ) {
            unset( $this->connections[ $senderPid ] );
        }
    }

    /**
     * Reads up to $length bytes from the FIFO without waiting for new data.
     *
     * A single fread() on a pipe may return fewer bytes than requested, so the
     * read is repeated until the requested amount has been collected or no
     * more data is immediately available.
     *
     * @param int $length Number of bytes wanted.
     * @return string The bytes read; shorter than $length when the pipe ran dry.
     */
    private function readFromPipe( $length ) {
        $buffer = '';

        while( strlen( $buffer ) < $length ) {
            $read = array( $this->pipe );
            $write = null;
            $except = null;

            // Zero timeout: only consume what is already there, so a bad
            // length prefix cannot block the server loop.
            $ready = stream_select( $read, $write, $except, 0 );
            if( $ready === false || $ready === 0 ) {
                break;
            }

            $chunk = fread( $this->pipe, $length - strlen( $buffer ) );
            if( $chunk === false || $chunk === '' ) {
                break;
            }
            $buffer .= $chunk;
        }

        return $buffer;
    }

    /**
     * Decodes a 4-byte little-endian length prefix.
     *
     * @param string $char Binary string; only the first four bytes are used and
     *                     missing bytes count as zero.
     * @return int The decoded number.
     */
    public function bytesToInt($char) {
        $padded = str_pad( substr( (string) $char, 0, 4 ), 4, "\0" );
        $decoded = unpack( 'Vvalue', $padded );
        return $decoded['value'];
    }

    /**
     * Makes the listening socket non-blocking and registers the SIGUSR1 handler.
     */
    protected function beforeServerLoop() {
        parent::beforeServerLoop();
        // Non-blocking, so socket_accept() in serverLoop() returns at once
        // when nobody is connecting.
        socket_set_nonblock( $this->sockServer );
        pcntl_signal(SIGUSR1, array($this, 'handleProcess'), true);
    }

    /**
     * Accept loop: alternates between accepting connections and waiting up to
     * one second for SIGUSR1.
     *
     * Returns when $this->_listenLoop becomes falsy, or as soon as the
     * connection handler returns a falsy value.
     */
    protected function serverLoop() {
        while( $this->_listenLoop ) {
            // '@' hides the warning socket_accept() raises on a non-blocking
            // socket when no connection is pending.
            if( ( $client = @socket_accept( $this->sockServer ) ) === false ) {
                $info = array();
                // The one-second wait doubles as the idle delay of the loop.
                if( pcntl_sigtimedwait(array(SIGUSR1),$info,1) > 0 ) {
                    if( $info['signo'] == SIGUSR1 ) {
                        $this->handleProcess();
                    }
                }
                continue;
            }

            $socketClient = new SocketClientBroadcast( $client, $this );

            if( is_array( $this->connectionHandler ) ) {
                $object = $this->connectionHandler[0];
                $method = $this->connectionHandler[1];
                $childPid = $object->$method( $socketClient );
            }
            else {
                $function = $this->connectionHandler;
                $childPid = $function( $socketClient );
            }

            if( ! $childPid ) {
                // force child process to exit from loop
                return;
            }

            $this->connections[ $childPid ] = $socketClient;
        }
    }

    /**
     * Sends a message to the server process through the FIFO.
     *
     * Adds the calling process' pid under 'pid', writes the serialized array
     * as one length-prefixed frame and then signals the server with SIGUSR1.
     * The server is not signalled when nothing could be written.
     *
     * @param array $msg Message; handleProcess() reads the keys 'type' and 'data'.
     */
    public function broadcast( array $msg ) {
        $msg['pid'] = posix_getpid();
        $message = serialize( $msg );

        $f = fopen(self::PIPENAME, 'w+');
        if( !$f ) {
            $this->log("ERROR: Can't open PIPE for writting\n");
            return;
        }

        $frame = $this->strlenInBytes($message) . $message;
        $written = fwrite($f, $frame);
        fclose($f);

        if( $written === false || $written === 0 ) {
            // Signalling now would make the server wait for a frame that
            // never reached the pipe.
            $this->log("ERROR: Can't write to PIPE\n");
            return;
        }
        if( $written < strlen( $frame ) ) {
            // Still signal: the reader detects the short frame and drops it.
            $this->log("ERROR: broadcast frame only partially written\n");
        }

        posix_kill($this->pid, SIGUSR1);
    }

    /**
     * Encodes the byte length of $str as a 4-byte little-endian string.
     *
     * @param string $str Payload whose length is encoded.
     * @return string Four bytes, least significant first.
     */
    protected function strlenInBytes($str) {
        return pack( 'V', strlen( $str ) );
    }
}