Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/lib/libcore.js
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,11 @@ function wrapSyscallFunction(x, library, isWasi) {
post = handler + post;

if (pre || post) {
t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`);
if (library[x + '__async']) {
t = modifyJSFunction(t, (args, body) => `async function (${args}) {\n${pre}${body}${post}}\n`);
} else {
t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`);
}
}

library[x] = eval('(' + t + ')');
Expand Down
3 changes: 3 additions & 0 deletions src/lib/libeventloop.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ LibraryJSEventLoop = {
emscripten_set_main_loop__deps: ['$setMainLoop'],
emscripten_set_main_loop: (func, fps, simulateInfiniteLoop) => {
var iterFunc = {{{ makeDynCall('v', 'func') }}};
#if JSPI
iterFunc = WebAssembly.promising(iterFunc);
#endif
setMainLoop(iterFunc, fps, simulateInfiniteLoop);
},

Expand Down
282 changes: 278 additions & 4 deletions src/lib/libsockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ addToLibrary({
$SOCKFS__postset: () => {
addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);');
},
$SOCKFS__deps: ['$FS'],
$SOCKFS__deps: ['$FS', '$DNS'],
$SOCKFS: {
#if expectToReceiveOnModule('websocket')
websocketArgs: {},
Expand Down Expand Up @@ -69,6 +69,8 @@ addToLibrary({
pending: [],
recv_queue: [],
#if SOCKET_WEBRTC
#elif SOCKET_WEBTRANSPORT
sock_ops: SOCKFS.webtransport_sock_ops
#else
sock_ops: SOCKFS.websocket_sock_ops
#endif
Expand Down Expand Up @@ -104,9 +106,9 @@ addToLibrary({
},
// node and stream ops are backend agnostic
stream_ops: {
poll(stream) {
poll(stream, timeout) {
var sock = stream.node.sock;
return sock.sock_ops.poll(sock);
return sock.sock_ops.poll(sock, timeout);
},
ioctl(stream, request, varargs) {
var sock = stream.node.sock;
Expand Down Expand Up @@ -138,6 +140,277 @@ addToLibrary({
return `socket[${SOCKFS.nextname.current++}]`;
},
// backend-specific stream ops
#if SOCKET_WEBRTC
#elif SOCKET_WEBTRANSPORT
webtransport_sock_ops: {
getSession(sock, addr, port) {
return sock.peers[`${addr}:${port}`];
},
initSession(sock, session, addr, port) {
const outgoing = [];

session.write = (buffer) => {
outgoing.push(buffer);
};

sock.peers[`${addr}:${port}`] = session;

(async () => {
try {
await session.ready;

const writer = session.datagrams.writable.getWriter();
const reader = session.datagrams.readable.getReader();
let first = true;

while (outgoing.length) {
writer.write(outgoing.shift());
}

session.write = (buffer) => {
writer.write(buffer);
};

while (true) {
const { done, value } = await reader.read();

if (done) {
break;
}

// handle the internal port identification message
if (first && value[0] === 0xff && value[1] === 0xff && value[2] === 0xff && value[3] === 0xff &&
value[4] === 'p' && value[5] === 'o' && value[6] === 'r' && value[7] === 't') {
// update cache key
delete sock.peers[`${addr}:${port}`];
port = parseInt(String.fromCharCode.apply(null, value.subarray(9)), 10);
sock.peers[`${addr}:${port}`] = session;
} else {
sock.recv_queue.push({ addr: addr, port: port, buffer: value });
}

if (sock.pendingPollResolve) {
sock.pendingPollResolve();
}

first = false;
}

console.log(`Session ${addr}:${port} shutdown gracefully`);
} catch (e) {
console.error(`Session ${addr}:${port} terminated due to error`, e);
} finally {
delete sock.peers[`${addr}:${port}`];
}
})();
},
newSession(sock, addr, port) {
let hostname = DNS.lookup_addr(addr);

if (!hostname) {
hostname = addr;
}

const session = new WebTransport(`https://${hostname}:${port}`);

SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);

// send the original bound port number to the peer
if (sock.type === {{{ cDefs.SOCK_DGRAM }}} && typeof sock.sport != 'undefined') {
const msg = Uint8Array.from(`\xff\xff\xff\xffport ${sock.sport}\x00`, x => x.charCodeAt(0));
session.write(msg);
}

return session;
},
acceptSession(sock, session) {
#if ENVIRONMENT_MAY_BE_NODE
const split = session.peerAddress.split(':');

const addr = split[0];
const port = parseInt(split[1], 10);

SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port);
#endif
},
stopListenServer(sock) {
#if ENVIRONMENT_MAY_BE_NODE
if (!ENVIRONMENT_IS_NODE) {
return;
}

if (!sock.h3) {
return;
}

sock.h3.stopServer();
sock.h3 = null;
#endif
},
startListenServer(sock) {
#if ENVIRONMENT_MAY_BE_NODE
if (!ENVIRONMENT_IS_NODE) {
return;
}

SOCKFS.webtransport_sock_ops.stopListenServer(sock);

sock.h3 = new Http3Server({
host: sock.saddr,
port: sock.sport,
secret: require('crypto').randomBytes(16).toString('hex'),
cert: Module['cert'],
privKey: Module['key']
});

(async () => {
try {
sock.h3.startServer();

await sock.h3.ready;

const sessionStream = await sock.h3.sessionStream('/');
const sessionReader = sessionStream.getReader();

while (true) {
const { done, value } = await sessionReader.read();

if (done) {
break;
}

SOCKFS.webtransport_sock_ops.acceptSession(sock, value);
}
} catch (e) {
sock.error = {{{ cDefs.EHOSTUNREACH }}};
} finally {
sock.h3 = null;
}
})();
#endif
},

// actual sock ops
#if ASYNCIFY
async poll(sock, timeout)
#else
poll(sock, timeout)
#endif
{
let mask = 0;

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
#if ASYNCIFY
if (!sock.recv_queue.length) {
await new Promise((resolve, reject) => {
sock.pendingPromiseResolve = resolve;
setTimeout(resolve, timeout);
}).finally(() => {
sock.pendingPromiseResolve = null;
});
}
#endif

if (sock.recv_queue.length) {
mask |= {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}};
}

/* always ready to write */
mask |= {{{ cDefs.POLLOUT }}};
}

return mask;
},
ioctl(sock, request, arg) {
switch (request) {
default:
return {{{ cDefs.EINVAL }}};
}
},
close(sock) {
for (const session of Object.values(sock.peers)) {
session.close();
}

SOCKFS.webtransport_sock_ops.stopListenServer(sock);

return 0;
},
bind(sock, addr, port) {
if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
throw new FS.ErrnoError({{{ cDefs.EINVAL }}}); // already bound
}

sock.saddr = addr;
sock.sport = port;

if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) {
SOCKFS.webtransport_sock_ops.startListenServer(sock);
} else {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}
},
connect(sock, addr, port) {
if (sock.h3) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
sock.daddr = addr;
sock.dport = port;
}
},
listen(sock, backlog) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
},
sendmsg(sock, buffer, offset, length, addr, port) {
let session = null;

if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
} else {
if (addr === undefined || port === undefined) {
addr = sock.daddr;
port = sock.dport;
}

session = SOCKFS.webtransport_sock_ops.getSession(sock, addr, port);

if (!session) {
session = SOCKFS.webtransport_sock_ops.newSession(sock, addr, port);
}
}

if (!session) {
throw new FS.ErrnoError({{{ cDefs.EDESTADDRREQ }}});
}

// copy off the buffer because write is async
buffer = buffer.slice(offset, offset + length);

session.write(buffer);

return length;
},
recvmsg(sock, length) {
if (sock.type === {{{ cDefs.SOCK_STREAM }}}) {
throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}});
}

const msg = sock.recv_queue.shift();

if (!msg) {
throw new FS.ErrnoError({{{ cDefs.EAGAIN }}});
}

return msg;
},
},
#else
websocket_sock_ops: {
//
// peers are a small wrapper around a WebSocket to help in
Expand Down Expand Up @@ -375,7 +648,7 @@ addToLibrary({
//
// actual sock ops
//
poll(sock) {
poll(sock, timeout) {
if (sock.type === {{{ cDefs.SOCK_STREAM }}} && sock.server) {
// listen sockets should only say they're available for reading
// if there are pending clients.
Expand Down Expand Up @@ -728,6 +1001,7 @@ addToLibrary({
return res;
}
}
#endif
},

/*
Expand Down
19 changes: 17 additions & 2 deletions src/lib/libsyscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,11 @@ var SyscallsLibrary = {
var sock = getSocketFromFD(fd);
if (!addr) {
// send, no address provided
return FS.write(sock.stream, HEAP8, message, length);
return FS.write(sock.stream, HEAPU8, message, length);
}
var dest = getSocketAddress(addr, addr_len);
// sendto an address
return sock.sock_ops.sendmsg(sock, HEAP8, message, length, dest.addr, dest.port);
return sock.sock_ops.sendmsg(sock, HEAPU8, message, length, dest.addr, dest.port);
},
__syscall_getsockopt__deps: ['$getSocketFromFD'],
__syscall_getsockopt: (fd, level, optname, optval, optlen, d1) => {
Expand Down Expand Up @@ -606,7 +606,12 @@ var SyscallsLibrary = {
'_emscripten_proxy_newselect',
#endif
],
#if ASYNCIFY
__syscall__newselect__async: true,
__syscall__newselect: async (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#else
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#endif
#if PTHREADS
if (ENVIRONMENT_IS_PTHREAD) {
return __emscripten_proxy_newselect(nfds,
Expand All @@ -631,7 +636,12 @@ var SyscallsLibrary = {
'_emscripten_proxy_newselect_finish',
#endif
],
#if ASYNCIFY
_newselect_js__async: true,
_newselect_js: async (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#else
_newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#endif
// readfds are supported,
// writefds checks socket open status
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
Expand Down Expand Up @@ -701,6 +711,11 @@ var SyscallsLibrary = {
#endif
return stream.stream_ops.poll(stream, timeoutInMillis);
})();

#if ASYNCIFY
/* poll is possibly a promise */
flags = await flags;
#endif
} else {
#if ASSERTIONS
if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis)
Expand Down
Loading