Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"pino": "7.11.0",
"pluralize": "8.0.0",
"prom-client": "^14.2.0",
"raw-body": "2.5.3"
"raw-body": "2.5.3",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/eslint": "9.6.1",
Expand All @@ -45,6 +46,7 @@
"@types/node-fetch": "2.6.13",
"@types/node-localstorage": "^1.3.3",
"@types/pluralize": "^0.0.33",
"@types/ws": "^8.18.1",
"@typescript-eslint/eslint-plugin": "^8.59.0",
"@typescript-eslint/parser": "^8.59.0",
"concurrently": "9.2.1",
Expand Down
7 changes: 6 additions & 1 deletion backend/src/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import type { Http2Server, Http2ServerRequest, Http2ServerResponse } from 'node:
import { constants, createSecureServer, createServer } from 'node:http2'
import type { Socket } from 'node:net'
import type { TLSSocket } from 'node:tls'
import { logger } from './logger'
import { managedClusterProxy } from '../routes/managedClusterProxy'
import { searchWebSocket } from '../routes/search'
import { logger } from './logger'

// Explicitly set ECDH curves to enable PQC (X25519MLKEM768).
// The default image crypto policy (/etc/crypto-policies/config) does not include them.
Expand Down Expand Up @@ -84,6 +85,10 @@ export function startServer(options: ServerOptions): Promise<Http2Server | undef
})
})
.on('upgrade', (req: Http2ServerRequest, socket: TLSSocket, head: Buffer) => {
if (req.url.startsWith('/multicloud/proxy/search')) {
req.url = req.url.substring(11)
return searchWebSocket(req, socket, head)
}
if (req.url.startsWith('/multicloud/managedclusterproxy')) {
req.url = req.url.substring(11)
return managedClusterProxy(req, socket, head)
Expand Down
210 changes: 209 additions & 1 deletion backend/src/routes/search.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/* Copyright Contributors to the Open Cluster Management project */
import type { IncomingMessage } from 'node:http'
import type { Http2ServerRequest, Http2ServerResponse, OutgoingHttpHeaders } from 'node:http2'
import { constants } from 'node:http2'
import { request } from 'node:https'
import https, { request } from 'node:https'
import { pipeline } from 'node:stream'
import type { TLSSocket } from 'node:tls'
import WebSocket, { type RawData, WebSocketServer } from 'ws'
import { logger } from '../lib/logger'
import { notFound } from '../lib/respond'
import { getServiceCACertificate } from '../lib/serviceAccountToken'
import { getAuthenticatedToken } from '../lib/token'
import { getSearchRequestOptions } from '../lib/search'

Expand All @@ -16,6 +20,115 @@ const proxyHeaders = [
constants.HTTP2_HEADER_CONTENT_TYPE,
]

/** Case-insensitive header lookup for HTTP/2 request pseudo-headers and normal headers. */
function getHeaderValue(headers: Http2ServerRequest['headers'], name: string): string | undefined {
const lower = name.toLowerCase()
for (const [key, value] of Object.entries(headers)) {
if (key.toLowerCase() !== lower) continue
if (value === undefined) return undefined
return Array.isArray(value) ? value[0] : String(value)
}
return undefined
}

/** Adds `Authorization` to a graphql-ws `connection_init` message (exported for tests). */
export function injectSearchWsConnectionInitAuthorization(connectionInitJson: string, bearerToken: string): string {
const bearer = bearerToken.startsWith('Bearer ') ? bearerToken : `Bearer ${bearerToken}`
try {
const msg = JSON.parse(connectionInitJson) as { type?: string; payload?: Record<string, unknown> }
if (msg.type !== 'connection_init') return connectionInitJson
const prev =
msg.payload !== null && typeof msg.payload === 'object' && !Array.isArray(msg.payload) ? msg.payload : {}
return JSON.stringify({
...msg,
payload: { ...prev, Authorization: bearer },
})
} catch {
return connectionInitJson
}
}

/** `Sec-WebSocket-Protocol` from the client, or default `graphql-transport-ws`. */
function subprotocolsForUpstream(req: Http2ServerRequest): string | string[] {
const raw = getHeaderValue(req.headers, 'sec-websocket-protocol')
if (!raw) return ['graphql-transport-ws']
const list = raw
.split(',')
.map((s) => s.trim())
.filter(Boolean)
return list.length ? list : ['graphql-transport-ws']
}

/** Decodes a `ws` text frame payload to UTF-8 (handles Buffer / fragment arrays). */
function rawDataToUtf8(data: RawData): string {
if (typeof data === 'string') return data
if (Buffer.isBuffer(data)) return data.toString('utf8')
if (Array.isArray(data)) return Buffer.concat(data).toString('utf8')
return Buffer.from(data).toString('utf8')
}

/** Bidirectional relay; rewrites the first `connection_init` from the browser to include the bearer token. */
function relayClientToUpstream(clientWs: WebSocket, upstreamWs: WebSocket, rawToken: string): void {
let connectionInitHandled = false

clientWs.on('message', (data, isBinary) => {
if (!connectionInitHandled && !isBinary) {
const text = rawDataToUtf8(data)
try {
const parsed = JSON.parse(text) as { type?: string }
if (parsed.type === 'connection_init') {
connectionInitHandled = true
upstreamWs.send(injectSearchWsConnectionInitAuthorization(text, rawToken))
return
}
} catch {
// fall through
}
connectionInitHandled = true
} else if (!connectionInitHandled) {
connectionInitHandled = true
}
upstreamWs.send(data, { binary: Boolean(isBinary) })
})

upstreamWs.on('message', (data, isBinary) => {
if (clientWs.readyState === WebSocket.OPEN) {
clientWs.send(data, { binary: Boolean(isBinary) })
}
})

const closeBoth = () => {
try {
clientWs.close()
} catch {
/* ignore */
}
try {
upstreamWs.close()
} catch {
/* ignore */
}
}

clientWs.on('close', closeBoth)
upstreamWs.on('close', () => {
try {
clientWs.close()
} catch {
/* ignore */
}
})
clientWs.on('error', (err) => {
logger.error({ msg: 'search websocket relay: client error', err })
closeBoth()
})
upstreamWs.on('error', (err) => {
logger.error({ msg: 'search websocket relay: upstream error after open', err })
closeBoth()
})
}

/** Proxies search GraphQL POST to search-api (HTTPS). */
export async function search(req: Http2ServerRequest, res: Http2ServerResponse): Promise<void> {
const token = await getAuthenticatedToken(req, res)
if (token) {
Expand All @@ -39,3 +152,98 @@ export async function search(req: Http2ServerRequest, res: Http2ServerResponse):
)
}
}

/**
* WS relay: opens `wss` to search-api with the session bearer, completes the browser upgrade, injects
* `Authorization` into graphql-ws `connection_init` (search-v2-api expects the token in that payload).
*/
export async function searchWebSocket(req: Http2ServerRequest, socket: TLSSocket, head: Buffer): Promise<void> {
let clientUpgradeCompleted = false

/** Sends an HTTP error on the TCP socket if the WS upgrade to the browser has not completed yet. */
const failClientUpgrade = (statusCode: number, statusText: string) => {
if (clientUpgradeCompleted) return
try {
socket.write(`HTTP/1.1 ${statusCode} ${statusText}\r\nConnection: close\r\n\r\n`)
} catch {
/* ignore */
}
socket.destroy()
}

try {
const token = await getAuthenticatedToken(req, socket)
const headers: OutgoingHttpHeaders = { authorization: `Bearer ${token}` }
const options = await getSearchRequestOptions(headers)
Comment thread
zlayne marked this conversation as resolved.

const upstreamHost = String(options.hostname)
const upstreamPort = options.port ? Number(options.port) : 443
const upstreamPath = String(options.path ?? '')
const upstreamWsUrl =
upstreamPort === 443
? `wss://${upstreamHost}${upstreamPath}`
: `wss://${upstreamHost}:${upstreamPort}${upstreamPath}`
const bearerHeader = `Bearer ${token}`
const hostHeader = upstreamPort === 443 ? upstreamHost : `${upstreamHost}:${upstreamPort}`

logger.info({
msg: 'search websocket relay: opening upstream',
clientUrl: req.url,
upstreamWsUrl,
})

const httpsAgent = new https.Agent({
ca: getServiceCACertificate(),
keepAlive: true,
})

const upstreamWs = new WebSocket(upstreamWsUrl, subprotocolsForUpstream(req), {
agent: httpsAgent,
headers: {
Authorization: bearerHeader,
Host: hostHeader,
},
})

let upstreamOpen = false

const connectTimeout = setTimeout(() => {
logger.error({ msg: 'search websocket relay: upstream connect timeout', upstreamWsUrl })
upstreamWs.terminate()
if (!upstreamOpen) {
failClientUpgrade(504, 'Gateway Timeout')
}
}, 60_000)

upstreamWs.on('error', (err) => {
clearTimeout(connectTimeout)
if (!upstreamOpen) {
logger.error({ msg: 'search websocket relay: upstream connect failed', err, upstreamWsUrl })
failClientUpgrade(502, 'Bad Gateway')
}
})

upstreamWs.once('open', () => {
upstreamOpen = true
clearTimeout(connectTimeout)
const wss = new WebSocketServer({ noServer: true, perMessageDeflate: false })
wss.handleUpgrade(req as unknown as IncomingMessage, socket, head, (clientWs) => {
clientUpgradeCompleted = true
relayClientToUpstream(clientWs, upstreamWs, token)
})
})
} catch (err: unknown) {
const message =
err instanceof Error ? err.message : typeof err === 'string' ? err : 'search websocket relay: unknown error'
logger.error({ msg: 'search websocket relay: handler error', error: message })
if (!clientUpgradeCompleted) {
failClientUpgrade(500, 'Internal Server Error')
} else {
try {
socket.destroy()
} catch {
/* ignore */
}
}
}
}
Loading