From da8a8e217ef3120db5633b3636c10d110e273c39 Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Fri, 27 Mar 2026 14:48:11 -0300 Subject: [PATCH] Add SecureSplitHttpClient and PushManagerSecure for JWT management --- src/services/secureSplitHttpClient.ts | 208 ++++++++++++++++++++++++ src/services/splitApi.ts | 8 +- src/services/types.ts | 7 + src/sync/streaming/pushManagerSecure.ts | 203 +++++++++++++++++++++++ 4 files changed, 423 insertions(+), 3 deletions(-) create mode 100644 src/services/secureSplitHttpClient.ts create mode 100644 src/sync/streaming/pushManagerSecure.ts diff --git a/src/services/secureSplitHttpClient.ts b/src/services/secureSplitHttpClient.ts new file mode 100644 index 00000000..5fa78e66 --- /dev/null +++ b/src/services/secureSplitHttpClient.ts @@ -0,0 +1,208 @@ +import { IRequestOptions, IResponse, ISplitHttpClient, NetworkError, ISecureSplitHttpClient } from './types'; +import { objectAssign } from '../utils/lang/objectAssign'; +import { ERROR_HTTP, ERROR_CLIENT_CANNOT_GET_READY } from '../logger/constants'; +import { ISettings } from '../types'; +import { IPlatform } from '../sdkFactory/types'; +import { decorateHeaders, removeNonISO88591 } from './decorateHeaders'; +import { splitHttpClientFactory } from './splitHttpClient'; +import { timeout } from '../utils/promise/timeout'; +import { decodeJWTtoken } from '../utils/jwt'; +import { SECONDS_BEFORE_EXPIRATION } from '../sync/streaming/constants'; +import { IAuthToken } from '../sync/streaming/AuthClient/types'; + +const PENDING_FETCH_ERROR_TIMEOUT = 100; +const messageNoFetch = 'Global fetch API is not available.'; + +/** + * Creates an auth data manager that transparently handles JWT credential lifecycle: + * fetching, caching, expiry checks, invalidation, and deduplication of concurrent requests. + * + * @param innerHttpClient - standard HTTP client (authenticated with SDK key) used to call the auth endpoint + * @param settings - SDK settings, used to build the auth endpoint URL + */ +function authDataManagerFactory(innerHttpClient: ISplitHttpClient, settings: ISettings) { + let currentToken: IAuthToken | null = null; + let pendingRequest: Promise | null = null; + + function fetchToken(): Promise { + const url = settings.urls.auth + '/v2/auth?s=' + settings.sync.flagSpecVersion; + return innerHttpClient(url) + .then(function (resp) { return resp.json(); }) + .then(function (json) { + let authToken: IAuthToken; + if (json.token) { + const decodedToken = decodeJWTtoken(json.token); + if (typeof decodedToken.iat !== 'number' || typeof decodedToken.exp !== 'number') { + throw new Error('token properties "issuedAt" (iat) or "expiration" (exp) are missing or invalid'); + } + const channels = JSON.parse(decodedToken['x-ably-capability']); + authToken = objectAssign({ decodedToken, channels }, json) as IAuthToken; + } else { + authToken = json as IAuthToken; + } + currentToken = authToken; + return authToken; + }); + } + + function isExpired(token: IAuthToken): boolean { + // Consider token expired SECONDS_BEFORE_EXPIRATION (600s) before actual expiry, + // so that proactive refresh (e.g., for streaming) gets a fresh token + return !token.pushEnabled || Date.now() / 1000 >= token.decodedToken.exp - SECONDS_BEFORE_EXPIRATION; + } + + return { + getAuthData(): Promise { + // Return cached token if valid and not expired + if (currentToken && !isExpired(currentToken)) { + return Promise.resolve(currentToken); + } + + // Deduplicate concurrent requests + if (pendingRequest) return pendingRequest; + + pendingRequest = fetchToken().then( + function (token) { + pendingRequest = null; + return token; + }, + function (error) { + pendingRequest = null; + throw error; + } + ); + + return pendingRequest; + }, + + // Internal: used by the secure HTTP client on 401 to force a fresh token + invalidate() { + currentToken = null; + } + }; +} + +/** + * Factory of Secure Split HTTP clients. Like `splitHttpClientFactory`, but transparently + * manages JWT authentication: obtains a JWT from the auth endpoint (using the SDK key internally), + * caches it, and retries once on 401 responses with a fresh token. + * + * @param settings - SDK settings + * @param platform - object containing environment-specific dependencies + * @returns an object with `httpClient` (ISplitHttpClient) and `getAuthData` to retrieve current auth token + */ +export function secureSplitHttpClientFactory( + settings: ISettings, + platform: Pick +): ISecureSplitHttpClient { + + const { getOptions, getFetch } = platform; + const { log, version, runtime: { ip, hostname } } = settings; + const options = getOptions && getOptions(settings); + const fetch = getFetch && getFetch(settings); + + // if fetch is not available, log Error + if (!fetch) log.error(ERROR_CLIENT_CANNOT_GET_READY, [messageNoFetch]); + + const commonHeaders: Record = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'SplitSDKVersion': version + }; + + if (ip) commonHeaders['SplitSDKMachineIP'] = ip; + if (hostname) commonHeaders['SplitSDKMachineName'] = removeNonISO88591(hostname); + + // Inner standard HTTP client for auth endpoint calls (authenticates with SDK key) + const innerHttpClient = splitHttpClientFactory(settings, platform); + const authDataManager = authDataManagerFactory(innerHttpClient, settings); + + function doFetch(url: string, request: Record): Promise { + return fetch!(url, request) + .then(function (response) { + if (!response.ok) { + return timeout(PENDING_FETCH_ERROR_TIMEOUT, response.text()).then( + function (message) { return Promise.reject({ response: response, message: message }); }, + function () { return Promise.reject({ response: response }); } + ); + } + return response; + }); + } + + function buildRequest(reqOpts: IRequestOptions, authToken: string): Record { + const headers = objectAssign({}, commonHeaders, { 'Authorization': 'Bearer ' + authToken }, reqOpts.headers || {}); + return objectAssign({ + headers: decorateHeaders(settings, headers), + method: reqOpts.method || 'GET', + body: reqOpts.body + }, options); + } + + function handleError(error: any, url: string, logErrorsAsInfo: boolean): NetworkError { + const resp = error && error.response; + let msg = ''; + + if (resp) { + switch (resp.status) { + case 404: msg = 'Invalid SDK key or resource not found.'; + break; + default: msg = error.message; + break; + } + } else { + msg = error.message || 'Network Error'; + } + + if (!resp || resp.status !== 403) { + log[logErrorsAsInfo ? 'info' : 'error'](ERROR_HTTP, [resp ? 'status code ' + resp.status : 'no status code', url, msg]); + } + + const networkError: NetworkError = new Error(msg); + networkError.statusCode = resp && resp.status; + return networkError; + } + + function httpClient(url: string, reqOpts: IRequestOptions = {}, latencyTracker: (error?: NetworkError) => void = function () { }, logErrorsAsInfo: boolean = false): Promise { + if (!fetch) return Promise.reject(new Error(messageNoFetch)); + + return authDataManager.getAuthData() + .then(function (authToken) { + const request = buildRequest(reqOpts, authToken.token); + return doFetch(url, request) + .then(function (response) { + latencyTracker(); + return response; + }) + .catch(function (error) { + const resp = error && error.response; + + // On 401, invalidate credential and retry once with a fresh token + if (resp && resp.status === 401) { + authDataManager.invalidate(); + return authDataManager.getAuthData() + .then(function (freshToken) { + const retryRequest = buildRequest(reqOpts, freshToken.token); + return doFetch(url, retryRequest) + .then(function (response) { + latencyTracker(); + return response; + }); + }); + } + + throw error; + }); + }) + .catch(function (error) { + const networkError = handleError(error, url, logErrorsAsInfo); + latencyTracker(networkError); + throw networkError; + }); + } + + return { + httpClient: httpClient, + getAuthData: authDataManager.getAuthData + }; +} diff --git a/src/services/splitApi.ts b/src/services/splitApi.ts index 6860b022..5004ac5c 100644 --- a/src/services/splitApi.ts +++ b/src/services/splitApi.ts @@ -1,7 +1,7 @@ import { IPlatform } from '../sdkFactory/types'; import { ISettings } from '../types'; import { splitHttpClientFactory } from './splitHttpClient'; -import { ISplitApi } from './types'; +import { ISplitApi, ISplitHttpClient } from './types'; import { objectAssign } from '../utils/lang/objectAssign'; import { ITelemetryTracker } from '../trackers/types'; import { SPLITS, IMPRESSIONS, IMPRESSIONS_COUNT, EVENTS, TELEMETRY, TOKEN, SEGMENT, MEMBERSHIPS } from '../utils/constants'; @@ -19,17 +19,19 @@ function userKeyToQueryParam(userKey: string) { * @param settings - validated settings object * @param platform - object containing environment-specific dependencies * @param telemetryTracker - telemetry tracker + * @param _splitHttpClient - optional split http client to use instead of the default one */ export function splitApiFactory( settings: ISettings, platform: Pick, - telemetryTracker: ITelemetryTracker + telemetryTracker: ITelemetryTracker, + _splitHttpClient?: ISplitHttpClient ): ISplitApi { const urls = settings.urls; const filterQueryString = settings.sync.__splitFiltersValidation && settings.sync.__splitFiltersValidation.queryString; const SplitSDKImpressionsMode = settings.sync.impressionsMode; - const splitHttpClient = splitHttpClientFactory(settings, platform); + const splitHttpClient = _splitHttpClient || splitHttpClientFactory(settings, platform); return { // @TODO throw errors if health check requests fail, to log them in the Synchronizer diff --git a/src/services/types.ts b/src/services/types.ts index b747dbb5..18ca7d77 100644 --- a/src/services/types.ts +++ b/src/services/types.ts @@ -1,3 +1,5 @@ +import { IAuthToken } from '../sync/streaming/AuthClient/types'; + export type IRequestOptions = { method?: string, headers?: Record, @@ -33,6 +35,11 @@ export type IHealthCheckAPI = () => Promise export type ISplitHttpClient = (url: string, options?: IRequestOptions, latencyTracker?: (error?: NetworkError) => void, logErrorsAsInfo?: boolean) => Promise +export type ISecureSplitHttpClient = { + httpClient: ISplitHttpClient; + getAuthData(): Promise +} + export type IFetchAuth = (userKeys?: string[]) => Promise export type IFetchSplitChanges = (since: number, noCache?: boolean, till?: number, rbSince?: number) => Promise diff --git a/src/sync/streaming/pushManagerSecure.ts b/src/sync/streaming/pushManagerSecure.ts new file mode 100644 index 00000000..aaefd3f0 --- /dev/null +++ b/src/sync/streaming/pushManagerSecure.ts @@ -0,0 +1,203 @@ +import { IPushEventEmitter, IPushManager } from './types'; +import { ISSEClient } from './SSEClient/types'; +import { IPollingManager, ISegmentsSyncTask } from '../polling/types'; +import { objectAssign } from '../../utils/lang/objectAssign'; +import { Backoff } from '../../utils/Backoff'; +import { SSEHandlerFactory } from './SSEHandler'; +import { SegmentsUpdateWorker } from './UpdateWorkers/SegmentsUpdateWorker'; +import { SplitsUpdateWorker } from './UpdateWorkers/SplitsUpdateWorker'; +import { SSEClient } from './SSEClient'; +import { PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, SECONDS_BEFORE_EXPIRATION, ControlType } from './constants'; +import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT } from '../../logger/constants'; +import { IAuthTokenPushEnabled } from './AuthClient/types'; +import { TOKEN_REFRESH, AUTH_REJECTION } from '../../utils/constants'; +import { ISdkFactoryContextSync } from '../../sdkFactory/types'; +import { ISecureSplitHttpClient } from '../../services/types'; + +/** + * PushManagerSecure factory: server-side only push manager that delegates + * JWT authentication to the SecureSplitHttpClient. + */ +export function pushManagerSecureFactory( + params: ISdkFactoryContextSync, + pollingManager: IPollingManager, + secureSplitHttpClient: ISecureSplitHttpClient +): IPushManager | undefined { + + const { settings, storage, readiness, platform, telemetryTracker } = params; + const log = settings.log; + + let sseClient: ISSEClient; + try { + sseClient = new SSEClient(settings, platform); + } catch (e) { + log.warn(STREAMING_FALLBACK, [e]); + return; + } + + // init feedback loop + const pushEmitter = new platform.EventEmitter() as IPushEventEmitter; + const sseHandler = SSEHandlerFactory(log, pushEmitter, telemetryTracker); + sseClient.setEventHandler(sseHandler); + + // init workers (server-side only) + const segmentsUpdateWorker = SegmentsUpdateWorker(log, pollingManager.segmentsSyncTask as ISegmentsSyncTask, storage.segments); + const splitsUpdateWorker = SplitsUpdateWorker(log, storage, pollingManager.splitsSyncTask, readiness.splits, telemetryTracker, pollingManager.segmentsSyncTask as ISegmentsSyncTask); + + // flag that indicates if `stop/disconnectPush` was called + let disconnected: boolean | undefined; + // flag that indicates a PUSH_NONRETRYABLE_ERROR + let disabled: boolean | undefined; + + /** PushManager functions related to initialization */ + + const connectPushRetryBackoff = new Backoff(connectPush, settings.scheduler.pushRetryBackoffBase); + + let timeoutIdTokenRefresh: ReturnType; + let timeoutIdSseOpen: ReturnType; + + function scheduleTokenRefreshAndSse(authData: IAuthTokenPushEnabled) { + // clear scheduled tasks if exist + if (timeoutIdTokenRefresh) clearTimeout(timeoutIdTokenRefresh); + if (timeoutIdSseOpen) clearTimeout(timeoutIdSseOpen); + + // Set token refresh 10 minutes before `expirationTime - issuedAt` + const decodedToken = authData.decodedToken; + const refreshTokenDelay = decodedToken.exp - decodedToken.iat - SECONDS_BEFORE_EXPIRATION; + // Default connDelay of 60 secs + const connDelay = typeof authData.connDelay === 'number' && authData.connDelay >= 0 ? authData.connDelay : 60; + + log.info(STREAMING_REFRESH_TOKEN, [refreshTokenDelay, connDelay]); + + // AuthProvider considers tokens stale SECONDS_BEFORE_EXPIRATION before actual expiry, + // so getAuthData() will fetch a fresh token when this fires + timeoutIdTokenRefresh = setTimeout(connectPush, refreshTokenDelay * 1000); + + timeoutIdSseOpen = setTimeout(function () { + // halt if disconnected + if (disconnected) return; + sseClient.open(authData); + }, connDelay * 1000); + + telemetryTracker.streamingEvent(TOKEN_REFRESH, decodedToken.exp); + } + + function connectPush() { + // Guard condition in case `stop/disconnectPush` has been called + if (disconnected) return; + log.info(STREAMING_CONNECTING); + disconnected = false; + + secureSplitHttpClient.getAuthData().then((authData) => { + if (disconnected) return; + + if (!authData.pushEnabled) { + log.info(STREAMING_DISABLED); + pushEmitter.emit(PUSH_NONRETRYABLE_ERROR); + return; + } + + scheduleTokenRefreshAndSse(authData); + } + ).catch( + function (error) { + if (disconnected) return; + + log.error(ERROR_STREAMING_AUTH, [error.message]); + + // Handle 4XX HTTP errors: non-retryable + if (error.statusCode >= 400 && error.statusCode < 500) { + telemetryTracker.streamingEvent(AUTH_REJECTION); + pushEmitter.emit(PUSH_NONRETRYABLE_ERROR); + return; + } + + // Handle other HTTP and network errors as recoverable errors + pushEmitter.emit(PUSH_RETRYABLE_ERROR); + } + ); + } + + // close SSE connection and cancel scheduled tasks + function disconnectPush() { + if (disconnected) return; + disconnected = true; + + sseClient.close(); + log.info(STREAMING_DISCONNECTING); + + if (timeoutIdTokenRefresh) clearTimeout(timeoutIdTokenRefresh); + if (timeoutIdSseOpen) clearTimeout(timeoutIdSseOpen); + connectPushRetryBackoff.reset(); + + stopWorkers(); + } + + function stopWorkers() { + splitsUpdateWorker.stop(); + segmentsUpdateWorker.stop(); + } + + pushEmitter.on(PUSH_SUBSYSTEM_DOWN, stopWorkers); + + pushEmitter.on(PUSH_SUBSYSTEM_UP, function () { + connectPushRetryBackoff.reset(); + }); + + /** Fallback to polling without retry */ + pushEmitter.on(PUSH_NONRETRYABLE_ERROR, function handleNonRetryableError() { + disabled = true; + disconnectPush(); + pushEmitter.emit(PUSH_SUBSYSTEM_DOWN); + }); + + /** Fallback to polling with retry */ + pushEmitter.on(PUSH_RETRYABLE_ERROR, function handleRetryableError() { + sseClient.close(); + + const delayInMillis = connectPushRetryBackoff.scheduleCall(); + + log.info(STREAMING_RECONNECT, [delayInMillis / 1000]); + + pushEmitter.emit(PUSH_SUBSYSTEM_DOWN); + }); + + /** STREAMING_RESET notification */ + pushEmitter.on(ControlType.STREAMING_RESET, function handleStreamingReset() { + if (disconnected) return; + + if (timeoutIdTokenRefresh) clearTimeout(timeoutIdTokenRefresh); + + connectPush(); + }); + + /** Wire update workers */ + pushEmitter.on(SPLIT_KILL, splitsUpdateWorker.killSplit); + pushEmitter.on(SPLIT_UPDATE, splitsUpdateWorker.put); + pushEmitter.on(RB_SEGMENT_UPDATE, splitsUpdateWorker.put); + pushEmitter.on(SEGMENT_UPDATE, segmentsUpdateWorker.put); + + return objectAssign( + Object.create(pushEmitter), + { + stop() { + disconnectPush(); + }, + + start() { + if (disabled || disconnected === false) return; + disconnected = false; + + setTimeout(connectPush); + }, + + isRunning() { + return disconnected === false; + }, + + // No-ops for IPushManager interface compatibility (server-side only) + add() { }, + remove() { } + } + ); +}