diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 46eab7ea57..887237790e 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -73,39 +73,40 @@ export default class PCTransport extends EventEmitter { super(); this.config = config; this.mediaConstraints = mediaConstraints; - this.setupPC(); + this._pc = this.setupPC(); } private setupPC() { - this._pc = isChromiumBased() + const pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(this.config, this.mediaConstraints) : new RTCPeerConnection(this.config); - this._pc.onicecandidate = (ev) => { + pc.onicecandidate = (ev) => { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); }; - this._pc.onicecandidateerror = (ev) => { + pc.onicecandidateerror = (ev) => { this.onIceCandidateError?.(ev); }; - this._pc.oniceconnectionstatechange = () => { + pc.oniceconnectionstatechange = () => { this.onIceConnectionStateChange?.(); }; - this._pc.onsignalingstatechange = () => { + pc.onsignalingstatechange = () => { this.onSignalingStatechange?.(); }; - this._pc.onconnectionstatechange = () => { + pc.onconnectionstatechange = () => { this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); }; - this._pc.ondatachannel = (ev) => { + pc.ondatachannel = (ev) => { this.onDataChannel?.(ev); }; - this._pc.ontrack = (ev) => { + pc.ontrack = (ev) => { this.onTrack?.(ev); }; + return pc; } get isICEConnected(): boolean { @@ -206,10 +207,10 @@ export default class PCTransport extends EventEmitter { } // debounced negotiate interface - negotiate = debounce((onError?: (e: Error) => void) => { + negotiate = debounce(async (onError?: (e: Error) => void) => { this.emit(PCEvents.NegotiationStarted); try { - this.createAndSendOffer(); + await this.createAndSendOffer(); } catch (e) { if (onError) { onError(e as Error); @@ -413,6 +414,7 @@ export default class PCTransport extends EventEmitter { } close = () => { + console.warn('closing pc transport'); if (!this._pc) { return; } @@ -428,8 +430,7 @@ export default class PCTransport extends EventEmitter { this._pc.ontrack = null; this._pc.onconnectionstatechange = null; this._pc.oniceconnectionstatechange = null; - this._pc = null; - this.setupPC(); + this._pc = this.setupPC(); }; private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index d42acf63f3..5a14dd7938 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,5 +1,9 @@ import log from '../logger'; import PCTransport from './PCTransport'; +import { roomConnectOptionDefaults } from './defaults'; +import { ConnectionError, ConnectionErrorReason } from './errors'; +import CriticalTimers from './timers'; +import { Mutex, sleep } from './utils'; export enum PCTransportState { IDLE, @@ -11,20 +15,32 @@ export enum PCTransportState { } export class PCTransportManager { + public publisher: PCTransport; + + public subscriber: PCTransport; + + public get needsPublisher() { + return this.isPublisherConnectionRequired; + } + + public get needsSubscriber() { + return this.isSubscriberConnectionRequired; + } + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; - public publisher: PCTransport; + private state: PCTransportState; - public subscriber: PCTransport; + private peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; - private state: PCTransportState; + private connectionLock: Mutex; - onStateChange?: (state: PCTransportState) => void; + public onStateChange?: (state: PCTransportState) => void; constructor(rtcConfig: RTCConfiguration) { - this.isPublisherConnectionRequired = true; + this.isPublisherConnectionRequired = false; this.isSubscriberConnectionRequired = true; const googConstraints = { optional: [{ googDscp: true }] }; this.publisher = new PCTransport(rtcConfig, googConstraints); @@ -38,10 +54,35 @@ export class PCTransportManager { this.subscriber.onSignalingStatechange = this.handleStateChanged; this.state = PCTransportState.IDLE; + + this.connectionLock = new Mutex(); + } + + requirePublisher(require = true) { + this.isPublisherConnectionRequired = require; + this.handleStateChanged(); + } + + requireSubscriber(require = true) { + this.isSubscriberConnectionRequired = require; + this.handleStateChanged(); } - async ensurePCTransportConnection() { - return Promise.all(this.requiredTransports?.map(this.ensureTransportConnected)); + createAndSendOffer(options?: RTCOfferOptions) { + return this.publisher.createAndSendOffer(options); + } + + async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { + const unlock = await this.connectionLock.lock(); + try { + await Promise.all( + this.requiredTransports?.map((transport) => + this.ensureTransportConnected(transport, abortController, timeout), + ), + ); + } finally { + unlock(); + } } private get requiredTransports() { @@ -71,18 +112,66 @@ export class PCTransportManager { if (previousState !== this.state) { this.onStateChange?.(this.state); + log.info('pc state', { + overall: this.state, + publisher: getPCState(this.publisher), + subscriber: getPCState(this.subscriber), + }); } - log.info('pc state', { - overall: this.state, - publisher: getPCState(this.publisher), - subscriber: getPCState(this.subscriber), - }); }; - private async ensureTransportConnected(pcTransport: PCTransport) { - if (pcTransport.getConnectionState() === 'connected') { - return true; + private async ensureTransportConnected( + pcTransport: PCTransport, + abortController?: AbortController, + timeout: number = this.peerConnectionTimeout, + ) { + const connectionState = pcTransport.getConnectionState(); + if (connectionState === 'connected') { + return; } + // if (this.pcState !== PCState.New) { + // throw new UnexpectedConnectionState( + // 'Expected peer connection to be new on initial connection', + // ); + // } + return new Promise(async (resolve, reject) => { + const abortHandler = () => { + log.warn('abort transport connection'); + CriticalTimers.clearTimeout(connectTimeout); + + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + }; + if (abortController?.signal.aborted) { + abortHandler(); + } + abortController?.signal.addEventListener('abort', abortHandler); + + const connectTimeout = CriticalTimers.setTimeout(() => { + abortController?.signal.removeEventListener('abort', abortHandler); + reject(new ConnectionError('could not establish pc connection')); + }, timeout); + + while (this.state !== PCTransportState.CONNECTED) { + await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations + if (abortController?.signal.aborted) { + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + return; + } + } + CriticalTimers.clearTimeout(connectTimeout); + abortController?.signal.removeEventListener('abort', abortHandler); + resolve(); + }); } } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index d0daf7324a..655621014b 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -121,10 +121,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit [key: string]: { resolve: (info: TrackInfo) => void; reject: () => void }; } = {}; - // true if publisher connection has already been established. - // this is helpful to know if we need to restart ICE on the publisher connection - private hasPublished: boolean = false; - // keep join info around for reconnect, this could be a region url private url?: string; @@ -251,16 +247,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit async cleanupPeerConnections() { if (this.publisher && this.publisher.getSignallingState() !== 'closed') { - this.publisher.getSenders().forEach((sender) => { + const publisher = this.publisher; + for (const sender of publisher.getSenders()) { try { // TODO: react-native-webrtc doesn't have removeTrack yet. - if (this.publisher?.canRemoveTrack()) { - this.publisher?.removeTrack(sender); + if (publisher.canRemoveTrack()) { + console.log('removing track'); + await publisher.removeTrack(sender); } } catch (e) { log.warn('could not removeTrack', { error: e }); } - }); + } } if (this.publisher) { this.publisher.close(); @@ -270,7 +268,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber.close(); this.subscriber = undefined; } - this.hasPublished = false; + this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { @@ -1011,7 +1009,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber.restartingIce = true; // only restart publisher if it's needed - if (this.hasPublished) { + if (this.pcManager?.needsPublisher) { await this.publisher.createAndSendOffer({ iceRestart: true }); } @@ -1029,72 +1027,58 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { - if (this.pcState === PCState.Connected) { - return; - } - if (this.pcState !== PCState.New) { - throw new UnexpectedConnectionState( - 'Expected peer connection to be new on initial connection', - ); - } - return new Promise((resolve, reject) => { - const abortHandler = () => { - log.warn('closing engine'); - CriticalTimers.clearTimeout(connectTimeout); - - reject( - new ConnectionError( - 'room connection has been cancelled', - ConnectionErrorReason.Cancelled, - ), - ); - }; - if (abortController?.signal.aborted) { - abortHandler(); - } - abortController?.signal.addEventListener('abort', abortHandler); - const onConnected = () => { - CriticalTimers.clearTimeout(connectTimeout); - abortController?.signal.removeEventListener('abort', abortHandler); - resolve(); - }; - const connectTimeout = CriticalTimers.setTimeout(() => { - this.off(EngineEvent.Connected, onConnected); - reject(new ConnectionError('could not establish pc connection')); - }, timeout ?? this.peerConnectionTimeout); - this.once(EngineEvent.Connected, onConnected); - }); + await this.pcManager?.ensurePCTransportConnection(abortController, timeout); + // if (this.pcState === PCState.Connected) { + // return; + // } + // if (this.pcState !== PCState.New) { + // throw new UnexpectedConnectionState( + // 'Expected peer connection to be new on initial connection', + // ); + // } + // return new Promise((resolve, reject) => { + // const abortHandler = () => { + // log.warn('closing engine'); + // CriticalTimers.clearTimeout(connectTimeout); + + // reject( + // new ConnectionError( + // 'room connection has been cancelled', + // ConnectionErrorReason.Cancelled, + // ), + // ); + // }; + // if (abortController?.signal.aborted) { + // abortHandler(); + // } + // abortController?.signal.addEventListener('abort', abortHandler); + // const onConnected = () => { + // CriticalTimers.clearTimeout(connectTimeout); + // abortController?.signal.removeEventListener('abort', abortHandler); + // resolve(); + // }; + // const connectTimeout = CriticalTimers.setTimeout(() => { + // this.off(EngineEvent.Connected, onConnected); + // reject(new ConnectionError('could not establish pc connection')); + // }, timeout ?? this.peerConnectionTimeout); + // this.once(EngineEvent.Connected, onConnected); + // }); } private async waitForPCReconnected() { - const startTime = Date.now(); - let now = startTime; + // const startTime = Date.now(); + // let now = startTime; this.pcState = PCState.Reconnecting; log.debug('waiting for peer connection to reconnect'); - while (now - startTime < this.peerConnectionTimeout) { - if (this.primaryTransport === undefined) { - // we can abort early, connection is hosed - break; - } else if ( - // on Safari, we don't get a connectionstatechanged event during ICE restart - // this means we'd have to check its status manually and update address - // manually - now - startTime > minReconnectWait && - this.primaryTransport?.getConnectionState() === 'connected' && - (!this.hasPublished || this.publisher?.getConnectionState() === 'connected') - ) { - this.pcState = PCState.Connected; - } - if (this.pcState === PCState.Connected) { - return; - } - await sleep(100); - now = Date.now(); + try { + await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path + await this.pcManager?.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + this.pcState = PCState.Connected; + } catch (e: any) { + // TODO do we need a `failed` state here for the PC? + throw new ConnectionError(`could not establish PC connection, ${e.message}`); } - - // have not reconnected, throw - throw new ConnectionError('could not establish PC connection'); } waitForRestarted = () => { @@ -1207,7 +1191,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // also verify publisher connection if it's needed or different - if (this.hasPublished && this.subscriberPrimary) { + if (this.pcManager?.needsPublisher && this.subscriberPrimary) { if (!this.publisher) { return false; } @@ -1235,7 +1219,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return; } - this.hasPublished = true; + this.pcManager?.requirePublisher(); const handleClosed = () => { log.debug('engine disconnected while negotiation was ongoing'); diff --git a/src/room/utils.ts b/src/room/utils.ts index 3605da678c..06e7086d1b 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -2,6 +2,7 @@ import { ClientInfo, ClientInfo_SDK } from '../proto/livekit_models_pb'; import type { DetectableBrowser } from '../utils/browserParser'; import { getBrowser } from '../utils/browserParser'; import { protocolVersion, version } from '../version'; +import CriticalTimers from './timers'; import type LocalAudioTrack from './track/LocalAudioTrack'; import type RemoteAudioTrack from './track/RemoteAudioTrack'; import { VideoCodec, videoCodecs } from './track/options'; @@ -21,7 +22,7 @@ export function unpackStreamId(packed: string): string[] { } export async function sleep(duration: number): Promise { - return new Promise((resolve) => setTimeout(resolve, duration)); + return new Promise((resolve) => CriticalTimers.setTimeout(resolve, duration)); } /** @internal */