-
Notifications
You must be signed in to change notification settings - Fork 7
feat: Add CapTP infrastructure for kernel communication #751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
773f8d9
aa9593a
7d2c3b4
dda1724
cf74818
56d6ead
1c07cdf
8c4927e
916f94a
e9b82e0
da8a92c
fa89c36
25aad34
91d5741
bef3b9d
6e88cad
fc29d6b
aacb481
cbf22fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,32 @@ | ||
| import { E } from '@endo/eventual-send'; | ||
| import { | ||
| connectToKernel, | ||
| rpcMethodSpecs, | ||
| makeBackgroundCapTP, | ||
| makeCapTPNotification, | ||
| isCapTPNotification, | ||
| getCapTPMessage, | ||
| } from '@metamask/kernel-browser-runtime'; | ||
| import type { | ||
| KernelFacade, | ||
| CapTPMessage, | ||
| } from '@metamask/kernel-browser-runtime'; | ||
| import defaultSubcluster from '@metamask/kernel-browser-runtime/default-cluster'; | ||
| import { RpcClient } from '@metamask/kernel-rpc-methods'; | ||
| import { delay } from '@metamask/kernel-utils'; | ||
| import type { JsonRpcCall } from '@metamask/kernel-utils'; | ||
| import { delay, isJsonRpcMessage, stringify } from '@metamask/kernel-utils'; | ||
| import type { JsonRpcMessage } from '@metamask/kernel-utils'; | ||
| import { Logger } from '@metamask/logger'; | ||
| import { kernelMethodSpecs } from '@metamask/ocap-kernel/rpc'; | ||
| import { ChromeRuntimeDuplexStream } from '@metamask/streams/browser'; | ||
| import { isJsonRpcResponse } from '@metamask/utils'; | ||
| import type { JsonRpcResponse } from '@metamask/utils'; | ||
|
|
||
| defineGlobals(); | ||
|
|
||
| const OFFSCREEN_DOCUMENT_PATH = '/offscreen.html'; | ||
| const logger = new Logger('background'); | ||
| let bootPromise: Promise<void> | null = null; | ||
| let kernelP: Promise<KernelFacade>; | ||
| let ping: () => Promise<void>; | ||
|
|
||
| // With this we can click the extension action button to wake up the service worker. | ||
| chrome.action.onClicked.addListener(() => { | ||
| ping?.().catch(logger.error); | ||
| }); | ||
rekmarks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Install/update | ||
| chrome.runtime.onInstalled.addListener(() => { | ||
|
|
@@ -79,85 +91,98 @@ async function main(): Promise<void> { | |
| // Without this delay, sending messages via the chrome.runtime API can fail. | ||
| await delay(50); | ||
|
|
||
| // Create stream for CapTP messages | ||
| const offscreenStream = await ChromeRuntimeDuplexStream.make< | ||
| JsonRpcResponse, | ||
| JsonRpcCall | ||
| >(chrome.runtime, 'background', 'offscreen', isJsonRpcResponse); | ||
|
|
||
| const rpcClient = new RpcClient( | ||
| kernelMethodSpecs, | ||
| async (request) => { | ||
| await offscreenStream.write(request); | ||
| JsonRpcMessage, | ||
| JsonRpcMessage | ||
| >(chrome.runtime, 'background', 'offscreen', isJsonRpcMessage); | ||
|
|
||
| // Set up CapTP for E() based communication with the kernel | ||
| const backgroundCapTP = makeBackgroundCapTP({ | ||
| send: (captpMessage: CapTPMessage) => { | ||
| const notification = makeCapTPNotification(captpMessage); | ||
| offscreenStream.write(notification).catch((error) => { | ||
| logger.error('Failed to send CapTP message:', error); | ||
| }); | ||
| }, | ||
| 'background:', | ||
| ); | ||
| }); | ||
|
|
||
| // Get the kernel remote presence | ||
| kernelP = backgroundCapTP.getKernel(); | ||
|
|
||
| const ping = async (): Promise<void> => { | ||
| const result = await rpcClient.call('ping', []); | ||
| ping = async () => { | ||
| const result = await E(kernelP).ping(); | ||
| logger.info(result); | ||
| }; | ||
|
|
||
| // globalThis.kernel will exist due to dev-console.js in background-trusted-prelude.js | ||
| Object.defineProperties(globalThis.kernel, { | ||
| ping: { | ||
| value: ping, | ||
| }, | ||
| sendMessage: { | ||
| value: async (message: JsonRpcCall) => | ||
| await offscreenStream.write(message), | ||
| }, | ||
| }); | ||
| harden(globalThis.kernel); | ||
|
|
||
| // With this we can click the extension action button to wake up the service worker. | ||
| chrome.action.onClicked.addListener(() => { | ||
| ping().catch(logger.error); | ||
| // Handle incoming CapTP messages from the kernel | ||
| const drainPromise = offscreenStream.drain((message) => { | ||
| if (isCapTPNotification(message)) { | ||
| const captpMessage = getCapTPMessage(message); | ||
| backgroundCapTP.dispatch(captpMessage); | ||
| } else { | ||
| throw new Error(`Unexpected message: ${stringify(message)}`); | ||
| } | ||
| }); | ||
|
|
||
| // Pipe responses back to the RpcClient | ||
| const drainPromise = offscreenStream.drain(async (message) => | ||
| rpcClient.handleResponse(message.id as string, message), | ||
rekmarks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ); | ||
| drainPromise.catch(logger.error); | ||
|
|
||
| await ping(); // Wait for the kernel to be ready | ||
| await startDefaultSubcluster(); | ||
| await startDefaultSubcluster(kernelP); | ||
|
|
||
| try { | ||
| await drainPromise; | ||
| } catch (error) { | ||
| throw new Error('Offscreen connection closed unexpectedly', { | ||
| const finalError = new Error('Offscreen connection closed unexpectedly', { | ||
| cause: error, | ||
| }); | ||
| backgroundCapTP.abort(finalError); | ||
| throw finalError; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Idempotently starts the default subcluster. | ||
| * | ||
| * @param kernelPromise - Promise for the kernel facade. | ||
| */ | ||
| async function startDefaultSubcluster(): Promise<void> { | ||
| const kernelStream = await connectToKernel({ label: 'background', logger }); | ||
| const rpcClient = new RpcClient( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing stream cleanup on initialization failure causes duplicate listenersMedium Severity When Additional Locations (1) |
||
| rpcMethodSpecs, | ||
| async (request) => { | ||
| await kernelStream.write(request); | ||
| }, | ||
| 'background', | ||
| ); | ||
rekmarks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| async function startDefaultSubcluster( | ||
| kernelPromise: Promise<KernelFacade>, | ||
| ): Promise<void> { | ||
| const status = await E(kernelPromise).getStatus(); | ||
|
|
||
| kernelStream | ||
| .drain(async (message) => | ||
| rpcClient.handleResponse(message.id as string, message), | ||
| ) | ||
| .catch(logger.error); | ||
|
|
||
| const status = await rpcClient.call('getStatus', []); | ||
| if (status.subclusters.length === 0) { | ||
| const result = await rpcClient.call('launchSubcluster', { | ||
| config: defaultSubcluster, | ||
| }); | ||
| const result = await E(kernelPromise).launchSubcluster(defaultSubcluster); | ||
| logger.info(`Default subcluster launched: ${JSON.stringify(result)}`); | ||
| } else { | ||
| logger.info('Subclusters already exist. Not launching default subcluster.'); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Define globals accessible via the background console. | ||
| */ | ||
| function defineGlobals(): void { | ||
| Object.defineProperty(globalThis, 'kernel', { | ||
| configurable: false, | ||
| enumerable: true, | ||
| writable: false, | ||
| value: {}, | ||
| }); | ||
|
|
||
| Object.defineProperties(globalThis.kernel, { | ||
| ping: { | ||
| get: () => ping, | ||
| }, | ||
| getKernel: { | ||
| value: async () => kernelP, | ||
| }, | ||
| }); | ||
| harden(globalThis.kernel); | ||
|
|
||
| Object.defineProperty(globalThis, 'E', { | ||
| value: E, | ||
| configurable: false, | ||
| enumerable: true, | ||
| writable: false, | ||
| }); | ||
| } | ||
This file was deleted.
This file was deleted.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| import type { KernelFacade } from '@metamask/kernel-browser-runtime'; | ||
|
|
||
| // Type declarations for kernel dev console API. | ||
| declare global { | ||
| /** | ||
| * The E() function from @endo/eventual-send for making eventual sends. | ||
| * Set globally in the trusted prelude before lockdown. | ||
| * | ||
| * @example | ||
| * ```typescript | ||
| * const kernel = await kernel.getKernel(); | ||
| * const status = await E(kernel).getStatus(); | ||
| * ``` | ||
| */ | ||
| // eslint-disable-next-line no-var,id-length | ||
| var E: typeof import('@endo/eventual-send').E; | ||
|
|
||
| // eslint-disable-next-line no-var | ||
| var kernel: { | ||
| /** | ||
| * Ping the kernel to verify connectivity. | ||
| */ | ||
| ping: () => Promise<void>; | ||
|
|
||
| /** | ||
| * Get the kernel remote presence for use with E(). | ||
| * | ||
| * @returns A promise for the kernel facade remote presence. | ||
| * @example | ||
| * ```typescript | ||
| * const kernel = await kernel.getKernel(); | ||
| * const status = await E(kernel).getStatus(); | ||
| * ``` | ||
| */ | ||
| getKernel: () => Promise<KernelFacade>; | ||
| }; | ||
| } | ||
|
|
||
| export {}; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,8 +3,8 @@ import { | |
| PlatformServicesServer, | ||
| createRelayQueryString, | ||
| } from '@metamask/kernel-browser-runtime'; | ||
| import { delay, isJsonRpcCall } from '@metamask/kernel-utils'; | ||
| import type { JsonRpcCall } from '@metamask/kernel-utils'; | ||
| import { delay, isJsonRpcMessage } from '@metamask/kernel-utils'; | ||
| import type { JsonRpcMessage } from '@metamask/kernel-utils'; | ||
|
Comment on lines
+6
to
+7
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we are swapping out JSON-RPC for CapTP as our IPC layer, what's going on here? The changes to this file substitute
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, we get rid of the RPC Client / Server stuff and the kernel's command stream, but the captp wire messages have a JSON-RPC wrapper. This buys us a little bit of flexibility in case we ever want to pass other kinds of data over these streams. I think I can just remove the wrapper without side effects if you feel strongly about it.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may actually have found another message type we want to pass over this stream so I think it's best to leave it. |
||
| import { Logger } from '@metamask/logger'; | ||
| import type { DuplexStream } from '@metamask/streams'; | ||
| import { | ||
|
|
@@ -13,8 +13,6 @@ import { | |
| MessagePortDuplexStream, | ||
| } from '@metamask/streams/browser'; | ||
| import type { PostMessageTarget } from '@metamask/streams/browser'; | ||
| import type { JsonRpcResponse } from '@metamask/utils'; | ||
| import { isJsonRpcResponse } from '@metamask/utils'; | ||
|
|
||
| const logger = new Logger('offscreen'); | ||
|
|
||
|
|
@@ -27,11 +25,11 @@ async function main(): Promise<void> { | |
| // Without this delay, sending messages via the chrome.runtime API can fail. | ||
| await delay(50); | ||
|
|
||
| // Create stream for messages from the background script | ||
| // Create stream for CapTP messages from the background script | ||
| const backgroundStream = await ChromeRuntimeDuplexStream.make< | ||
| JsonRpcCall, | ||
| JsonRpcResponse | ||
| >(chrome.runtime, 'offscreen', 'background', isJsonRpcCall); | ||
| JsonRpcMessage, | ||
| JsonRpcMessage | ||
| >(chrome.runtime, 'offscreen', 'background', isJsonRpcMessage); | ||
|
|
||
| const kernelStream = await makeKernelWorker(); | ||
|
|
||
|
|
@@ -48,7 +46,7 @@ async function main(): Promise<void> { | |
| * @returns The message port stream for worker communication | ||
| */ | ||
| async function makeKernelWorker(): Promise< | ||
| DuplexStream<JsonRpcResponse, JsonRpcCall> | ||
| DuplexStream<JsonRpcMessage, JsonRpcMessage> | ||
| > { | ||
| // Assign local relay address generated from `yarn ocap relay` | ||
| const relayQueryString = createRelayQueryString([ | ||
|
|
@@ -72,9 +70,9 @@ async function makeKernelWorker(): Promise< | |
| ); | ||
|
|
||
| const kernelStream = await MessagePortDuplexStream.make< | ||
| JsonRpcResponse, | ||
| JsonRpcCall | ||
| >(port, isJsonRpcResponse); | ||
| JsonRpcMessage, | ||
| JsonRpcMessage | ||
| >(port, isJsonRpcMessage); | ||
|
|
||
| await PlatformServicesServer.make(worker as PostMessageTarget, (vatId) => | ||
| makeIframeVatWorker({ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I hate that we have to build before anything except the e2e tests but we already live in sin so this is fine for now. I'm of a mind to start caching builds in CI after this PR.