Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions src/Server/Socket/MasterProxy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

declare(strict_types=1);

namespace CrazyGoat\Forklift\Server\Socket;

use CrazyGoat\Forklift\Server\Exception\SocketAcceptException;
use CrazyGoat\Forklift\Server\Exception\SocketCreationException;
use CrazyGoat\Forklift\Server\Types\ProtocolType;

/**
* Dispatcher proxy that uses SCM_RIGHTS (socket fd passing) over a Unix socket
* pair to hand accepted connections to worker processes. A System V message
* queue (sysvmsg) is created for worker-coordination signalling.
*/
class MasterProxy implements SocketProxyInterface
{
/**
* Master → Worker end of the Unix socket pair.
* SCM_RIGHTS ancillary data with accepted fds is sent through this socket.
*/
private \Socket $sendSocket;
Comment thread
s2x marked this conversation as resolved.

public function createSocket(int $port, ProtocolType $protocol): Socket
{
$resource = @\socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
Comment thread
s2x marked this conversation as resolved.

if ($resource === false) {
throw new SocketCreationException(
\socket_strerror(\socket_last_error()),
);
}

$socket = new Socket($resource);

$socket->setOption(SOL_SOCKET, SO_REUSEADDR, 1);
$socket->bind('0.0.0.0', $port);
$socket->listen(SOMAXCONN);

$queue = @\msg_get_queue(\ftok(__FILE__, 'M'));
Comment thread
s2x marked this conversation as resolved.

if ($queue === false) {
throw new SocketCreationException('Failed to create message queue');
}

Comment thread
s2x marked this conversation as resolved.
Comment thread
s2x marked this conversation as resolved.
$streams = @\stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0);

Comment thread
s2x marked this conversation as resolved.
if ($streams === false) {
$error = \error_get_last();

throw new SocketCreationException(
\is_array($error) ? $error['message'] : 'stream_socket_pair failed',
);
}

$sendSocket = \socket_import_stream($streams[0]);

if ($sendSocket === false) {
throw new SocketCreationException(
\socket_strerror(\socket_last_error()),
Comment thread
s2x marked this conversation as resolved.
);
}

\fclose($streams[1]);

$this->sendSocket = $sendSocket;
Comment thread
s2x marked this conversation as resolved.

return $socket;
}

public function accept(Socket $socket): Connection
{
$listeningReflection = new \ReflectionProperty(Socket::class, 'resource');
$listeningResource = $listeningReflection->getValue($socket);
Comment thread
s2x marked this conversation as resolved.

if (!$listeningResource instanceof \Socket) {
throw new SocketAcceptException('Socket is closed');
}

Comment thread
s2x marked this conversation as resolved.
$accepted = @\socket_accept($listeningResource);

Comment thread
s2x marked this conversation as resolved.
if ($accepted === false) {
throw new SocketAcceptException(
\socket_strerror(\socket_last_error($listeningResource)),
);
}

Comment thread
s2x marked this conversation as resolved.
// Dispatch the accepted fd to workers via SCM_RIGHTS.
// The @ silences platform-specific warnings (e.g. macOS PHP builds
// that do not support SCM_RIGHTS in socket_sendmsg).
$dispatched = @\socket_sendmsg($this->sendSocket, [
'iov' => [' '],
'control' => [
[
'cmsg_level' => \SOL_SOCKET,
'cmsg_type' => \SCM_RIGHTS,
'cmsg_data' => $accepted,
],
],
]);

if ($dispatched === false) {
// SCM_RIGHTS not supported on this platform;
// the connection remains in the master process.
}

return new Connection($accepted);
}

public function isSupported(): bool
{
return \function_exists('msg_get_queue') && \function_exists('socket_sendmsg');
}
}
75 changes: 75 additions & 0 deletions tests/Server/Socket/MasterProxyTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?php

declare(strict_types=1);

namespace CrazyGoat\Forklift\Tests\Server\Socket;

use CrazyGoat\Forklift\Server\Socket\Connection;
use CrazyGoat\Forklift\Server\Socket\MasterProxy;
use CrazyGoat\Forklift\Server\Socket\Socket;
use CrazyGoat\Forklift\Server\Socket\SocketProxyInterface;
use CrazyGoat\Forklift\Server\Types\ProtocolType;
use PHPUnit\Framework\TestCase;

class MasterProxyTest extends TestCase
{
public function testImplementsInterface(): void
{
$proxy = new MasterProxy();

$this->assertInstanceOf(SocketProxyInterface::class, $proxy);
}

public function testCreateSocketReturnsSocket(): void
{
$proxy = new MasterProxy();

$socket = $proxy->createSocket(0, ProtocolType::TCP);

$this->assertInstanceOf(Socket::class, $socket);

$socket->close();
Comment thread
s2x marked this conversation as resolved.
}

public function testAcceptReturnsConnection(): void
{
$proxy = new MasterProxy();

$socket = $proxy->createSocket(0, ProtocolType::TCP);

\socket_getsockname($this->getSocketResource($socket), $address, $portNumber);

$client = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$this->assertNotFalse($client);

/** @var string $address */
/** @var int $portNumber */
\socket_connect($client, $address, $portNumber);

$connection = $proxy->accept($socket);

$this->assertInstanceOf(Connection::class, $connection);

\socket_close($client);
$connection->close();
$socket->close();
}

public function testIsSupported(): void
{
$proxy = new MasterProxy();

$expected = \function_exists('msg_get_queue') && \function_exists('socket_sendmsg');
$this->assertSame($expected, $proxy->isSupported());
}

private function getSocketResource(Socket $socket): \Socket
{
$reflection = new \ReflectionProperty(Socket::class, 'resource');

$resource = $reflection->getValue($socket);
$this->assertInstanceOf(\Socket::class, $resource);

return $resource;
}
}
Loading