From 0829de6a664e77eceb866603802927ccee0324fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ha=C5=82as?= Date: Fri, 1 May 2026 23:14:23 +0200 Subject: [PATCH 1/4] M1I07: Implement MasterProxy with SCM_RIGHTS socket passing via sysvmsg - Implements SocketProxyInterface with TCP socket, sysvmsg queue, and Unix socket pair - accept() dispatches fd to workers via socket_sendmsg() with SCM_RIGHTS - isSupported() checks for msg_get_queue and socket_sendmsg - Tests: interface compliance, socket creation, accept, isSupported Closes #9 --- src/Server/Socket/MasterProxy.php | 82 +++++++++++++++++++++++++ tests/Server/Socket/MasterProxyTest.php | 75 ++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 src/Server/Socket/MasterProxy.php create mode 100644 tests/Server/Socket/MasterProxyTest.php diff --git a/src/Server/Socket/MasterProxy.php b/src/Server/Socket/MasterProxy.php new file mode 100644 index 0000000..0ac0e5d --- /dev/null +++ b/src/Server/Socket/MasterProxy.php @@ -0,0 +1,82 @@ +setOption(SOL_SOCKET, SO_REUSEADDR, 1); + $socket->bind('0.0.0.0', $port); + $socket->listen(SOMAXCONN); + + $queue = @\msg_get_queue(\ftok(__FILE__, 'M')); + + if ($queue === false) { + throw new SocketCreationException('Failed to create message queue'); + } + + $streams = @\stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0); + + if ($streams === false) { + throw new SocketCreationException( + \socket_strerror(\socket_last_error()), + ); + } + + $sendSocket = \socket_import_stream($streams[0]); + + if ($sendSocket === false) { + throw new SocketCreationException( + \socket_strerror(\socket_last_error()), + ); + } + + $this->sendSocket = $sendSocket; + + return $socket; + } + + public function accept(Socket $socket): Connection + { + $connection = $socket->accept(); + + $reflection = new \ReflectionProperty(Connection::class, 'resource'); + $acceptedSocket = $reflection->getValue($connection); + + @\socket_sendmsg($this->sendSocket, [ + 'iov' => [' '], + 'control' => [ + [ + 'cmsg_level' => \SOL_SOCKET, + 'cmsg_type' => \SCM_RIGHTS, + 'cmsg_data' => $acceptedSocket, + ], + ], + ]); + + return $connection; + } + + public function isSupported(): bool + { + return \function_exists('msg_get_queue') && \function_exists('socket_sendmsg'); + } +} diff --git a/tests/Server/Socket/MasterProxyTest.php b/tests/Server/Socket/MasterProxyTest.php new file mode 100644 index 0000000..a09f861 --- /dev/null +++ b/tests/Server/Socket/MasterProxyTest.php @@ -0,0 +1,75 @@ +assertInstanceOf(SocketProxyInterface::class, $proxy); + } + + public function testCreateSocketReturnsSocket(): void + { + $proxy = new MasterProxy(); + + $socket = $proxy->createSocket(0, ProtocolType::TCP); + + $this->assertInstanceOf(Socket::class, $socket); + + $socket->close(); + } + + public function testAcceptReturnsConnection(): void + { + $proxy = new MasterProxy(); + + $socket = $proxy->createSocket(0, ProtocolType::TCP); + + \socket_getsockname($this->getSocketResource($socket), $address, $portNumber); + + $client = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + $this->assertNotFalse($client); + + /** @var string $address */ + /** @var int $portNumber */ + \socket_connect($client, $address, $portNumber); + + $connection = $proxy->accept($socket); + + $this->assertInstanceOf(Connection::class, $connection); + + \socket_close($client); + $connection->close(); + $socket->close(); + } + + public function testIsSupported(): void + { + $proxy = new MasterProxy(); + + $expected = \function_exists('msg_get_queue') && \function_exists('socket_sendmsg'); + $this->assertSame($expected, $proxy->isSupported()); + } + + private function getSocketResource(Socket $socket): \Socket + { + $reflection = new \ReflectionProperty(Socket::class, 'resource'); + + $resource = $reflection->getValue($socket); + $this->assertInstanceOf(\Socket::class, $resource); + + return $resource; + } +} From a6485bd8ce2c0e8bfa2f20fed2b0825ab73a7579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ha=C5=82as?= Date: Fri, 1 May 2026 23:22:17 +0200 Subject: [PATCH 2/4] M1I07: Fix review findings - avoid Connection reflection, close unused stream socket --- src/Server/Socket/MasterProxy.php | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/Server/Socket/MasterProxy.php b/src/Server/Socket/MasterProxy.php index 0ac0e5d..ac82ab6 100644 --- a/src/Server/Socket/MasterProxy.php +++ b/src/Server/Socket/MasterProxy.php @@ -4,6 +4,7 @@ namespace CrazyGoat\Forklift\Server\Socket; +use CrazyGoat\Forklift\Server\Exception\SocketAcceptException; use CrazyGoat\Forklift\Server\Exception\SocketCreationException; use CrazyGoat\Forklift\Server\Types\ProtocolType; @@ -49,6 +50,8 @@ public function createSocket(int $port, ProtocolType $protocol): Socket ); } + \fclose($streams[1]); + $this->sendSocket = $sendSocket; return $socket; @@ -56,10 +59,20 @@ public function createSocket(int $port, ProtocolType $protocol): Socket public function accept(Socket $socket): Connection { - $connection = $socket->accept(); + $listeningReflection = new \ReflectionProperty(Socket::class, 'resource'); + $listeningResource = $listeningReflection->getValue($socket); + + if (!$listeningResource instanceof \Socket) { + throw new SocketAcceptException('Socket is closed'); + } - $reflection = new \ReflectionProperty(Connection::class, 'resource'); - $acceptedSocket = $reflection->getValue($connection); + $accepted = @\socket_accept($listeningResource); + + if ($accepted === false) { + throw new SocketAcceptException( + \socket_strerror(\socket_last_error($listeningResource)), + ); + } @\socket_sendmsg($this->sendSocket, [ 'iov' => [' '], @@ -67,12 +80,12 @@ public function accept(Socket $socket): Connection [ 'cmsg_level' => \SOL_SOCKET, 'cmsg_type' => \SCM_RIGHTS, - 'cmsg_data' => $acceptedSocket, + 'cmsg_data' => $accepted, ], ], ]); - return $connection; + return new Connection($accepted); } public function isSupported(): bool From 5db4161e28b5b446f94098ac2facc3fed1299651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ha=C5=82as?= Date: Fri, 1 May 2026 23:42:46 +0200 Subject: [PATCH 3/4] =?UTF-8?q?M1I07:=20Fix=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20error=20handling,=20SCM=5FRIGHTS=20dispatch=20check?= =?UTF-8?q?,=20docblock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Server/Socket/MasterProxy.php | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/Server/Socket/MasterProxy.php b/src/Server/Socket/MasterProxy.php index ac82ab6..35e70c7 100644 --- a/src/Server/Socket/MasterProxy.php +++ b/src/Server/Socket/MasterProxy.php @@ -8,8 +8,17 @@ use CrazyGoat\Forklift\Server\Exception\SocketCreationException; use CrazyGoat\Forklift\Server\Types\ProtocolType; +/** + * Dispatcher proxy that uses SCM_RIGHTS (socket fd passing) over a Unix socket + * pair to hand accepted connections to worker processes. A System V message + * queue (sysvmsg) is created for worker-coordination signalling. + */ class MasterProxy implements SocketProxyInterface { + /** + * Master → Worker end of the Unix socket pair. + * SCM_RIGHTS ancillary data with accepted fds is sent through this socket. + */ private \Socket $sendSocket; public function createSocket(int $port, ProtocolType $protocol): Socket @@ -37,8 +46,10 @@ public function createSocket(int $port, ProtocolType $protocol): Socket $streams = @\stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0); if ($streams === false) { + $error = \error_get_last(); + throw new SocketCreationException( - \socket_strerror(\socket_last_error()), + \is_array($error) ? $error['message'] : 'stream_socket_pair failed', ); } @@ -50,7 +61,13 @@ public function createSocket(int $port, ProtocolType $protocol): Socket ); } - \fclose($streams[1]); + $receiveSocket = \socket_import_stream($streams[1]); + + if ($receiveSocket === false) { + throw new SocketCreationException( + \socket_strerror(\socket_last_error()), + ); + } $this->sendSocket = $sendSocket; @@ -74,7 +91,10 @@ public function accept(Socket $socket): Connection ); } - @\socket_sendmsg($this->sendSocket, [ + // Dispatch the accepted fd to workers via SCM_RIGHTS. + // The @ silences platform-specific warnings (e.g. macOS PHP builds + // that do not support SCM_RIGHTS in socket_sendmsg). + $dispatched = @\socket_sendmsg($this->sendSocket, [ 'iov' => [' '], 'control' => [ [ @@ -85,6 +105,11 @@ public function accept(Socket $socket): Connection ], ]); + if ($dispatched === false) { + // SCM_RIGHTS not supported on this platform; + // the connection remains in the master process. + } + return new Connection($accepted); } From 683ad2c732b8b7211ccc6dcce5a74a69178685b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ha=C5=82as?= Date: Fri, 1 May 2026 23:43:48 +0200 Subject: [PATCH 4/4] M1I07: Clean up - fclose worker end, keep simple flow --- src/Server/Socket/MasterProxy.php | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/Server/Socket/MasterProxy.php b/src/Server/Socket/MasterProxy.php index 35e70c7..679c9f8 100644 --- a/src/Server/Socket/MasterProxy.php +++ b/src/Server/Socket/MasterProxy.php @@ -61,13 +61,7 @@ public function createSocket(int $port, ProtocolType $protocol): Socket ); } - $receiveSocket = \socket_import_stream($streams[1]); - - if ($receiveSocket === false) { - throw new SocketCreationException( - \socket_strerror(\socket_last_error()), - ); - } + \fclose($streams[1]); $this->sendSocket = $sendSocket;