This repository was archived by the owner on Dec 16, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 504
/
Copy pathNonBlockingSocketServer.php
129 lines (104 loc) · 2.99 KB
/
NonBlockingSocketServer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
<?php
class SocketServer extends Thread {
public $socket;
public $maxThreads;
public $isRunning;
public $threads;
public $host;
public $port;
public $backlog;
public function __construct(int $maxThreads, string $host, int $port, int $backlog) {
$this->maxThreads = $maxThreads;
$this->isRunning = false;
$this->threads = [ ];
$this->host = $host;
$this->port = $port;
$this->backlog = $backlog;
}
public function run() {
$this->socket = new Socket ( AF_INET, SOCK_STREAM, SOL_TCP );
$this->socket->setOption ( SOL_SOCKET, SO_REUSEADDR, 1 );
$this->socket->bind ( $this->host, $this->port );
$this->socket->listen ( $this->backlog );
$this->socket->setBlocking ( false );
$this->isRunning = true;
for($i = 0; $i < $this->maxThreads; $i ++) {
$this->threads [$i] = new RequestHandler ( $this->socket );
$this->threads [$i]->start ();
}
$this->synchronized ( function () {
do {
foreach ( $this->threads as $key => $thread ) {
if ($thread->isRunning ()) {
continue;
}
$thread->join ();
$this->threads [$key] = new RequestHandler ( $this->socket );
$this->threads [$key]->start ();
}
$this->wait ( 500000 ); // 500 ms
} while ( $this->isRunning );
} );
}
public function shutdown() {
$this->isRunning = false;
foreach ( $this->threads as $thread ) {
$thread->shutdown ();
}
$this->socket->close ();
}
}
class RequestHandler extends Thread {
public $mainSocket;
public $threadSocket;
public $isRunning;
public function __construct(Socket $socket) {
$this->mainSocket = $socket;
$this->isRunning = false;
}
public function run() {
$this->isRunning = true;
while ( $this->isRunning ) {
$this->threadSocket = $this->mainSocket->accept ();
if ($this->threadSocket === false) {
$this->synchronized ( function () {
$this->wait ( 5000 ); // 5ms
} );
continue;
}
$threadId = $this->getThreadId ();
$response = "Welcome\nYou are connected with Thread-ID $threadId\nEnter \"quit\" to quit\n\n";
$this->threadSocket->write ( $response, strlen ( $response ) );
do {
$buffer = trim ( $this->threadSocket->read ( 1024 ) );
if ('quit' === $buffer) {
break;
}
$talkBack = "You entered $buffer.\n";
$this->threadSocket->write ( $talkBack, strlen ( $talkBack ) );
} while ( $this->isRunning );
$this->threadSocket->write ( "Goodbye\n", 9 );
$this->threadSocket->close ();
$this->isRunning = false;
}
}
public function shutdown() {
$this->isRunning = false;
}
}
echo 'Multi-Threaded Socket Server started with PID ' . posix_getpid () . "\n";
$server = new SocketServer ( 3, '127.0.0.1', 9004, 2 );
$server->start ();
$running = true;
$shutdown = function () use (&$running, $server) {
echo "Shutting down... \n";
$running = false;
$server->shutdown ();
$server->join ();
echo "Finished\n";
};
pcntl_signal ( SIGINT, $shutdown );
while ( $running ) {
pcntl_signal_dispatch ();
usleep ( 10000 );
}