From 7631712a0eceaf2734c3206d0f135c4d1a095927 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 26 Oct 2023 09:47:02 +0200 Subject: [PATCH] Make peerconnection private on PCTransport (#903) * Make peerconnection private on PCTransport * revert enablesimulcast for this branch * Create moody-nails-flash.md * fix connection check --- .changeset/moody-nails-flash.md | 5 + src/connectionHelper/checks/webrtc.ts | 2 +- src/room/PCTransport.ts | 117 +++++++++++++++++++- src/room/RTCEngine.ts | 134 +++++++++-------------- src/room/Room.ts | 10 +- src/room/participant/LocalParticipant.ts | 6 +- 6 files changed, 178 insertions(+), 96 deletions(-) create mode 100644 .changeset/moody-nails-flash.md diff --git a/.changeset/moody-nails-flash.md b/.changeset/moody-nails-flash.md new file mode 100644 index 0000000000..ac429345f7 --- /dev/null +++ b/.changeset/moody-nails-flash.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Make peerconnection private on PCTransport diff --git a/src/connectionHelper/checks/webrtc.ts b/src/connectionHelper/checks/webrtc.ts index 6461ef06a9..9082d847c7 100644 --- a/src/connectionHelper/checks/webrtc.ts +++ b/src/connectionHelper/checks/webrtc.ts @@ -39,7 +39,7 @@ export class WebRTCCheck extends Checker { }; if (this.room.engine.subscriber) { - this.room.engine.subscriber.pc.onicecandidateerror = (ev) => { + this.room.engine.subscriber.onIceCandidateError = (ev) => { if (ev instanceof RTCPeerConnectionIceErrorEvent) { this.appendWarning( `error with ICE candidate: ${ev.errorCode} ${ev.errorText} ${ev.url}`, diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 62fa4fe5ed..e6fb95ad95 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -32,7 +32,7 @@ export const PCEvents = { export default class PCTransport extends EventEmitter { private _pc: RTCPeerConnection | null; - public get pc() { + private get pc() { if (this._pc) return this._pc; throw new UnexpectedConnectionState('Expected peer connection to be available'); } @@ -51,12 +51,38 @@ export default class PCTransport extends EventEmitter { onOffer?: (offer: RTCSessionDescriptionInit) => void; + onIceCandidate?: (candidate: RTCIceCandidate) => void; + + onIceCandidateError?: (ev: Event) => void; + + onConnectionStateChange?: (state: RTCPeerConnectionState) => void; + + onDataChannel?: (ev: RTCDataChannelEvent) => void; + + onTrack?: (ev: RTCTrackEvent) => void; + constructor(config?: RTCConfiguration, mediaConstraints: Record = {}) { super(); this._pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(config, mediaConstraints) : new RTCPeerConnection(config); + this._pc.onicecandidate = (ev) => { + if (!ev.candidate) return; + this.onIceCandidate?.(ev.candidate); + }; + this._pc.onicecandidateerror = (ev) => { + this.onIceCandidateError?.(ev); + }; + this._pc.onconnectionstatechange = () => { + this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); + }; + this._pc.ondatachannel = (ev) => { + this.onDataChannel?.(ev); + }; + this._pc.ontrack = (ev) => { + this.onTrack?.(ev); + }; } get isICEConnected(): boolean { @@ -270,10 +296,99 @@ export default class PCTransport extends EventEmitter { return answer; } + createDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { + return this.pc.createDataChannel(label, dataChannelDict); + } + + addTransceiver(mediaStreamTrack: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { + return this.pc.addTransceiver(mediaStreamTrack, transceiverInit); + } + + addTrack(track: MediaStreamTrack) { + return this.pc.addTrack(track); + } + setTrackCodecBitrate(info: TrackBitrateInfo) { this.trackBitrates.push(info); } + setConfiguration(rtcConfig: RTCConfiguration) { + return this.pc.setConfiguration(rtcConfig); + } + + canRemoveTrack(): boolean { + return !!this.pc.removeTrack; + } + + removeTrack(sender: RTCRtpSender) { + return this.pc.removeTrack(sender); + } + + getConnectionState() { + return this.pc.connectionState; + } + + getICEConnectionState() { + return this.pc.iceConnectionState; + } + + getSignallingState() { + return this.pc.signalingState; + } + + getTransceivers() { + return this.pc.getTransceivers(); + } + + getSenders() { + return this.pc.getSenders(); + } + + getLocalDescription() { + return this.pc.localDescription; + } + + getRemoteDescription() { + return this.pc.remoteDescription; + } + + async getConnectedAddress(): Promise { + if (!this._pc) { + return; + } + let selectedCandidatePairId = ''; + const candidatePairs = new Map(); + // id -> candidate ip + const candidates = new Map(); + const stats: RTCStatsReport = await this._pc.getStats(); + stats.forEach((v) => { + switch (v.type) { + case 'transport': + selectedCandidatePairId = v.selectedCandidatePairId; + break; + case 'candidate-pair': + if (selectedCandidatePairId === '' && v.selected) { + selectedCandidatePairId = v.id; + } + candidatePairs.set(v.id, v); + break; + case 'remote-candidate': + candidates.set(v.id, `${v.address}:${v.port}`); + break; + default: + } + }); + + if (selectedCandidatePairId === '') { + return undefined; + } + const selectedID = candidatePairs.get(selectedCandidatePairId)?.remoteCandidateId; + if (selectedID === undefined) { + return undefined; + } + return candidates.get(selectedID); + } + close() { if (!this._pc) { return; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 8203dc518c..83ccf15498 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -108,7 +108,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private subscriberPrimary: boolean = false; - private primaryPC?: RTCPeerConnection; + private primaryTransport?: PCTransport; private pcState: PCState = PCState.New; @@ -247,12 +247,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async cleanupPeerConnections() { - if (this.publisher && this.publisher.pc.signalingState !== 'closed') { - this.publisher.pc.getSenders().forEach((sender) => { + if (this.publisher && this.publisher.getSignallingState() !== 'closed') { + this.publisher.getSenders().forEach((sender) => { try { // TODO: react-native-webrtc doesn't have removeTrack yet. - if (this.publisher?.pc.removeTrack) { - this.publisher?.pc.removeTrack(sender); + if (this.publisher?.canRemoveTrack()) { + this.publisher?.removeTrack(sender); } } catch (e) { log.warn('could not removeTrack', { error: e }); @@ -268,7 +268,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber = undefined; } this.hasPublished = false; - this.primaryPC = undefined; + this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { if (!dc) return; @@ -336,7 +336,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit delete this.pendingTrackResolvers[sender.track.id]; } try { - this.publisher?.pc.removeTrack(sender); + this.publisher?.removeTrack(sender); return true; } catch (e: unknown) { log.warn('failed to remove track', { error: e, method: 'removeTrack' }); @@ -353,10 +353,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async getConnectedServerAddress(): Promise { - if (this.primaryPC === undefined) { + if (this.primaryTransport === undefined) { return undefined; } - return getConnectedAddress(this.primaryPC); + return this.primaryTransport.getConnectedAddress(); } /* @internal */ @@ -387,40 +387,38 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.emit(EngineEvent.TransportsCreated, this.publisher, this.subscriber); - this.publisher.pc.onicecandidate = (ev) => { - if (!ev.candidate) return; - log.trace('adding ICE candidate for peer', ev.candidate); - this.client.sendIceCandidate(ev.candidate, SignalTarget.PUBLISHER); + this.publisher.onIceCandidate = (candidate) => { + log.trace('adding ICE candidate for peer', candidate); + this.client.sendIceCandidate(candidate, SignalTarget.PUBLISHER); }; - this.subscriber.pc.onicecandidate = (ev) => { - if (!ev.candidate) return; - this.client.sendIceCandidate(ev.candidate, SignalTarget.SUBSCRIBER); + this.subscriber.onIceCandidate = (candidate) => { + this.client.sendIceCandidate(candidate, SignalTarget.SUBSCRIBER); }; this.publisher.onOffer = (offer) => { this.client.sendOffer(offer); }; - let primaryPC = this.publisher.pc; - let secondaryPC = this.subscriber.pc; + let primaryTransport = this.publisher; + let secondaryTransport = this.subscriber; let subscriberPrimary = joinResponse.subscriberPrimary; if (subscriberPrimary) { - primaryPC = this.subscriber.pc; - secondaryPC = this.publisher.pc; + primaryTransport = this.subscriber; + secondaryTransport = this.publisher; // in subscriber primary mode, server side opens sub data channels. - this.subscriber.pc.ondatachannel = this.handleDataChannel; + this.subscriber.onDataChannel = this.handleDataChannel; } - this.primaryPC = primaryPC; - primaryPC.onconnectionstatechange = async () => { - log.debug(`primary PC state changed ${primaryPC.connectionState}`); - if (primaryPC.connectionState === 'connected') { + this.primaryTransport = primaryTransport; + primaryTransport.onConnectionStateChange = async (connectionState) => { + log.debug(`primary PC state changed ${connectionState}`); + if (connectionState === 'connected') { const shouldEmit = this.pcState === PCState.New; this.pcState = PCState.Connected; if (shouldEmit) { this.emit(EngineEvent.Connected, joinResponse); } - } else if (primaryPC.connectionState === 'failed') { + } else if (connectionState === 'failed') { // on Safari, PeerConnection will switch to 'disconnected' during renegotiation if (this.pcState === PCState.Connected) { this.pcState = PCState.Disconnected; @@ -434,10 +432,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } }; - secondaryPC.onconnectionstatechange = async () => { - log.debug(`secondary PC state changed ${secondaryPC.connectionState}`); + secondaryTransport.onConnectionStateChange = async (connectionState) => { + log.debug(`secondary PC state changed ${connectionState}`); // also reconnect if secondary peerconnection fails - if (secondaryPC.connectionState === 'failed') { + if (connectionState === 'failed') { this.handleDisconnect( 'secondary peerconnection', subscriberPrimary @@ -447,7 +445,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } }; - this.subscriber.pc.ontrack = (ev: RTCTrackEvent) => { + this.subscriber.onTrack = (ev: RTCTrackEvent) => { this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; @@ -462,7 +460,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.publisher.pc.signalingState.toString(), + signalingState: this.publisher.getSignallingState().toString(), }); await this.publisher.setRemoteDescription(sd); }; @@ -487,7 +485,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server offer', { RTCSdpType: sd.type, - signalingState: this.subscriber.pc.signalingState.toString(), + signalingState: this.subscriber.getSignallingState().toString(), }); await this.subscriber.setRemoteDescription(sd); @@ -518,7 +516,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.onLeave = (leave?: LeaveRequest) => { if (leave?.canReconnect) { this.fullReconnectOnNext = true; - this.primaryPC = undefined; + this.primaryTransport = undefined; // reconnect immediately instead of waiting for next attempt this.handleDisconnect(leaveReconnect); } else { @@ -579,12 +577,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // create data channels - this.lossyDC = this.publisher.pc.createDataChannel(lossyDataChannel, { + this.lossyDC = this.publisher.createDataChannel(lossyDataChannel, { // will drop older packets that arrive ordered: true, maxRetransmits: 0, }); - this.reliableDC = this.publisher.pc.createDataChannel(reliableDataChannel, { + this.reliableDC = this.publisher.createDataChannel(reliableDataChannel, { ordered: true, }); @@ -765,7 +763,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.pc.addTransceiver( + const transceiver = await this.publisher.addTransceiver( track.mediaStreamTrack, transceiverInit, ); @@ -791,7 +789,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.pc.addTransceiver( + const transceiver = await this.publisher.addTransceiver( simulcastTrack.mediaStreamTrack, transceiverInit, ); @@ -807,7 +805,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (!this.publisher) { throw new UnexpectedConnectionState('publisher is closed'); } - return this.publisher.pc.addTrack(track); + return this.publisher.addTrack(track); } // websocket reconnect behavior. if websocket is interrupted, and the PeerConnection @@ -872,7 +870,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.primaryPC?.signalingState ?? 'closed') === 'closed' + (this.primaryTransport?.getSignallingState() ?? 'closed') === 'closed' ) { this.fullReconnectOnNext = true; } @@ -999,8 +997,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const res = await this.client.reconnect(this.url, this.token, this.participantSid, reason); if (res) { const rtcConfig = this.makeRTCConfiguration(res); - this.publisher.pc.setConfiguration(rtcConfig); - this.subscriber.pc.setConfiguration(rtcConfig); + this.publisher.setConfiguration(rtcConfig); + this.subscriber.setConfiguration(rtcConfig); } } catch (e) { let message = ''; @@ -1084,7 +1082,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.debug('waiting for peer connection to reconnect'); while (now - startTime < this.peerConnectionTimeout) { - if (this.primaryPC === undefined) { + if (this.primaryTransport === undefined) { // we can abort early, connection is hosed break; } else if ( @@ -1092,8 +1090,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // this means we'd have to check its status manually and update address // manually now - startTime > minReconnectWait && - this.primaryPC?.connectionState === 'connected' && - (!this.hasPublished || this.publisher?.pc.connectionState === 'connected') + this.primaryTransport?.getConnectionState() === 'connected' && + (!this.hasPublished || this.publisher?.getConnectionState() === 'connected') ) { this.pcState = PCState.Connected; } @@ -1172,7 +1170,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && !this.publisher?.isICEConnected && - this.publisher?.pc.iceConnectionState !== 'checking' + this.publisher?.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1196,7 +1194,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } throw new ConnectionError( - `could not establish ${transportName} connection, state: ${transport.pc.iceConnectionState}`, + `could not establish ${transportName} connection, state: ${transport.getICEConnectionState()}`, ); } @@ -1207,12 +1205,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /* @internal */ verifyTransport(): boolean { // primary connection - if (!this.primaryPC) { + if (!this.primaryTransport) { return false; } if ( - this.primaryPC.connectionState === 'closed' || - this.primaryPC.connectionState === 'failed' + this.primaryTransport.getConnectionState() === 'closed' || + this.primaryTransport.getConnectionState() === 'failed' ) { return false; } @@ -1223,8 +1221,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return false; } if ( - this.publisher.pc.connectionState === 'closed' || - this.publisher.pc.connectionState === 'failed' + this.publisher.getConnectionState() === 'closed' || + this.publisher.getConnectionState() === 'failed' ) { return false; } @@ -1355,40 +1353,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } -async function getConnectedAddress(pc: RTCPeerConnection): Promise { - let selectedCandidatePairId = ''; - const candidatePairs = new Map(); - // id -> candidate ip - const candidates = new Map(); - const stats: RTCStatsReport = await pc.getStats(); - stats.forEach((v) => { - switch (v.type) { - case 'transport': - selectedCandidatePairId = v.selectedCandidatePairId; - break; - case 'candidate-pair': - if (selectedCandidatePairId === '' && v.selected) { - selectedCandidatePairId = v.id; - } - candidatePairs.set(v.id, v); - break; - case 'remote-candidate': - candidates.set(v.id, `${v.address}:${v.port}`); - break; - default: - } - }); - - if (selectedCandidatePairId === '') { - return undefined; - } - const selectedID = candidatePairs.get(selectedCandidatePairId)?.remoteCandidateId; - if (selectedID === undefined) { - return undefined; - } - return candidates.get(selectedID); -} - class SignalReconnectError extends Error {} export type EngineEventCallbacks = { diff --git a/src/room/Room.ts b/src/room/Room.ts index 0c49af49ee..b4688e51cf 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1546,14 +1546,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - if ( - this.engine.subscriber === undefined || - this.engine.subscriber.pc.localDescription === null - ) { + const previousAnswer = this.engine.subscriber?.getLocalDescription(); + const previousOffer = this.engine.subscriber?.getRemoteDescription(); + + if (!previousAnswer) { return; } - const previousAnswer = this.engine.subscriber.pc.localDescription; - const previousOffer = this.engine.subscriber.pc.remoteDescription; /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, in this case, we send unsub tracks, so server add all tracks to this diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index c9f123b8c5..e93d8024a9 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -796,7 +796,7 @@ export default class LocalParticipant extends Participant { fix the issue. */ let trackTransceiver: RTCRtpTransceiver | undefined = undefined; - for (const transceiver of this.engine.publisher.pc.getTransceivers()) { + for (const transceiver of this.engine.publisher.getTransceivers()) { if (transceiver.sender === track.sender) { trackTransceiver = transceiver; break; @@ -943,11 +943,11 @@ export default class LocalParticipant extends Participant { track.sender = undefined; if ( this.engine.publisher && - this.engine.publisher.pc.connectionState !== 'closed' && + this.engine.publisher.getConnectionState() !== 'closed' && trackSender ) { try { - for (const transceiver of this.engine.publisher.pc.getTransceivers()) { + for (const transceiver of this.engine.publisher.getTransceivers()) { // if sender is not currently sending (after replaceTrack(null)) // removeTrack would have no effect. // to ensure we end up successfully removing the track, manually set