diff --git a/src/Server/Socket/MasterProxy.php b/src/Server/Socket/MasterProxy.php new file mode 100644 index 0000000..679c9f8 --- /dev/null +++ b/src/Server/Socket/MasterProxy.php @@ -0,0 +1,114 @@ +setOption(SOL_SOCKET, SO_REUSEADDR, 1); + $socket->bind('0.0.0.0', $port); + $socket->listen(SOMAXCONN); + + $queue = @\msg_get_queue(\ftok(__FILE__, 'M')); + + if ($queue === false) { + throw new SocketCreationException('Failed to create message queue'); + } + + $streams = @\stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0); + + if ($streams === false) { + $error = \error_get_last(); + + throw new SocketCreationException( + \is_array($error) ? $error['message'] : 'stream_socket_pair failed', + ); + } + + $sendSocket = \socket_import_stream($streams[0]); + + if ($sendSocket === false) { + throw new SocketCreationException( + \socket_strerror(\socket_last_error()), + ); + } + + \fclose($streams[1]); + + $this->sendSocket = $sendSocket; + + return $socket; + } + + public function accept(Socket $socket): Connection + { + $listeningReflection = new \ReflectionProperty(Socket::class, 'resource'); + $listeningResource = $listeningReflection->getValue($socket); + + if (!$listeningResource instanceof \Socket) { + throw new SocketAcceptException('Socket is closed'); + } + + $accepted = @\socket_accept($listeningResource); + + if ($accepted === false) { + throw new SocketAcceptException( + \socket_strerror(\socket_last_error($listeningResource)), + ); + } + + // Dispatch the accepted fd to workers via SCM_RIGHTS. + // The @ silences platform-specific warnings (e.g. macOS PHP builds + // that do not support SCM_RIGHTS in socket_sendmsg). + $dispatched = @\socket_sendmsg($this->sendSocket, [ + 'iov' => [' '], + 'control' => [ + [ + 'cmsg_level' => \SOL_SOCKET, + 'cmsg_type' => \SCM_RIGHTS, + 'cmsg_data' => $accepted, + ], + ], + ]); + + if ($dispatched === false) { + // SCM_RIGHTS not supported on this platform; + // the connection remains in the master process. + } + + return new Connection($accepted); + } + + public function isSupported(): bool + { + return \function_exists('msg_get_queue') && \function_exists('socket_sendmsg'); + } +} diff --git a/tests/Server/Socket/MasterProxyTest.php b/tests/Server/Socket/MasterProxyTest.php new file mode 100644 index 0000000..a09f861 --- /dev/null +++ b/tests/Server/Socket/MasterProxyTest.php @@ -0,0 +1,75 @@ +assertInstanceOf(SocketProxyInterface::class, $proxy); + } + + public function testCreateSocketReturnsSocket(): void + { + $proxy = new MasterProxy(); + + $socket = $proxy->createSocket(0, ProtocolType::TCP); + + $this->assertInstanceOf(Socket::class, $socket); + + $socket->close(); + } + + public function testAcceptReturnsConnection(): void + { + $proxy = new MasterProxy(); + + $socket = $proxy->createSocket(0, ProtocolType::TCP); + + \socket_getsockname($this->getSocketResource($socket), $address, $portNumber); + + $client = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + $this->assertNotFalse($client); + + /** @var string $address */ + /** @var int $portNumber */ + \socket_connect($client, $address, $portNumber); + + $connection = $proxy->accept($socket); + + $this->assertInstanceOf(Connection::class, $connection); + + \socket_close($client); + $connection->close(); + $socket->close(); + } + + public function testIsSupported(): void + { + $proxy = new MasterProxy(); + + $expected = \function_exists('msg_get_queue') && \function_exists('socket_sendmsg'); + $this->assertSame($expected, $proxy->isSupported()); + } + + private function getSocketResource(Socket $socket): \Socket + { + $reflection = new \ReflectionProperty(Socket::class, 'resource'); + + $resource = $reflection->getValue($socket); + $this->assertInstanceOf(\Socket::class, $resource); + + return $resource; + } +}