diff --git a/src/lib/libcore.js b/src/lib/libcore.js index d774ed139d044..f5f5f510e9d5f 100644 --- a/src/lib/libcore.js +++ b/src/lib/libcore.js @@ -2617,7 +2617,11 @@ function wrapSyscallFunction(x, library, isWasi) { post = handler + post; if (pre || post) { - t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`); + if (library[x + '__async']) { + t = modifyJSFunction(t, (args, body) => `async function (${args}) {\n${pre}${body}${post}}\n`); + } else { + t = modifyJSFunction(t, (args, body) => `function (${args}) {\n${pre}${body}${post}}\n`); + } } library[x] = eval('(' + t + ')'); diff --git a/src/lib/libeventloop.js b/src/lib/libeventloop.js index a92f1415911d8..5565ab662e673 100644 --- a/src/lib/libeventloop.js +++ b/src/lib/libeventloop.js @@ -378,6 +378,9 @@ LibraryJSEventLoop = { emscripten_set_main_loop__deps: ['$setMainLoop'], emscripten_set_main_loop: (func, fps, simulateInfiniteLoop) => { var iterFunc = {{{ makeDynCall('v', 'func') }}}; +#if JSPI + iterFunc = WebAssembly.promising(iterFunc); +#endif setMainLoop(iterFunc, fps, simulateInfiniteLoop); }, diff --git a/src/lib/libsockfs.js b/src/lib/libsockfs.js index 493d2e749341f..99a9410128bd5 100644 --- a/src/lib/libsockfs.js +++ b/src/lib/libsockfs.js @@ -8,7 +8,7 @@ addToLibrary({ $SOCKFS__postset: () => { addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);'); }, - $SOCKFS__deps: ['$FS'], + $SOCKFS__deps: ['$FS', '$DNS'], $SOCKFS: { #if expectToReceiveOnModule('websocket') websocketArgs: {}, @@ -69,6 +69,8 @@ addToLibrary({ pending: [], recv_queue: [], #if SOCKET_WEBRTC +#elif SOCKET_WEBTRANSPORT + sock_ops: SOCKFS.webtransport_sock_ops #else sock_ops: SOCKFS.websocket_sock_ops #endif @@ -104,9 +106,9 @@ addToLibrary({ }, // node and stream ops are backend agnostic stream_ops: { - poll(stream) { + poll(stream, timeout) { var sock = stream.node.sock; - return sock.sock_ops.poll(sock); + return sock.sock_ops.poll(sock, timeout); }, ioctl(stream, request, varargs) { var sock = stream.node.sock; @@ -138,6 +140,288 @@ addToLibrary({ return `socket[${SOCKFS.nextname.current++}]`; }, // backend-specific stream ops +#if SOCKET_WEBRTC +#elif SOCKET_WEBTRANSPORT + webtransport_sock_ops: { + getSession(sock, addr, port) { + return sock.peers[`${addr}:${port}`]; + }, + initSession(sock, session, addr, port) { + sock.peers[`${addr}:${port}`] = session; + + /* buffer writes before session is ready */ + const outgoing = []; + + session.write = (buffer) => { + outgoing.push(buffer); + }; + + /* prevent unhandled rejections before main loop */ + session.ready.catch(() => {}); + session.closed.catch(() => {}); + + (async () => { + try { + const sessionClosed = session.closed.then( + () => ({ done: true, value: null }), + (err) => { throw err; } + ); + + await session.ready; + + const writer = session.datagrams.writable.getWriter(); + const reader = session.datagrams.readable.getReader(); + let first = true; + + while (outgoing.length) { + writer.write(outgoing.shift()); + } + + session.write = (buffer) => { + writer.write(buffer).catch(e => console.warn(`Write failed for ${addr}:${port}`, e)); + }; + + while (true) { + const { done, value } = await Promise.race([ + reader.read(), + sessionClosed + ]); + + if (done) { + break; + } + + // handle the internal port identification message + if (first && value[0] === 0xff && value[1] === 0xff && value[2] === 0xff && value[3] === 0xff && + value[4] === 'p' && value[5] === 'o' && value[6] === 'r' && value[7] === 't') { + // update cache key + delete sock.peers[`${addr}:${port}`]; + port = parseInt(String.fromCharCode.apply(null, value.subarray(9)), 10); + sock.peers[`${addr}:${port}`] = session; + } else { + sock.recv_queue.push({ addr: addr, port: port, buffer: value }); + } + + if (sock.pendingPollResolve) { + sock.pendingPollResolve(); + } + + first = false; + } + } catch (e) { + console.error(`Session ${addr}:${port} terminated`, e); + } finally { + delete sock.peers[`${addr}:${port}`]; + } + })(); + }, + newSession(sock, addr, port) { + let hostname = DNS.lookup_addr(addr); + + if (!hostname) { + hostname = addr; + } + + const session = new WebTransport(`https://${hostname}:${port}`); + + SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port); + + // send the original bound port number to the peer + if (sock.type === {{{ cDefs.SOCK_DGRAM }}} && typeof sock.sport != 'undefined') { + const msg = Uint8Array.from(`\xff\xff\xff\xffport ${sock.sport}\x00`, x => x.charCodeAt(0)); + session.write(msg); + } + + return session; + }, + acceptSession(sock, session) { +#if ENVIRONMENT_MAY_BE_NODE + const split = session.peerAddress.split(':'); + + const addr = split[0]; + const port = parseInt(split[1], 10); + + SOCKFS.webtransport_sock_ops.initSession(sock, session, addr, port); +#endif + }, + stopListenServer(sock) { +#if ENVIRONMENT_MAY_BE_NODE + if (!ENVIRONMENT_IS_NODE) { + return; + } + + if (!sock.h3) { + return; + } + + sock.h3.stopServer(); + sock.h3 = null; +#endif + }, + startListenServer(sock) { +#if ENVIRONMENT_MAY_BE_NODE + if (!ENVIRONMENT_IS_NODE) { + return; + } + + SOCKFS.webtransport_sock_ops.stopListenServer(sock); + + sock.h3 = new Http3Server({ + host: sock.saddr, + port: sock.sport, + secret: require('crypto').randomBytes(16).toString('hex'), + cert: Module['cert'], + privKey: Module['key'] + }); + + (async () => { + try { + sock.h3.startServer(); + + await sock.h3.ready; + + const sessionStream = await sock.h3.sessionStream('/'); + const sessionReader = sessionStream.getReader(); + + while (true) { + const { done, value } = await sessionReader.read(); + + if (done) { + break; + } + + SOCKFS.webtransport_sock_ops.acceptSession(sock, value); + } + } catch (e) { + sock.error = {{{ cDefs.EHOSTUNREACH }}}; + } finally { + sock.h3 = null; + } + })(); +#endif + }, + + // actual sock ops +#if ASYNCIFY + async poll(sock, timeout) +#else + poll(sock, timeout) +#endif + { + let mask = 0; + + if (sock.type === {{{ cDefs.SOCK_STREAM }}}) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } else { +#if ASYNCIFY + if (!sock.recv_queue.length) { + await new Promise((resolve, reject) => { + sock.pendingPromiseResolve = resolve; + setTimeout(resolve, timeout); + }).finally(() => { + sock.pendingPromiseResolve = null; + }); + } +#endif + + if (sock.recv_queue.length) { + mask |= {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}; + } + + /* always ready to write */ + mask |= {{{ cDefs.POLLOUT }}}; + } + + return mask; + }, + ioctl(sock, request, arg) { + switch (request) { + default: + return {{{ cDefs.EINVAL }}}; + } + }, + close(sock) { + for (const session of Object.values(sock.peers)) { + session.close(); + } + + SOCKFS.webtransport_sock_ops.stopListenServer(sock); + + return 0; + }, + bind(sock, addr, port) { + if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') { + throw new FS.ErrnoError({{{ cDefs.EINVAL }}}); // already bound + } + + sock.saddr = addr; + sock.sport = port; + + if (sock.type === {{{ cDefs.SOCK_DGRAM }}}) { + SOCKFS.webtransport_sock_ops.startListenServer(sock); + } else { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } + }, + connect(sock, addr, port) { + if (sock.h3) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } + + if (sock.type === {{{ cDefs.SOCK_STREAM }}}) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } else { + sock.daddr = addr; + sock.dport = port; + } + }, + listen(sock, backlog) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + }, + sendmsg(sock, buffer, offset, length, addr, port) { + let session = null; + + if (sock.type === {{{ cDefs.SOCK_STREAM }}}) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } else { + if (addr === undefined || port === undefined) { + addr = sock.daddr; + port = sock.dport; + } + + session = SOCKFS.webtransport_sock_ops.getSession(sock, addr, port); + + if (!session) { + session = SOCKFS.webtransport_sock_ops.newSession(sock, addr, port); + } + } + + if (!session) { + throw new FS.ErrnoError({{{ cDefs.EDESTADDRREQ }}}); + } + + // copy off the buffer because write is async + buffer = buffer.slice(offset, offset + length); + + session.write(buffer); + + return length; + }, + recvmsg(sock, length) { + if (sock.type === {{{ cDefs.SOCK_STREAM }}}) { + throw new FS.ErrnoError({{{ cDefs.EOPNOTSUPP }}}); + } + + const msg = sock.recv_queue.shift(); + + if (!msg) { + throw new FS.ErrnoError({{{ cDefs.EAGAIN }}}); + } + + return msg; + }, + }, +#else websocket_sock_ops: { // // peers are a small wrapper around a WebSocket to help in @@ -375,7 +659,7 @@ addToLibrary({ // // actual sock ops // - poll(sock) { + poll(sock, timeout) { if (sock.type === {{{ cDefs.SOCK_STREAM }}} && sock.server) { // listen sockets should only say they're available for reading // if there are pending clients. @@ -728,6 +1012,7 @@ addToLibrary({ return res; } } +#endif }, /* diff --git a/src/lib/libsyscall.js b/src/lib/libsyscall.js index 3624805944ff8..eeef7d7e4faf6 100644 --- a/src/lib/libsyscall.js +++ b/src/lib/libsyscall.js @@ -484,11 +484,11 @@ var SyscallsLibrary = { var sock = getSocketFromFD(fd); if (!addr) { // send, no address provided - return FS.write(sock.stream, HEAP8, message, length); + return FS.write(sock.stream, HEAPU8, message, length); } var dest = getSocketAddress(addr, addr_len); // sendto an address - return sock.sock_ops.sendmsg(sock, HEAP8, message, length, dest.addr, dest.port); + return sock.sock_ops.sendmsg(sock, HEAPU8, message, length, dest.addr, dest.port); }, __syscall_getsockopt__deps: ['$getSocketFromFD'], __syscall_getsockopt: (fd, level, optname, optval, optlen, d1) => { @@ -606,7 +606,12 @@ var SyscallsLibrary = { '_emscripten_proxy_newselect', #endif ], +#if ASYNCIFY + __syscall__newselect__async: true, + __syscall__newselect: async (nfds, readfds, writefds, exceptfds, timeoutInMillis) => { +#else __syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => { +#endif #if PTHREADS if (ENVIRONMENT_IS_PTHREAD) { return __emscripten_proxy_newselect(nfds, @@ -631,7 +636,12 @@ var SyscallsLibrary = { '_emscripten_proxy_newselect_finish', #endif ], +#if ASYNCIFY + _newselect_js__async: true, + _newselect_js: async (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => { +#else _newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => { +#endif // readfds are supported, // writefds checks socket open status // exceptfds are supported, although on web, such exceptional conditions never arise in web sockets @@ -701,6 +711,11 @@ var SyscallsLibrary = { #endif return stream.stream_ops.poll(stream, timeoutInMillis); })(); + +#if ASYNCIFY + /* poll is possibly a promise */ + flags = await flags; +#endif } else { #if ASSERTIONS if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis) diff --git a/src/modularize.js b/src/modularize.js index b9a8414b2ce7c..21a2560f2aa85 100644 --- a/src/modularize.js +++ b/src/modularize.js @@ -11,6 +11,12 @@ import source wasmModule from './{{{ WASM_BINARY_FILE }}}'; #endif +#if SOCKET_WEBTRANSPORT && ENVIRONMENT_MAY_BE_NODE +#if EXPORT_ES6 +import { Http3Server, WebTransport } from '@fails-components/webtransport'; +#endif +#endif + #if ENVIRONMENT_MAY_BE_WEB && !EXPORT_ES6 && !(MINIMAL_RUNTIME && !PTHREADS) // Single threaded MINIMAL_RUNTIME programs do not need access to // document.currentScript, so a simple export declaration is enough. diff --git a/src/settings.js b/src/settings.js index 1ba3b7856b8da..1292d9fd2c4ab 100644 --- a/src/settings.js +++ b/src/settings.js @@ -402,6 +402,8 @@ var FS_DEBUG = false; // [link] var SOCKET_WEBRTC = false; +var SOCKET_WEBTRANSPORT = false; + // A string containing either a WebSocket URL prefix (ws:// or wss://) or a complete // RFC 6455 URL - "ws[s]:" "//" host [ ":" port ] path [ "?" query ]. // In the (default) case of only a prefix being specified the URL will be constructed from