diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts index fada34be2..52cf7af6a 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.test.ts @@ -52,7 +52,7 @@ beforeEach(() => { transport, remoteParticipants$: new BehaviorSubject([]), } as unknown as Connection; - vi.mocked(mockConnection).start = vi.fn(); + vi.mocked(mockConnection).start = vi.fn().mockResolvedValue(undefined); vi.mocked(mockConnection).stop = vi.fn(); // Tie the connection's lifecycle to the scope to test scope lifecycle management scope.onEnd(() => { @@ -235,7 +235,9 @@ describe("connectionManagerData$ stream", () => { transport, remoteParticipants$: getRemoteParticipantsFor(transport), } as unknown as Connection; - vi.mocked(mockConnection).start = vi.fn(); + vi.mocked(mockConnection).start = vi + .fn() + .mockResolvedValue(undefined); vi.mocked(mockConnection).stop = vi.fn(); // Tie the connection's lifecycle to the scope to test scope lifecycle management scope.onEnd(() => { diff --git a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts index 727f68bcc..e01211da4 100644 --- a/src/state/CallViewModel/remoteMembers/ConnectionManager.ts +++ b/src/state/CallViewModel/remoteMembers/ConnectionManager.ts @@ -7,7 +7,14 @@ Please see LICENSE in the repository root for full details. */ import { type LivekitTransportConfig } from "matrix-js-sdk/lib/matrixrtc"; -import { combineLatest, map, of, switchMap } from "rxjs"; +import { + combineLatest, + map, + type Observable, + of, + scan, + switchMap, +} from "rxjs"; import { type Logger } from "matrix-js-sdk/lib/logger"; import { type RemoteParticipant } from "livekit-client"; import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/EncryptionManager"; @@ -15,14 +22,11 @@ import { type CallMembershipIdentityParts } from "matrix-js-sdk/lib/matrixrtc/En import { type Behavior } from "../../Behavior.ts"; import { type Connection } from "./Connection.ts"; import { Epoch, type ObservableScope } from "../../ObservableScope.ts"; -import { generateItemsWithEpoch } from "../../../utils/observable.ts"; import { areLivekitTransportsEqual } from "./MatrixLivekitMembers.ts"; import { type ConnectionFactory } from "./ConnectionFactory.ts"; import { - isLocalTransportWithSFUConfig, type LocalTransportWithSFUConfig, } from "../localMember/LocalTransport.ts"; -import { type SFUConfig } from "../../../livekit/openIDSFU.ts"; export class ConnectionManagerData { private readonly store: Map< @@ -80,11 +84,19 @@ interface Props { ownMembershipIdentity: CallMembershipIdentityParts; } -// TODO - write test for scopes (do we really need to bind scope) export interface IConnectionManager { connectionManagerData$: Behavior>; } +/** + * Incremental state based on prev/current transports and connections. + */ +interface ScannedState { + managedTransports: LivekitTransportConfig[]; + managedConnections: Connection[]; + epoch: number; +} + /** * Crete a `ConnectionManager` * @param props - Configuration object @@ -114,150 +126,241 @@ export function createConnectionManager$({ ownMembershipIdentity, }: Props): IConnectionManager { const logger = parentLogger.getChild("[ConnectionManager]"); - // TODO logger: only construct one logger from the client and make it compatible via a EC specific sing - - /** - * All transports currently managed by the ConnectionManager. - * - * This list does not include duplicate transports. - * - * It is build based on the list of subscribed transports (`transportsSubscriptions$`). - * externally this is modified via `registerTransports()`. - */ - const localAndRemoteTransports$: Behavior< - Epoch<(LivekitTransportConfig | LocalTransportWithSFUConfig)[]> - > = scope.behavior( + + // De-duplicate the list of transports and flatten it into a single list. + // The connection manager should only create one connection per unique transport config, + // even if multiple session members are using the same transport. + const localAndRemoteTransports$ = getLocalAndRemoteTransports$( + scope, + remoteTransports$, + localTransport$, + ); + + // Create and start connections for each transport. + // Incrementally checks for new and removed transports and stop and remove connections accordingly. + const state$ = scanInternalState$( + scope, + localAndRemoteTransports$, + ownMembershipIdentity, + connectionFactory, + logger, + ); + + const connectionManagerData$ = state$.pipe( + switchMap((state) => { + // Map each connection to a stream of {connection, participants} + const connectionWithParticipants$ = state.managedConnections.map( + (connection) => { + return connection.remoteParticipants$.pipe( + map((participants) => ({ + connection, + participants, + })), + ); + }, + ); + + // Handle empty case + if (connectionWithParticipants$.length === 0) { + return of(new Epoch(new ConnectionManagerData(), state.epoch)); + } + + // Combine all the streams and reduce into ConnectionManagerData + return combineLatest(connectionWithParticipants$).pipe( + map((items) => { + const data = new ConnectionManagerData(); + items.forEach(({ connection, participants }) => { + data.add(connection, participants); + }); + return new Epoch(data, state.epoch); + }), + ); + }), + ); + + return { connectionManagerData$: scope.behavior(connectionManagerData$) }; +} + +/* + Each member sends its transport as part of the MatrixRTC membership. + The connection manager will create a connection for each unique transport, + even if multiple session members are using the same transport. + */ +function removeDuplicateTransports( + transports: T[], +): T[] { + return transports.reduce((acc, transport) => { + if (!acc.some((t) => areLivekitTransportsEqual(t, transport))) + acc.push(transport); + return acc; + }, [] as T[]); +} + +type TransportsData = { + local: LocalTransportWithSFUConfig | null; + remotes: LivekitTransportConfig[]; +}; +/** + * All transports currently managed by the ConnectionManager. + * + * This list does not include duplicate transports. + * + * It is build based on the list of subscribed transports (`transportsSubscriptions$`). + * externally this is modified via `registerTransports()`. + */ +function getLocalAndRemoteTransports$( + scope: ObservableScope, + remoteTransports$: Behavior>, + localTransport$: Behavior, +): Behavior> { + return scope.behavior( combineLatest([remoteTransports$, localTransport$]).pipe( - // Combine local and remote transports into one transport array - // and set the forceOldJwtEndpoint property on the local transport map(([remoteTransports, localTransport]) => { - let localTransportAsArray: LocalTransportWithSFUConfig[] = []; - if (localTransport) { - localTransportAsArray = [localTransport]; - } + // Get the unique transports we have to connect to const dedupedRemote = removeDuplicateTransports(remoteTransports.value); + + // For clarity do not include the local transport in the remote list. const remoteWithoutLocal = dedupedRemote.filter( (transport) => - !localTransportAsArray.find((l) => - areLivekitTransportsEqual(l.transport, transport), + !areLivekitTransportsEqual( + localTransport?.transport ?? null, + transport, ), ); - logger.debug( - "remoteWithoutLocal", - remoteWithoutLocal, - "localTransportAsArray", - localTransportAsArray, - ); return new Epoch( - [...localTransportAsArray, ...remoteWithoutLocal], + { + local: localTransport, + remotes: remoteWithoutLocal, + }, remoteTransports.epoch, ); }), ), ); +} + +/** + * Monitors the list of transports and creates and stops connections accordingly. + * + * It will automatically: + * - Creates new connections when transports are added + * - Removes and stops connections when transports are removed; + * + * Returns a state object that contains the list of managed transports and connections. + */ +function scanInternalState$( + scope: ObservableScope, + localAndRemoteTransports$: Behavior>, + ownMembershipIdentity: CallMembershipIdentityParts, + connectionFactory: ConnectionFactory, + logger: Logger, +): Observable { + const initialState: ScannedState = { + managedTransports: [], + managedConnections: [], + epoch: -1, + }; + + return localAndRemoteTransports$.pipe( + scan((state: ScannedState, transportsEpoch) => { + const transports = transportsEpoch.value; + + // XXX do we need to handle the case where a remote transport is promoted to local? + // If so, we could add more info to the state and use that to decide whether to create a new connection or not. + + // Combine local and remote transports into one transport array + const currentTransports = [ + ...(transports.local ? [transports.local.transport] : []), + ...transports.remotes, + ]; + + // Find new and removed transports + const { addedTransports, removedTransports } = computeTransportDiff( + currentTransports, + state.managedTransports, + ); + + if (removedTransports.length > 0) { + logger.debug("Removed transports detected :", removedTransports); - /** - * Connections for each transport in use by one or more session members. - */ - const connections$ = scope.behavior( - localAndRemoteTransports$.pipe( - generateItemsWithEpoch( - "ConnectionManager connections$", - function* (transports) { - for (const transport of transports) { - if (isLocalTransportWithSFUConfig(transport)) { - // This is the local transport; only the `LocalTransportWithSFUConfig` has a `sfuConfig` field. - yield { - keys: [ - transport.transport.livekit_service_url, - transport.sfuConfig, - ], - data: undefined, - }; - } else { - yield { - keys: [ - transport.livekit_service_url, - undefined as SFUConfig | undefined, - ], - data: undefined, - }; - } + // stop connections for removed transports + removedTransports.forEach((transport) => { + const removedCo = state.managedConnections.find((connection) => + areLivekitTransportsEqual(connection.transport, transport), + ); + if (removedCo) { + void removedCo.stop(); } + }); + } + + // Remove all connections for removed transports + const remainingConnections = state.managedConnections.filter( + (connection) => { + return !removedTransports.some((transport) => + areLivekitTransportsEqual(connection.transport, transport), + ); }, - (scope, _data$, serviceUrl, sfuConfig) => { + ); + + let addedConnections: Connection[] = []; + if (addedTransports.length > 0) { + logger.debug("New transports detected", addedTransports); + + addedConnections = addedTransports.map((transport) => { + // let's create a connection for each transport const connection = connectionFactory.createConnection( scope, - { - type: "livekit", - livekit_service_url: serviceUrl, - }, + transport, ownMembershipIdentity, logger, - // TODO: This whole optional SFUConfig parameter is not particularly elegant. - // I would like it if connections always fetched the SFUConfig by themselves. - sfuConfig, + transports.local?.transport?.livekit_service_url === + transport.livekit_service_url + ? transports.local?.sfuConfig + : undefined, ); - // Start the connection immediately - // Use connection state to track connection progress - void connection.start(); + // start the connection immediately + connection.start().catch((e) => { + logger.error("Failed to start connection", e); + }); // TODO subscribe to connection state to retry or log issues? return connection; - }, - ), - ), - ); - - const connectionManagerData$ = scope.behavior( - connections$.pipe( - switchMap((connections) => { - const epoch = connections.epoch; - - // Map the connections to list of {connection, participants}[] - const listOfConnectionsWithRemoteParticipants = connections.value.map( - (connection) => { - return connection.remoteParticipants$.pipe( - map((participants) => ({ - connection, - participants, - })), - ); - }, - ); + }); + } - // probably not required - - if (listOfConnectionsWithRemoteParticipants.length === 0) { - return of(new Epoch(new ConnectionManagerData(), epoch)); - } - - // combineLatest the several streams into a single stream with the ConnectionManagerData - return combineLatest(listOfConnectionsWithRemoteParticipants).pipe( - map( - (lists) => - new Epoch( - lists.reduce((data, { connection, participants }) => { - data.add(connection, participants); - return data; - }, new ConnectionManagerData()), - epoch, - ), - ), - ); - }), - ), - new Epoch(new ConnectionManagerData(), -1), + return { + managedTransports: currentTransports, + managedConnections: [...remainingConnections, ...addedConnections], + epoch: transportsEpoch.epoch, + }; + }, initialState), ); - - return { connectionManagerData$ }; } -function removeDuplicateTransports( - transports: T[], -): T[] { - return transports.reduce((acc, transport) => { - if (!acc.some((t) => areLivekitTransportsEqual(t, transport))) - acc.push(transport); - return acc; - }, [] as T[]); +/** + * Utility function to compute the difference between two lists of transports. + * It returns the transports that are in the current list but not in the previous list (addedTransports) + * and the transports that are in the previous list but not in the current list (removedTransports). + * @param currentTransports - The current list of transports. + * @param prevTransports - The previous list of transports. + */ +function computeTransportDiff( + currentTransports: LivekitTransportConfig[], + prevTransports: LivekitTransportConfig[], +): { + addedTransports: LivekitTransportConfig[]; + removedTransports: LivekitTransportConfig[]; +} { + const newTransports = currentTransports.filter( + (current) => + !prevTransports.some((prev) => areLivekitTransportsEqual(prev, current)), + ); + + const removedTransports = prevTransports.filter( + (prev) => + !currentTransports.some((current) => + areLivekitTransportsEqual(prev, current), + ), + ); + return { addedTransports: newTransports, removedTransports }; }