Skip to content

Commit

Permalink
WIP managed transport connection
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Oct 26, 2023
1 parent 2999699 commit a3a4b82
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 100 deletions.
27 changes: 14 additions & 13 deletions src/room/PCTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,40 @@ export default class PCTransport extends EventEmitter {
super();
this.config = config;
this.mediaConstraints = mediaConstraints;
this.setupPC();
this._pc = this.setupPC();
}

private setupPC() {
this._pc = isChromiumBased()
const pc = isChromiumBased()
? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor
new RTCPeerConnection(this.config, this.mediaConstraints)
: new RTCPeerConnection(this.config);
this._pc.onicecandidate = (ev) => {
pc.onicecandidate = (ev) => {
if (!ev.candidate) return;
this.onIceCandidate?.(ev.candidate);
};
this._pc.onicecandidateerror = (ev) => {
pc.onicecandidateerror = (ev) => {
this.onIceCandidateError?.(ev);
};

this._pc.oniceconnectionstatechange = () => {
pc.oniceconnectionstatechange = () => {
this.onIceConnectionStateChange?.();
};

this._pc.onsignalingstatechange = () => {
pc.onsignalingstatechange = () => {
this.onSignalingStatechange?.();
};

this._pc.onconnectionstatechange = () => {
pc.onconnectionstatechange = () => {
this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed');
};
this._pc.ondatachannel = (ev) => {
pc.ondatachannel = (ev) => {
this.onDataChannel?.(ev);
};
this._pc.ontrack = (ev) => {
pc.ontrack = (ev) => {
this.onTrack?.(ev);
};
return pc;
}

get isICEConnected(): boolean {
Expand Down Expand Up @@ -206,10 +207,10 @@ export default class PCTransport extends EventEmitter {
}

// debounced negotiate interface
negotiate = debounce((onError?: (e: Error) => void) => {
negotiate = debounce(async (onError?: (e: Error) => void) => {
this.emit(PCEvents.NegotiationStarted);
try {
this.createAndSendOffer();
await this.createAndSendOffer();
} catch (e) {
if (onError) {
onError(e as Error);
Expand Down Expand Up @@ -413,6 +414,7 @@ export default class PCTransport extends EventEmitter {
}

close = () => {
console.warn('closing pc transport');
if (!this._pc) {
return;
}
Expand All @@ -428,8 +430,7 @@ export default class PCTransport extends EventEmitter {
this._pc.ontrack = null;
this._pc.onconnectionstatechange = null;
this._pc.oniceconnectionstatechange = null;
this._pc = null;
this.setupPC();
this._pc = this.setupPC();
};

private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) {
Expand Down
119 changes: 104 additions & 15 deletions src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import log from '../logger';
import PCTransport from './PCTransport';
import { roomConnectOptionDefaults } from './defaults';
import { ConnectionError, ConnectionErrorReason } from './errors';
import CriticalTimers from './timers';
import { Mutex, sleep } from './utils';

export enum PCTransportState {
IDLE,
Expand All @@ -11,20 +15,32 @@ export enum PCTransportState {
}

export class PCTransportManager {
public publisher: PCTransport;

public subscriber: PCTransport;

public get needsPublisher() {
return this.isPublisherConnectionRequired;
}

public get needsSubscriber() {
return this.isSubscriberConnectionRequired;
}

private isPublisherConnectionRequired: boolean;

private isSubscriberConnectionRequired: boolean;

public publisher: PCTransport;
private state: PCTransportState;

public subscriber: PCTransport;
private peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout;

private state: PCTransportState;
private connectionLock: Mutex;

onStateChange?: (state: PCTransportState) => void;
public onStateChange?: (state: PCTransportState) => void;

constructor(rtcConfig: RTCConfiguration) {
this.isPublisherConnectionRequired = true;
this.isPublisherConnectionRequired = false;
this.isSubscriberConnectionRequired = true;
const googConstraints = { optional: [{ googDscp: true }] };
this.publisher = new PCTransport(rtcConfig, googConstraints);
Expand All @@ -38,10 +54,35 @@ export class PCTransportManager {
this.subscriber.onSignalingStatechange = this.handleStateChanged;

this.state = PCTransportState.IDLE;

this.connectionLock = new Mutex();
}

requirePublisher(require = true) {
this.isPublisherConnectionRequired = require;
this.handleStateChanged();
}

requireSubscriber(require = true) {
this.isSubscriberConnectionRequired = require;
this.handleStateChanged();
}

async ensurePCTransportConnection() {
return Promise.all(this.requiredTransports?.map(this.ensureTransportConnected));
createAndSendOffer(options?: RTCOfferOptions) {
return this.publisher.createAndSendOffer(options);
}

async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) {
const unlock = await this.connectionLock.lock();
try {
await Promise.all(
this.requiredTransports?.map((transport) =>
this.ensureTransportConnected(transport, abortController, timeout),
),
);
} finally {
unlock();
}
}

private get requiredTransports() {
Expand Down Expand Up @@ -71,18 +112,66 @@ export class PCTransportManager {

if (previousState !== this.state) {
this.onStateChange?.(this.state);
log.info('pc state', {
overall: this.state,
publisher: getPCState(this.publisher),
subscriber: getPCState(this.subscriber),
});
}
log.info('pc state', {
overall: this.state,
publisher: getPCState(this.publisher),
subscriber: getPCState(this.subscriber),
});
};

private async ensureTransportConnected(pcTransport: PCTransport) {
if (pcTransport.getConnectionState() === 'connected') {
return true;
private async ensureTransportConnected(
pcTransport: PCTransport,
abortController?: AbortController,
timeout: number = this.peerConnectionTimeout,
) {
const connectionState = pcTransport.getConnectionState();
if (connectionState === 'connected') {
return;
}
// if (this.pcState !== PCState.New) {
// throw new UnexpectedConnectionState(
// 'Expected peer connection to be new on initial connection',
// );
// }
return new Promise<void>(async (resolve, reject) => {
const abortHandler = () => {
log.warn('abort transport connection');
CriticalTimers.clearTimeout(connectTimeout);

reject(
new ConnectionError(
'room connection has been cancelled',
ConnectionErrorReason.Cancelled,
),
);
};
if (abortController?.signal.aborted) {
abortHandler();
}
abortController?.signal.addEventListener('abort', abortHandler);

const connectTimeout = CriticalTimers.setTimeout(() => {
abortController?.signal.removeEventListener('abort', abortHandler);
reject(new ConnectionError('could not establish pc connection'));
}, timeout);

while (this.state !== PCTransportState.CONNECTED) {
await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations
if (abortController?.signal.aborted) {
reject(
new ConnectionError(
'room connection has been cancelled',
ConnectionErrorReason.Cancelled,
),
);
return;
}
}
CriticalTimers.clearTimeout(connectTimeout);
abortController?.signal.removeEventListener('abort', abortHandler);
resolve();
});
}
}

Expand Down
Loading

0 comments on commit a3a4b82

Please sign in to comment.