Skip to content

Commit 3607ef4

Browse files
committed
Refactor Server to improve client management and request handling
1 parent bf49f6f commit 3607ef4

2 files changed

Lines changed: 475 additions & 55 deletions

File tree

src/Executor/Server.php

Lines changed: 148 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ class Server
1818
{
1919
private string $host;
2020
private int $port;
21-
2221
private OutputInterface $output;
23-
2422
private bool $stop = false;
2523

2624
/**
@@ -29,32 +27,35 @@ class Server
2927
private $socket;
3028

3129
/**
32-
* @var resource[]
30+
* Each client is tracked with its socket, read buffer, and connection timestamp.
31+
* @var array<int, array{socket: resource, buffer: string, connectedAt: float}>
3332
*/
34-
private array $clientSockets = [];
33+
private array $clients = [];
34+
35+
private int $nextClientId = 0;
3536

3637
private Closure $afterCallback;
3738
private Closure $tickerCallback;
3839
private Closure $routerCallback;
3940

40-
public function __construct($host, $port, OutputInterface $output)
41+
/**
42+
* Timeout in seconds for idle client connections.
43+
*/
44+
private float $clientTimeout = 30.0;
45+
46+
private const REASON_PHRASES = [
47+
200 => 'OK',
48+
404 => 'Not Found',
49+
500 => 'Internal Server Error',
50+
];
51+
52+
public function __construct(string $host, int $port, OutputInterface $output)
4153
{
42-
self::checkRequiredExtensionsExists();
4354
$this->host = $host;
4455
$this->port = $port;
4556
$this->output = $output;
4657
}
4758

48-
public static function checkRequiredExtensionsExists(): void
49-
{
50-
if (!function_exists('socket_import_stream')) {
51-
throw new Exception('Required PHP extension "sockets" is not loaded');
52-
}
53-
if (!function_exists('stream_set_blocking')) {
54-
throw new Exception('Required PHP extension "stream" is not loaded');
55-
}
56-
}
57-
5859
public function run(): void
5960
{
6061
try {
@@ -84,6 +85,9 @@ public function run(): void
8485
$this->output->writeln("[master] Stopping server at http://{$this->host}:{$this->port}");
8586
}
8687
} finally {
88+
foreach ($this->clients as $id => $client) {
89+
$this->closeClient($id);
90+
}
8791
if (isset($this->socket)) {
8892
fclose($this->socket);
8993
}
@@ -112,7 +116,7 @@ private function updatePort(): void
112116
{
113117
$name = stream_socket_get_name($this->socket, false);
114118
if ($name) {
115-
list(, $port) = explode(':', $name);
119+
[, $port] = explode(':', $name);
116120
$this->port = (int) $port;
117121
} else {
118122
throw new Exception("Failed to get the assigned port");
@@ -121,46 +125,104 @@ private function updatePort(): void
121125

122126
private function acceptNewConnections(): void
123127
{
124-
$newClientSocket = @stream_socket_accept($this->socket, 0);
125-
if ($newClientSocket) {
128+
while ($newClientSocket = @stream_socket_accept($this->socket, 0)) {
126129
if (!stream_set_blocking($newClientSocket, false)) {
127-
throw new Exception("Failed to set client socket to non-blocking mode");
130+
fclose($newClientSocket);
131+
continue;
128132
}
129-
$this->clientSockets[] = $newClientSocket;
133+
$id = $this->nextClientId++;
134+
$this->clients[$id] = [
135+
'socket' => $newClientSocket,
136+
'buffer' => '',
137+
'connectedAt' => microtime(true),
138+
];
130139
}
131140
}
132141

133142
private function handleClientRequests(): void
134143
{
135-
foreach ($this->clientSockets as $key => $clientSocket) {
136-
if (feof($clientSocket)) {
137-
$this->closeClientSocket($clientSocket, $key);
144+
foreach ($this->clients as $id => $client) {
145+
$socket = $client['socket'];
146+
147+
if (feof($socket)) {
148+
$this->closeClient($id);
138149
continue;
139150
}
140151

141-
$request = $this->readClientRequest($clientSocket);
142-
list($path, $payload) = $this->parseRequest($request);
143-
$response = ($this->routerCallback)($path, $payload);
152+
// Read available data into the buffer.
153+
$data = @fread($socket, 65536);
154+
if ($data !== false && $data !== '') {
155+
$this->clients[$id]['buffer'] .= $data;
156+
}
144157

145-
$this->sendResponse($clientSocket, $response);
146-
$this->closeClientSocket($clientSocket, $key);
158+
// Check if we have a complete request.
159+
$buffer = $this->clients[$id]['buffer'];
160+
if (!self::isCompleteRequest($buffer)) {
161+
// Check for idle timeout.
162+
if (microtime(true) - $client['connectedAt'] > $this->clientTimeout) {
163+
$this->closeClient($id);
164+
}
165+
continue;
166+
}
167+
168+
// Process the complete request.
169+
try {
170+
[$path, $payload] = self::parseRequest($buffer);
171+
$response = ($this->routerCallback)($path, $payload);
172+
$this->sendResponse($socket, $response);
173+
} catch (\Throwable $e) {
174+
$errorResponse = new Response(500, ['error' => $e->getMessage()]);
175+
try {
176+
$this->sendResponse($socket, $errorResponse);
177+
} catch (\Throwable) {
178+
// Best effort — socket may be broken.
179+
}
180+
} finally {
181+
$this->closeClient($id);
182+
}
147183
}
148184
}
149185

150-
private function readClientRequest($clientSocket)
186+
/**
187+
* Check if the buffer contains a complete HTTP request
188+
* (headers terminated by \r\n\r\n, and body length matches Content-Length).
189+
*/
190+
public static function isCompleteRequest(string $buffer): bool
151191
{
152-
$request = stream_get_contents($clientSocket);
192+
$headerEnd = strpos($buffer, "\r\n\r\n");
193+
if ($headerEnd === false) {
194+
return false;
195+
}
196+
197+
$headers = substr($buffer, 0, $headerEnd);
198+
$bodyStart = $headerEnd + 4;
153199

154-
if ($request === false) {
155-
throw new Exception('Socket read failed');
200+
// Extract Content-Length from headers.
201+
if (preg_match('/^Content-Length:\s*(\d+)/mi', $headers, $matches)) {
202+
$contentLength = (int) $matches[1];
203+
return strlen($buffer) - $bodyStart >= $contentLength;
156204
}
157205

158-
return $request;
206+
// No Content-Length header — request is complete after headers.
207+
return true;
159208
}
160209

161-
private function parseRequest($request)
210+
/**
211+
* Parse a complete HTTP request into path and JSON payload.
212+
*
213+
* @return array{0: string, 1: mixed}
214+
*/
215+
public static function parseRequest(string $request): array
162216
{
163-
$lines = explode("\r\n", $request);
217+
$headerEnd = strpos($request, "\r\n\r\n");
218+
if ($headerEnd === false) {
219+
throw new Exception("Malformed request: no header terminator found");
220+
}
221+
222+
$headerSection = substr($request, 0, $headerEnd);
223+
$body = substr($request, $headerEnd + 4);
224+
225+
$lines = explode("\r\n", $headerSection);
164226
$requestLine = $lines[0];
165227
$parts = explode(' ', $requestLine);
166228
if (count($parts) !== 3) {
@@ -171,50 +233,81 @@ private function parseRequest($request)
171233
$headers = [];
172234
for ($i = 1; $i < count($lines); $i++) {
173235
$line = $lines[$i];
174-
if (empty($line)) {
175-
break;
236+
if (str_contains($line, ':')) {
237+
[$key, $value] = explode(':', $line, 2);
238+
$headers[strtolower(trim($key))] = trim($value);
176239
}
177-
[$key, $value] = explode(':', $line, 2);
178-
$headers[$key] = trim($value);
179240
}
180-
if (empty($headers['Content-Type']) || $headers['Content-Type'] !== 'application/json') {
241+
242+
if (empty($headers['content-type']) || $headers['content-type'] !== 'application/json') {
181243
throw new Exception("Malformed request: invalid Content-Type");
182244
}
183245

184-
$payload = json_decode(implode("\n", array_slice($lines, $i + 1)), true, flags: JSON_THROW_ON_ERROR);
246+
// Trim body to Content-Length if present (ignore trailing data).
247+
if (isset($headers['content-length'])) {
248+
$body = substr($body, 0, (int) $headers['content-length']);
249+
}
250+
251+
$payload = json_decode($body, true, flags: JSON_THROW_ON_ERROR);
185252
return [$path, $payload];
186253
}
187254

188-
private function sendResponse($clientSocket, Response $response)
255+
/**
256+
* @param resource $socket
257+
*/
258+
private function sendResponse($socket, Response $response): void
189259
{
190260
$code = $response->getStatus();
261+
$reason = self::REASON_PHRASES[$code] ?? 'Unknown';
191262
$content = json_encode($response->getBody(), flags: JSON_PRETTY_PRINT);
192-
$headers = "HTTP/1.1 $code OK\r\n"
263+
$data = "HTTP/1.1 $code $reason\r\n"
193264
. "Content-Type: application/json\r\n"
194265
. "Content-Length: " . strlen($content) . "\r\n"
195-
. "Connection: close\r\n\r\n";
196-
fwrite($clientSocket, $headers . $content);
266+
. "Connection: close\r\n\r\n"
267+
. $content;
268+
269+
self::writeAll($socket, $data);
197270
}
198271

199-
private function closeClientSocket($clientSocket, $key): void
272+
/**
273+
* Write all data to socket, handling partial writes.
274+
*
275+
* @param resource $socket
276+
*/
277+
public static function writeAll($socket, string $data): void
200278
{
201-
fclose($clientSocket);
202-
unset($this->clientSockets[$key]);
279+
$written = 0;
280+
$len = strlen($data);
281+
while ($written < $len) {
282+
$bytes = @fwrite($socket, substr($data, $written));
283+
if ($bytes === false || $bytes === 0) {
284+
throw new Exception('Socket write failed');
285+
}
286+
$written += $bytes;
287+
}
288+
}
289+
290+
private function closeClient(int $id): void
291+
{
292+
if (isset($this->clients[$id])) {
293+
fclose($this->clients[$id]['socket']);
294+
unset($this->clients[$id]);
295+
}
203296
}
204297

205-
public function afterRun(Closure $param): void
298+
public function afterRun(Closure $callback): void
206299
{
207-
$this->afterCallback = $param;
300+
$this->afterCallback = $callback;
208301
}
209302

210-
public function ticker(Closure $param): void
303+
public function ticker(Closure $callback): void
211304
{
212-
$this->tickerCallback = $param;
305+
$this->tickerCallback = $callback;
213306
}
214307

215-
public function router(Closure $param)
308+
public function router(Closure $callback): void
216309
{
217-
$this->routerCallback = $param;
310+
$this->routerCallback = $callback;
218311
}
219312

220313
public function stop(): void

0 commit comments

Comments
 (0)