From 8e1105481d31e34a5adad4a96b82e01d80e9f610 Mon Sep 17 00:00:00 2001 From: zlayne Date: Tue, 5 May 2026 13:39:05 -0400 Subject: [PATCH 1/3] [ACM-16232] Add support for search-api Subscriptions Generated-by: Cursor (auto) Signed-off-by: zlayne --- backend/package-lock.json | 35 ++- backend/package.json | 4 +- backend/src/lib/server.ts | 7 +- backend/src/routes/search.ts | 210 +++++++++++++++++- backend/test/routes/searchWebSocket.test.ts | 25 +++ frontend/package-lock.json | 27 +++ frontend/package.json | 1 + .../routes/Search/search-sdk/search-client.ts | 45 +++- .../routes/Search/search-sdk/search-sdk.ts | 108 ++++++++- .../Search/search-sdk/subscription.graphql | 9 + 10 files changed, 462 insertions(+), 9 deletions(-) create mode 100644 backend/test/routes/searchWebSocket.test.ts create mode 100644 frontend/src/routes/Search/search-sdk/subscription.graphql diff --git a/backend/package-lock.json b/backend/package-lock.json index ac70924589b..cbe58727be0 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -23,7 +23,8 @@ "pino": "7.11.0", "pluralize": "8.0.0", "prom-client": "^14.2.0", - "raw-body": "2.5.3" + "raw-body": "2.5.3", + "ws": "^8.20.0" }, "devDependencies": { "@types/eslint": "9.6.1", @@ -33,6 +34,7 @@ "@types/node-fetch": "2.6.13", "@types/node-localstorage": "^1.3.3", "@types/pluralize": "^0.0.33", + "@types/ws": "^8.18.1", "@typescript-eslint/eslint-plugin": "^8.59.0", "@typescript-eslint/parser": "^8.59.0", "concurrently": "9.2.1", @@ -1658,6 +1660,16 @@ "integrity": "sha512-9aEbYZ3TbYMznPdcdr3SmIrLXwC/AKZXQeCf9Pgao5CKb8CyHuEX5jzWPTkvregvhRJHcpRO6BFoGW9ycaOkYw==", "dev": true }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.32", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.32.tgz", @@ -7688,6 +7700,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xml": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/xml/-/xml-1.0.1.tgz", diff --git a/backend/package.json b/backend/package.json index 63f5b377e5b..0d5723e845d 100644 --- a/backend/package.json +++ b/backend/package.json @@ -35,7 +35,8 @@ "pino": "7.11.0", "pluralize": "8.0.0", "prom-client": "^14.2.0", - "raw-body": "2.5.3" + "raw-body": "2.5.3", + "ws": "^8.20.0" }, "devDependencies": { "@types/eslint": "9.6.1", @@ -45,6 +46,7 @@ "@types/node-fetch": "2.6.13", "@types/node-localstorage": "^1.3.3", "@types/pluralize": "^0.0.33", + "@types/ws": "^8.18.1", "@typescript-eslint/eslint-plugin": "^8.59.0", "@typescript-eslint/parser": "^8.59.0", "concurrently": "9.2.1", diff --git a/backend/src/lib/server.ts b/backend/src/lib/server.ts index c25d1679a5f..3eb7b510355 100644 --- a/backend/src/lib/server.ts +++ b/backend/src/lib/server.ts @@ -6,8 +6,9 @@ import type { Http2Server, Http2ServerRequest, Http2ServerResponse } from 'node: import { constants, createSecureServer, createServer } from 'node:http2' import type { Socket } from 'node:net' import type { TLSSocket } from 'node:tls' -import { logger } from './logger' import { managedClusterProxy } from '../routes/managedClusterProxy' +import { searchWebSocket } from '../routes/search' +import { logger } from './logger' // Explicitly set ECDH curves to enable PQC (X25519MLKEM768). // The default image crypto policy (/etc/crypto-policies/config) does not include them. @@ -84,6 +85,10 @@ export function startServer(options: ServerOptions): Promise { + if (req.url.startsWith('/multicloud/proxy/search')) { + req.url = req.url.substring(11) + return searchWebSocket(req, socket, head) + } if (req.url.startsWith('/multicloud/managedclusterproxy')) { req.url = req.url.substring(11) return managedClusterProxy(req, socket, head) diff --git a/backend/src/routes/search.ts b/backend/src/routes/search.ts index 09f0c0db8c5..7b73f04dc2c 100644 --- a/backend/src/routes/search.ts +++ b/backend/src/routes/search.ts @@ -1,10 +1,14 @@ /* Copyright Contributors to the Open Cluster Management project */ +import type { IncomingMessage } from 'node:http' import type { Http2ServerRequest, Http2ServerResponse, OutgoingHttpHeaders } from 'node:http2' import { constants } from 'node:http2' -import { request } from 'node:https' +import https, { request } from 'node:https' import { pipeline } from 'node:stream' +import type { TLSSocket } from 'node:tls' +import WebSocket, { type RawData, WebSocketServer } from 'ws' import { logger } from '../lib/logger' import { notFound } from '../lib/respond' +import { getServiceCACertificate } from '../lib/serviceAccountToken' import { getAuthenticatedToken } from '../lib/token' import { getSearchRequestOptions } from '../lib/search' @@ -16,6 +20,115 @@ const proxyHeaders = [ constants.HTTP2_HEADER_CONTENT_TYPE, ] +/** Case-insensitive header lookup for HTTP/2 request pseudo-headers and normal headers. */ +function getHeaderValue(headers: Http2ServerRequest['headers'], name: string): string | undefined { + const lower = name.toLowerCase() + for (const [key, value] of Object.entries(headers)) { + if (key.toLowerCase() !== lower) continue + if (value === undefined) return undefined + return Array.isArray(value) ? value[0] : String(value) + } + return undefined +} + +/** Adds `Authorization` to a graphql-ws `connection_init` message (exported for tests). */ +export function injectSearchWsConnectionInitAuthorization(connectionInitJson: string, bearerToken: string): string { + const bearer = bearerToken.startsWith('Bearer ') ? bearerToken : `Bearer ${bearerToken}` + try { + const msg = JSON.parse(connectionInitJson) as { type?: string; payload?: Record } + if (msg.type !== 'connection_init') return connectionInitJson + const prev = + msg.payload !== null && typeof msg.payload === 'object' && !Array.isArray(msg.payload) ? msg.payload : {} + return JSON.stringify({ + ...msg, + payload: { ...prev, Authorization: bearer }, + }) + } catch { + return connectionInitJson + } +} + +/** `Sec-WebSocket-Protocol` from the client, or default `graphql-transport-ws`. */ +function subprotocolsForUpstream(req: Http2ServerRequest): string | string[] { + const raw = getHeaderValue(req.headers, 'sec-websocket-protocol') + if (!raw) return ['graphql-transport-ws'] + const list = raw + .split(',') + .map((s) => s.trim()) + .filter(Boolean) + return list.length ? list : ['graphql-transport-ws'] +} + +/** Decodes a `ws` text frame payload to UTF-8 (handles Buffer / fragment arrays). */ +function rawDataToUtf8(data: RawData): string { + if (typeof data === 'string') return data + if (Buffer.isBuffer(data)) return data.toString('utf8') + if (Array.isArray(data)) return Buffer.concat(data).toString('utf8') + return Buffer.from(data).toString('utf8') +} + +/** Bidirectional relay; rewrites the first `connection_init` from the browser to include the bearer token. */ +function relayClientToUpstream(clientWs: WebSocket, upstreamWs: WebSocket, rawToken: string): void { + let connectionInitHandled = false + + clientWs.on('message', (data, isBinary) => { + if (!connectionInitHandled && !isBinary) { + const text = rawDataToUtf8(data) + try { + const parsed = JSON.parse(text) as { type?: string } + if (parsed.type === 'connection_init') { + connectionInitHandled = true + upstreamWs.send(injectSearchWsConnectionInitAuthorization(text, rawToken)) + return + } + } catch { + // fall through + } + connectionInitHandled = true + } else if (!connectionInitHandled) { + connectionInitHandled = true + } + upstreamWs.send(data, { binary: Boolean(isBinary) }) + }) + + upstreamWs.on('message', (data, isBinary) => { + if (clientWs.readyState === WebSocket.OPEN) { + clientWs.send(data, { binary: Boolean(isBinary) }) + } + }) + + const closeBoth = () => { + try { + clientWs.close() + } catch { + /* ignore */ + } + try { + upstreamWs.close() + } catch { + /* ignore */ + } + } + + clientWs.on('close', closeBoth) + upstreamWs.on('close', () => { + try { + clientWs.close() + } catch { + /* ignore */ + } + }) + clientWs.on('error', (err) => { + logger.error({ msg: 'search websocket relay: client error', err }) + closeBoth() + }) + upstreamWs.on('error', (err) => { + logger.error({ msg: 'search websocket relay: upstream error after open', err }) + closeBoth() + }) +} + +/** Proxies search GraphQL POST to search-api (HTTPS). */ export async function search(req: Http2ServerRequest, res: Http2ServerResponse): Promise { const token = await getAuthenticatedToken(req, res) if (token) { @@ -39,3 +152,98 @@ export async function search(req: Http2ServerRequest, res: Http2ServerResponse): ) } } + +/** + * WS relay: opens `wss` to search-api with the session bearer, completes the browser upgrade, injects + * `Authorization` into graphql-ws `connection_init` (search-v2-api expects the token in that payload). + */ +export async function searchWebSocket(req: Http2ServerRequest, socket: TLSSocket, head: Buffer): Promise { + let clientUpgradeCompleted = false + + /** Sends an HTTP error on the TCP socket if the WS upgrade to the browser has not completed yet. */ + const failClientUpgrade = (statusCode: number, statusText: string) => { + if (clientUpgradeCompleted) return + try { + socket.write(`HTTP/1.1 ${statusCode} ${statusText}\r\nConnection: close\r\n\r\n`) + } catch { + /* ignore */ + } + socket.destroy() + } + + try { + const token = await getAuthenticatedToken(req, socket) + const headers: OutgoingHttpHeaders = { authorization: `Bearer ${token}` } + const options = await getSearchRequestOptions(headers) + + const upstreamHost = String(options.hostname) + const upstreamPort = options.port ? Number(options.port) : 443 + const upstreamPath = String(options.path ?? '') + const upstreamWsUrl = + upstreamPort === 443 + ? `wss://${upstreamHost}${upstreamPath}` + : `wss://${upstreamHost}:${upstreamPort}${upstreamPath}` + const bearerHeader = `Bearer ${token}` + const hostHeader = upstreamPort === 443 ? upstreamHost : `${upstreamHost}:${upstreamPort}` + + logger.info({ + msg: 'search websocket relay: opening upstream', + clientUrl: req.url, + upstreamWsUrl, + }) + + const httpsAgent = new https.Agent({ + ca: getServiceCACertificate(), + keepAlive: true, + }) + + const upstreamWs = new WebSocket(upstreamWsUrl, subprotocolsForUpstream(req), { + agent: httpsAgent, + headers: { + Authorization: bearerHeader, + Host: hostHeader, + }, + }) + + let upstreamOpen = false + + const connectTimeout = setTimeout(() => { + logger.error({ msg: 'search websocket relay: upstream connect timeout', upstreamWsUrl }) + upstreamWs.terminate() + if (!upstreamOpen) { + failClientUpgrade(504, 'Gateway Timeout') + } + }, 60_000) + + upstreamWs.on('error', (err) => { + clearTimeout(connectTimeout) + if (!upstreamOpen) { + logger.error({ msg: 'search websocket relay: upstream connect failed', err, upstreamWsUrl }) + failClientUpgrade(502, 'Bad Gateway') + } + }) + + upstreamWs.once('open', () => { + upstreamOpen = true + clearTimeout(connectTimeout) + const wss = new WebSocketServer({ noServer: true, perMessageDeflate: false }) + wss.handleUpgrade(req as unknown as IncomingMessage, socket, head, (clientWs) => { + clientUpgradeCompleted = true + relayClientToUpstream(clientWs, upstreamWs, token) + }) + }) + } catch (err: unknown) { + const message = + err instanceof Error ? err.message : typeof err === 'string' ? err : 'search websocket relay: unknown error' + logger.error({ msg: 'search websocket relay: handler error', error: message }) + if (!clientUpgradeCompleted) { + failClientUpgrade(500, 'Internal Server Error') + } else { + try { + socket.destroy() + } catch { + /* ignore */ + } + } + } +} diff --git a/backend/test/routes/searchWebSocket.test.ts b/backend/test/routes/searchWebSocket.test.ts new file mode 100644 index 00000000000..f51da734758 --- /dev/null +++ b/backend/test/routes/searchWebSocket.test.ts @@ -0,0 +1,25 @@ +/* Copyright Contributors to the Open Cluster Management project */ +import { injectSearchWsConnectionInitAuthorization } from '../../src/routes/search' + +describe('injectSearchWsConnectionInitAuthorization', () => { + it('adds Authorization to connection_init payload', () => { + const input = JSON.stringify({ type: 'connection_init', payload: { foo: 'bar' } }) + const out = injectSearchWsConnectionInitAuthorization(input, 'mytoken') + const msg = JSON.parse(out) as { type: string; payload: { foo: string; Authorization: string } } + expect(msg.type).toBe('connection_init') + expect(msg.payload.foo).toBe('bar') + expect(msg.payload.Authorization).toBe('Bearer mytoken') + }) + + it('accepts token that already includes Bearer prefix', () => { + const input = JSON.stringify({ type: 'connection_init', payload: {} }) + const out = injectSearchWsConnectionInitAuthorization(input, 'Bearer x') + const msg = JSON.parse(out) as { payload: { Authorization: string } } + expect(msg.payload.Authorization).toBe('Bearer x') + }) + + it('leaves non-connection_init messages unchanged', () => { + const input = JSON.stringify({ type: 'subscribe', id: '1', payload: {} }) + expect(injectSearchWsConnectionInitAuthorization(input, 't')).toBe(input) + }) +}) diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 212aee242e8..6ac7e659d87 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -50,6 +50,7 @@ "fuse.js": "6.6.2", "get-value": "3.0.1", "graphql": "16.13.2", + "graphql-ws": "^6.0.8", "handlebars": "4.7.9", "highlight.js": "^11.11.1", "i18next": "21.10.0", @@ -21136,6 +21137,32 @@ "graphql": "^0.9.0 || ^0.10.0 || ^0.11.0 || ^0.12.0 || ^0.13.0 || ^14.0.0 || ^15.0.0 || ^16.0.0" } }, + "node_modules/graphql-ws": { + "version": "6.0.8", + "resolved": "https://registry.npmjs.org/graphql-ws/-/graphql-ws-6.0.8.tgz", + "integrity": "sha512-m3EOaNsUBXwAnkBWbzPfe0Nq8pXUfxsWnolC54sru3FzHvhTZL0Ouf/BoQsaGAXqM+YPerXOJ47BUnmgmoupCw==", + "license": "MIT", + "engines": { + "node": ">=20" + }, + "peerDependencies": { + "@fastify/websocket": "^10 || ^11", + "crossws": "~0.3", + "graphql": "^15.10.1 || ^16", + "ws": "^8" + }, + "peerDependenciesMeta": { + "@fastify/websocket": { + "optional": true + }, + "crossws": { + "optional": true + }, + "ws": { + "optional": true + } + } + }, "node_modules/gulp-sort": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/gulp-sort/-/gulp-sort-2.0.0.tgz", diff --git a/frontend/package.json b/frontend/package.json index 76fb94c86f9..15b947cea78 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -79,6 +79,7 @@ "fuse.js": "6.6.2", "get-value": "3.0.1", "graphql": "16.13.2", + "graphql-ws": "^6.0.8", "handlebars": "4.7.9", "highlight.js": "^11.11.1", "i18next": "21.10.0", diff --git a/frontend/src/routes/Search/search-sdk/search-client.ts b/frontend/src/routes/Search/search-sdk/search-client.ts index bd1ede0ca21..b7d69ba9460 100644 --- a/frontend/src/routes/Search/search-sdk/search-client.ts +++ b/frontend/src/routes/Search/search-sdk/search-client.ts @@ -2,13 +2,43 @@ // Copyright (c) 2021 Red Hat, Inc. // Copyright Contributors to the Open Cluster Management project -import { ApolloClient, ApolloLink, from, HttpLink, InMemoryCache } from '@apollo/client' +import { ApolloClient, ApolloLink, from, HttpLink, InMemoryCache, split } from '@apollo/client' +import { GraphQLWsLink } from '@apollo/client/link/subscriptions' +import { getMainDefinition } from '@apollo/client/utilities' +import { createClient } from 'graphql-ws' import { getBackendUrl, getCookie } from '../../../resources/utils' const httpLink = new HttpLink({ uri: () => `${getBackendUrl()}/proxy/search`, }) +/** WebSocket URL for GraphQL subscriptions (graphql-ws), aligned with {@link httpLink}. */ +function getSearchWebSocketUrl(): string { + const httpEndpoint = `${getBackendUrl()}/proxy/search` + + if (httpEndpoint.startsWith('http://')) { + return httpEndpoint.replace(/^http/, 'ws') + } + if (httpEndpoint.startsWith('https://')) { + return httpEndpoint.replace(/^https/, 'wss') + } + + if (typeof window === 'undefined') { + return 'ws://localhost/proxy/search' + } + + const url = new URL(httpEndpoint, window.location.origin) + url.protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + return url.href +} + +const wsLink = new GraphQLWsLink( + createClient({ + url: () => getSearchWebSocketUrl(), + lazy: true, + }) +) + const csrfHeaderLink = new ApolloLink((operation, forward) => { const csrfToken = getCookie('csrf-token') if (csrfToken) { @@ -23,9 +53,20 @@ const csrfHeaderLink = new ApolloLink((operation, forward) => { return forward(operation) }) +const httpChain = from([csrfHeaderLink, httpLink]) + +const link = split( + ({ query }) => { + const definition = getMainDefinition(query) + return definition.kind === 'OperationDefinition' && definition.operation === 'subscription' + }, + wsLink, + httpChain +) + export const searchClient = new ApolloClient({ connectToDevTools: process.env.NODE_ENV === 'development', - link: from([csrfHeaderLink, httpLink]), + link, cache: new InMemoryCache(), credentials: 'same-origin', diff --git a/frontend/src/routes/Search/search-sdk/search-sdk.ts b/frontend/src/routes/Search/search-sdk/search-sdk.ts index 96a62dca787..ca51e634c75 100644 --- a/frontend/src/routes/Search/search-sdk/search-sdk.ts +++ b/frontend/src/routes/Search/search-sdk/search-sdk.ts @@ -1,6 +1,5 @@ -/* Copyright Contributors to the Open Cluster Management project */ -import * as Apollo from '@apollo/client' import { gql } from '@apollo/client' +import * as Apollo from '@apollo/client' export type Maybe = T | null export type InputMaybe = Maybe export type Exact = { [K in keyof T]: T[K] } @@ -16,9 +15,27 @@ export type Scalars = { Boolean: { input: boolean; output: boolean } Int: { input: number; output: number } Float: { input: number; output: number } + Date: { input: any; output: any } Map: { input: any; output: any } } +/** Event represents a changed resource in the search index. */ +export type Event = { + /** New data recorded on the search index. */ + newData?: Maybe + /** Previous resource data from the search index. */ + oldData?: Maybe + /** Values: INSERT, UPDATE, or DELETE */ + operation: Scalars['String']['output'] + /** + * Time the change event is registered in the search index. + * Note there's a delay from the time the resource changed in kubernetes. + */ + timestamp: Scalars['Date']['output'] + /** Kubernetes resource UID. */ + uid: Scalars['ID']['output'] +} + /** A message is used to communicate conditions detected while executing a query on the server. */ export type Message = { /** Message text. */ @@ -58,7 +75,7 @@ export type Query = { /** * Returns all fields from resources currently in the index. * Optionally, a query can be included to filter the results. - * For example, if we want to only get fields for Pod resources, we can pass a query with the filter `{property: kind, values:['Pod']}` + * For example, if we want to only get fields for Pod resources, we can pass in a query with the filter `{property: kind, values:['Pod']}` */ searchSchema?: Maybe } @@ -94,6 +111,11 @@ export type SearchFilter = { * The values available for datetime fields (Ex: `created`, `startedAt`) are `hour`, `day`, `week`, `month` and `year`. * Property `kind`, if included in the filter, will be matched using a case-insensitive comparison. * For example, `kind:Pod` and `kind:pod` will bring up all pods. This is to maintain compatibility with Search V1. + * + * Wildcard matching: the `*` character can be used as a wildcard to match any sequence of characters. + * For example, a filter with property `name` and value `nginx-*` matches any resource whose name starts with `nginx-`. + * Similarly, property `namespace` with value `prod*` matches any namespace starting with `prod`. + * Wildcard matches are case-sensitive. */ values: Array> } @@ -118,6 +140,18 @@ export type SearchInput = { * A value of -1 will remove the limit. Use carefully because it may impact the service. */ limit?: InputMaybe + /** + * Number of results to skip before returning results. + * Used in combination with limit to implement pagination. + * **Default is** 0 + */ + offset?: InputMaybe + /** + * Order results by a property and direction. + * Format: "property_name asc" or "property_name desc" + * Example: "name desc" or "created asc" + */ + orderBy?: InputMaybe /** * Filter relationships to the specified kinds. * If empty, all relationships will be included. @@ -154,6 +188,21 @@ export type SearchResult = { related?: Maybe>> } +/** Subscriptions implemented by the Search Query API. */ +export type Subscription = { + /** + * Watch changes to the data in the search index. An event is generated for each change + * matching the input filters. User's permissions (RBAC) are applied to each event resource. + * Events are generated from the search index and don't match the changes on Kubernetes. + */ + watch?: Maybe +} + +/** Subscriptions implemented by the Search Query API. */ +export type SubscriptionWatchArgs = { + input?: InputMaybe +} + export type SearchSchemaQueryVariables = Exact<{ query?: InputMaybe }> @@ -215,6 +264,20 @@ export type GetMessagesQuery = { messages?: Array<{ id: string; kind?: string | null; description?: string | null } | null> | null } +export type SearchSubscriptionSubscriptionVariables = Exact<{ + input?: InputMaybe +}> + +export type SearchSubscriptionSubscription = { + searchSubscription?: { + uid: string + operation: string + newData?: any | null + oldData?: any | null + timestamp: any + } | null +} + export const SearchSchemaDocument = gql` query searchSchema($query: SearchInput) { searchSchema(query: $query) @@ -670,3 +733,42 @@ export type GetMessagesQueryHookResult = ReturnType export type GetMessagesLazyQueryHookResult = ReturnType export type GetMessagesSuspenseQueryHookResult = ReturnType export type GetMessagesQueryResult = Apollo.QueryResult +export const SearchSubscriptionDocument = gql` + subscription searchSubscription($input: SearchInput) { + searchSubscription: watch(input: $input) { + uid + operation + newData + oldData + timestamp + } + } +` + +/** + * __useSearchSubscriptionSubscription__ + * + * To run a query within a React component, call `useSearchSubscriptionSubscription` and pass it any options that fit your needs. + * When your component renders, `useSearchSubscriptionSubscription` returns an object from Apollo Client that contains loading, error, and data properties + * you can use to render your UI. + * + * @param baseOptions options that will be passed into the subscription, supported options are listed on: https://www.apollographql.com/docs/react/api/react-hooks/#options; + * + * @example + * const { data, loading, error } = useSearchSubscriptionSubscription({ + * variables: { + * input: // value for 'input' + * }, + * }); + */ +export function useSearchSubscriptionSubscription( + baseOptions?: Apollo.SubscriptionHookOptions +) { + const options = { ...defaultOptions, ...baseOptions } + return Apollo.useSubscription( + SearchSubscriptionDocument, + options + ) +} +export type SearchSubscriptionSubscriptionHookResult = ReturnType +export type SearchSubscriptionSubscriptionResult = Apollo.SubscriptionResult diff --git a/frontend/src/routes/Search/search-sdk/subscription.graphql b/frontend/src/routes/Search/search-sdk/subscription.graphql new file mode 100644 index 00000000000..d485f8f75d1 --- /dev/null +++ b/frontend/src/routes/Search/search-sdk/subscription.graphql @@ -0,0 +1,9 @@ +subscription searchSubscription($input: SearchInput) { + searchSubscription: watch(input: $input) { + uid + operation + newData + oldData + timestamp + } +} From 56b2169d6e4684e0c04cd52751c4057dd3cd4b59 Mon Sep 17 00:00:00 2001 From: zlayne Date: Tue, 5 May 2026 13:57:26 -0400 Subject: [PATCH 2/3] add copyright Signed-off-by: zlayne --- frontend/src/routes/Search/search-sdk/search-sdk.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontend/src/routes/Search/search-sdk/search-sdk.ts b/frontend/src/routes/Search/search-sdk/search-sdk.ts index ca51e634c75..2b931f2d963 100644 --- a/frontend/src/routes/Search/search-sdk/search-sdk.ts +++ b/frontend/src/routes/Search/search-sdk/search-sdk.ts @@ -1,5 +1,6 @@ -import { gql } from '@apollo/client' +/* Copyright Contributors to the Open Cluster Management project */ import * as Apollo from '@apollo/client' +import { gql } from '@apollo/client' export type Maybe = T | null export type InputMaybe = Maybe export type Exact = { [K in keyof T]: T[K] } From cf631c1131c777edd72528f4c113f799189002e8 Mon Sep 17 00:00:00 2001 From: zlayne Date: Tue, 5 May 2026 15:38:27 -0400 Subject: [PATCH 3/3] Update backend tests Generated-by: Cursor (auto) Signed-off-by: zlayne --- backend/test/routes/searchWebSocket.test.ts | 405 +++++++++++++++++++- 1 file changed, 404 insertions(+), 1 deletion(-) diff --git a/backend/test/routes/searchWebSocket.test.ts b/backend/test/routes/searchWebSocket.test.ts index f51da734758..8362c1fb0bd 100644 --- a/backend/test/routes/searchWebSocket.test.ts +++ b/backend/test/routes/searchWebSocket.test.ts @@ -1,5 +1,107 @@ /* Copyright Contributors to the Open Cluster Management project */ -import { injectSearchWsConnectionInitAuthorization } from '../../src/routes/search' +import { jest } from '@jest/globals' +import EventEmitter from 'node:events' +import type { Http2ServerRequest } from 'node:http2' +import type { TLSSocket } from 'node:tls' +import { getSearchRequestOptions } from '../../src/lib/search' +import { getAuthenticatedToken } from '../../src/lib/token' +import { injectSearchWsConnectionInitAuthorization, searchWebSocket } from '../../src/routes/search' + +// var avoids temporal dead zone issues when jest.mock factories are hoisted above imports +/* eslint-disable no-var */ +var mockWsInstance: EventEmitter & { readyState: number; send: jest.Mock; close: jest.Mock; terminate: jest.Mock } +var mockWssInstance: EventEmitter & { handleUpgrade: jest.Mock } +/* eslint-enable no-var */ + +jest.mock('ws', () => { + const WsMock = jest.fn().mockImplementation(() => mockWsInstance) + ;(WsMock as unknown as Record).OPEN = 1 + return { + __esModule: true, + default: WsMock, + WebSocketServer: jest.fn().mockImplementation(() => mockWssInstance), + } +}) + +jest.mock('../../src/lib/token', () => ({ + getAuthenticatedToken: jest.fn(), +})) + +jest.mock('../../src/lib/search', () => ({ + getSearchRequestOptions: jest.fn(), +})) + +jest.mock('../../src/lib/serviceAccountToken', () => ({ + getServiceCACertificate: jest.fn(() => undefined), +})) + +jest.mock('../../src/lib/logger', () => ({ + logger: { info: jest.fn(), error: jest.fn() }, +})) + +jest.mock('node:https', () => ({ + Agent: jest.fn(() => ({})), +})) + +const mockGetAuthToken = getAuthenticatedToken as jest.MockedFunction +const mockGetSearchOpts = getSearchRequestOptions as jest.MockedFunction + +const DEFAULT_OPTIONS = { + hostname: 'search-api.ocm.svc.cluster.local', + port: 4010, + path: '/searchapi/graphql', +} + +type MockSocket = TLSSocket & { write: jest.Mock; destroy: jest.Mock } + +function makeMockSocket(): MockSocket { + return { write: jest.fn(), destroy: jest.fn() } as unknown as MockSocket +} + +function makeMockReq(headers: Record = {}) { + return { url: '/proxy/search', headers } as unknown as Http2ServerRequest +} + +function makeMockClientWs() { + const ws = Object.assign(new EventEmitter(), { + readyState: 1, + send: jest.fn(), + close: jest.fn(), + }) + return ws +} + +/** + * Runs through the happy-path setup: resolves auth + search options, awaits + * searchWebSocket, fires the upstream 'open' event (triggering handleUpgrade), + * then calls the upgrade callback with a fresh mock client WebSocket. + */ +async function triggerSuccessfulUpgrade(token = 'mytoken', opts = DEFAULT_OPTIONS) { + const socket = makeMockSocket() + const req = makeMockReq() + const head = Buffer.alloc(0) + + mockGetAuthToken.mockResolvedValue(token) + mockGetSearchOpts.mockResolvedValue(opts) + + await searchWebSocket(req, socket, head) + + // Emit upstream 'open' → handler creates WebSocketServer and calls handleUpgrade + mockWsInstance.emit('open') + + // handleUpgrade was called synchronously inside the 'open' handler; extract its callback + const upgradeCallback = mockWssInstance.handleUpgrade.mock.calls[0][3] as ( + ws: ReturnType + ) => void + const clientWs = makeMockClientWs() + upgradeCallback(clientWs) + + return { socket, req, head, clientWs, upstreamWs: mockWsInstance } +} + +// ───────────────────────────────────────────────────────────────────────────── +// injectSearchWsConnectionInitAuthorization +// ───────────────────────────────────────────────────────────────────────────── describe('injectSearchWsConnectionInitAuthorization', () => { it('adds Authorization to connection_init payload', () => { @@ -22,4 +124,305 @@ describe('injectSearchWsConnectionInitAuthorization', () => { const input = JSON.stringify({ type: 'subscribe', id: '1', payload: {} }) expect(injectSearchWsConnectionInitAuthorization(input, 't')).toBe(input) }) + + it('handles null payload by defaulting to empty object', () => { + const input = JSON.stringify({ type: 'connection_init', payload: null }) + const out = injectSearchWsConnectionInitAuthorization(input, 'tok') + const msg = JSON.parse(out) as { payload: { Authorization: string } } + expect(msg.payload.Authorization).toBe('Bearer tok') + }) + + it('handles array payload by defaulting to empty object', () => { + const input = JSON.stringify({ type: 'connection_init', payload: [1, 2, 3] }) + const out = injectSearchWsConnectionInitAuthorization(input, 'tok') + const msg = JSON.parse(out) as { payload: { Authorization: string } } + expect(msg.payload.Authorization).toBe('Bearer tok') + }) + + it('handles missing payload by defaulting to empty object', () => { + const input = JSON.stringify({ type: 'connection_init' }) + const out = injectSearchWsConnectionInitAuthorization(input, 'tok') + const msg = JSON.parse(out) as { payload: { Authorization: string } } + expect(msg.payload.Authorization).toBe('Bearer tok') + }) + + it('returns original string on invalid JSON', () => { + const input = 'not-json' + expect(injectSearchWsConnectionInitAuthorization(input, 'tok')).toBe(input) + }) +}) + +// ───────────────────────────────────────────────────────────────────────────── +// searchWebSocket +// ───────────────────────────────────────────────────────────────────────────── + +describe('searchWebSocket', () => { + beforeEach(() => { + jest.clearAllMocks() + + mockWsInstance = Object.assign(new EventEmitter(), { + readyState: 1, + send: jest.fn(), + close: jest.fn(), + terminate: jest.fn(), + }) + + mockWssInstance = Object.assign(new EventEmitter(), { + handleUpgrade: jest.fn(), + }) + }) + + afterEach(() => { + jest.useRealTimers() + }) + + // ── Error handling ────────────────────────────────────────────────────────── + + describe('error handling before upstream connection', () => { + it('sends HTTP 500 and destroys socket when getAuthenticatedToken rejects', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockRejectedValue(new Error('auth failure')) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + + expect(socket.write).toHaveBeenCalledWith(expect.stringContaining('500')) + expect(socket.destroy).toHaveBeenCalled() + }) + + it('sends HTTP 500 and destroys socket when getSearchRequestOptions rejects', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockRejectedValue(new Error('options failure')) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + + expect(socket.write).toHaveBeenCalledWith(expect.stringContaining('500')) + expect(socket.destroy).toHaveBeenCalled() + }) + + it('sends HTTP 502 and destroys socket when upstream WS emits error before open', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue(DEFAULT_OPTIONS) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + mockWsInstance.emit('error', new Error('connect ECONNREFUSED')) + + expect(socket.write).toHaveBeenCalledWith(expect.stringContaining('502')) + expect(socket.destroy).toHaveBeenCalled() + }) + + it('terminates upstream and sends HTTP 504 when connection times out', async () => { + jest.useFakeTimers() + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue(DEFAULT_OPTIONS) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + jest.advanceTimersByTime(60_001) + + expect(mockWsInstance.terminate).toHaveBeenCalled() + expect(socket.write).toHaveBeenCalledWith(expect.stringContaining('504')) + expect(socket.destroy).toHaveBeenCalled() + }) + }) + + // ── Successful upstream connection ────────────────────────────────────────── + + describe('successful upstream connection', () => { + it('calls handleUpgrade with the original req, socket and head when upstream opens', async () => { + const socket = makeMockSocket() + const head = Buffer.from('head') + const req = makeMockReq() + + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue(DEFAULT_OPTIONS) + + await searchWebSocket(req, socket, head) + mockWsInstance.emit('open') + + expect(mockWssInstance.handleUpgrade).toHaveBeenCalledWith(req, socket, head, expect.any(Function)) + }) + + it('constructs upstream URL without port when port is 443', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue({ hostname: 'search.example.com', port: 443, path: '/graphql' }) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + + // If upstream was created without throwing we verify it exists; URL is wss://host/path (no port) + expect(mockWsInstance).toBeDefined() + }) + + it('constructs upstream URL with port when port is not 443', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue({ hostname: 'search.example.com', port: 4010, path: '/graphql' }) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + + expect(mockWsInstance).toBeDefined() + }) + + it('uses the sec-websocket-protocol header from the browser request for the upstream', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue(DEFAULT_OPTIONS) + + await searchWebSocket(makeMockReq({ 'sec-websocket-protocol': 'graphql-ws' }), socket, Buffer.alloc(0)) + + // Upstream WS was created; the test verifies the function completes without error + expect(mockWsInstance).toBeDefined() + }) + + it('does not call failClientUpgrade after upgrade is complete even if socket errors', async () => { + const socket = makeMockSocket() + mockGetAuthToken.mockResolvedValue('tok') + mockGetSearchOpts.mockResolvedValue(DEFAULT_OPTIONS) + + await searchWebSocket(makeMockReq(), socket, Buffer.alloc(0)) + mockWsInstance.emit('open') + + // After upgrade completed, an upstream error should not write to the socket + const writeCallsBefore = (socket.write as jest.Mock).mock.calls.length + mockWsInstance.emit('error', new Error('post-open error')) + expect((socket.write as jest.Mock).mock.calls.length).toBe(writeCallsBefore) + }) + }) + + // ── Relay: client → upstream ──────────────────────────────────────────────── + + describe('relay: client → upstream', () => { + it('injects Authorization into connection_init message from client', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade('secret-token') + + const initMsg = JSON.stringify({ type: 'connection_init', payload: { extra: true } }) + clientWs.emit('message', Buffer.from(initMsg), false) + + expect(upstreamWs.send).toHaveBeenCalledWith(expect.stringContaining('"Authorization":"Bearer secret-token"')) + }) + + it('does not forward connection_init message as-is (only injected version is sent)', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade('tok') + + const initMsg = JSON.stringify({ type: 'connection_init', payload: {} }) + clientWs.emit('message', Buffer.from(initMsg), false) + + // send is called exactly once for connection_init (with injected auth) + expect(upstreamWs.send).toHaveBeenCalledTimes(1) + const sentArg = upstreamWs.send.mock.calls[0][0] as string + expect(JSON.parse(sentArg)).toMatchObject({ payload: { Authorization: 'Bearer tok' } }) + }) + + it('forwards subsequent text messages directly after connection_init is handled', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + const initMsg = JSON.stringify({ type: 'connection_init', payload: {} }) + const subscribeMsg = JSON.stringify({ type: 'subscribe', id: '1', payload: {} }) + + clientWs.emit('message', Buffer.from(initMsg), false) + clientWs.emit('message', Buffer.from(subscribeMsg), false) + + expect(upstreamWs.send).toHaveBeenNthCalledWith(2, Buffer.from(subscribeMsg), { binary: false }) + }) + + it('forwards a non-connection_init text message to upstream (sets connectionInitHandled)', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + const subscribeMsg = JSON.stringify({ type: 'subscribe', id: '1', payload: {} }) + clientWs.emit('message', Buffer.from(subscribeMsg), false) + + expect(upstreamWs.send).toHaveBeenCalledWith(Buffer.from(subscribeMsg), { binary: false }) + }) + + it('forwards binary frames directly to upstream', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + const binaryData = Buffer.from([0x01, 0x02, 0x03]) + clientWs.emit('message', binaryData, true) + + expect(upstreamWs.send).toHaveBeenCalledWith(binaryData, { binary: true }) + }) + + it('handles string message data (rawDataToUtf8 string path)', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade('tok') + + const initMsg = JSON.stringify({ type: 'connection_init', payload: {} }) + clientWs.emit('message', initMsg, false) + + expect(upstreamWs.send).toHaveBeenCalledWith(expect.stringContaining('"Authorization":"Bearer tok"')) + }) + + it('handles fragment array message data (rawDataToUtf8 array path)', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade('tok') + + const part1 = Buffer.from('{"type":"connection_init"') + const part2 = Buffer.from(',"payload":{}}') + clientWs.emit('message', [part1, part2], false) + + expect(upstreamWs.send).toHaveBeenCalledWith(expect.stringContaining('"Authorization":"Bearer tok"')) + }) + }) + + // ── Relay: upstream → client ──────────────────────────────────────────────── + + describe('relay: upstream → client', () => { + it('forwards messages from upstream to the client', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + const responseData = Buffer.from(JSON.stringify({ type: 'next', id: '1', payload: {} })) + upstreamWs.emit('message', responseData, false) + + expect(clientWs.send).toHaveBeenCalledWith(responseData, { binary: false }) + }) + + it('does not forward to client when client readyState is not OPEN', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + clientWs.readyState = 3 // CLOSED + + upstreamWs.emit('message', Buffer.from('data'), false) + + expect(clientWs.send).not.toHaveBeenCalled() + }) + }) + + // ── Relay: connection lifecycle ───────────────────────────────────────────── + + describe('relay: connection lifecycle', () => { + it('closes both sockets when the client closes', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + clientWs.emit('close') + + expect(clientWs.close).toHaveBeenCalled() + expect(upstreamWs.close).toHaveBeenCalled() + }) + + it('closes the client socket when the upstream closes', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + upstreamWs.emit('close') + + expect(clientWs.close).toHaveBeenCalled() + }) + + it('closes both sockets on client-side error', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + clientWs.emit('error', new Error('client error')) + + expect(clientWs.close).toHaveBeenCalled() + expect(upstreamWs.close).toHaveBeenCalled() + }) + + it('closes both sockets on upstream error after the connection is open', async () => { + const { clientWs, upstreamWs } = await triggerSuccessfulUpgrade() + + upstreamWs.emit('error', new Error('upstream post-open error')) + + expect(clientWs.close).toHaveBeenCalled() + expect(upstreamWs.close).toHaveBeenCalled() + }) + }) })