Fix unexpected closing of signal connection during reconnect #994

Merged
merged 8 commits into from
Jan 18, 2024
5 changes: 5 additions & 0 deletions .changeset/silver-shoes-rest.md
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Fix for recovering SignalChannel closing during reconnect
6 changes: 5 additions & 1 deletion example/index.html
@@ -1,4 +1,4 @@
-<!DOCTYPE html>
+<!doctype html>
<html lang="en">
<head>
<title>Livekit test app</title>
@@ -152,6 +152,10 @@ <h2>Livekit Sample App</h2>
<option value="signal-reconnect">Signal reconnect</option>
<option value="full-reconnect">Full reconnect</option>
<option value="resume-reconnect">Resume reconnect</option>
<option value="disconnect-signal-on-resume">Signal disconnect on resume</option>
<option value="disconnect-signal-on-resume-no-messages">
Signal disconnect on resume no msgs
</option>
<option value="speaker">Speaker update</option>
<option value="node-failure">Node failure</option>
<option value="server-leave">Server booted</option>
2 changes: 1 addition & 1 deletion protocol
Submodule protocol updated 88 files
+1 −1 .github/workflows/buildtest.yaml
+4 −10 .github/workflows/generate.yaml
+0 −24 .idea/watcherTasks.xml
+7 −0 auth/accesstoken.go
+20 −0 auth/accesstoken_test.go
+38 −0 auth/grants.go
+18 −0 auth/grants_test.go
+3 −3 bootstrap.sh
+2 −0 egress/token.go
+27 −27 go.mod
+59 −56 go.sum
+83 −66 infra/link.pb.go
+3 −0 infra/link.proto
+1 −1 infra/link_grpc.pb.go
+2 −0 ingress/token.go
+1,250 −0 livekit/livekit_agent.pb.go
+520 −190 livekit/livekit_analytics.pb.go
+74 −3 livekit/livekit_analytics_grpc.pb.go
+343 −330 livekit/livekit_egress.pb.go
+181 −179 livekit/livekit_egress.twirp.go
+2 −2 livekit/livekit_ingress.pb.go
+2 −2 livekit/livekit_internal.pb.go
+646 −530 livekit/livekit_models.pb.go
+2 −2 livekit/livekit_room.pb.go
+85 −43 livekit/livekit_rtc.pb.go
+1,832 −0 livekit/livekit_sip.pb.go
+3,096 −0 livekit/livekit_sip.twirp.go
+2 −2 livekit/livekit_webhook.pb.go
+111 −0 livekit_agent.proto
+26 −1 livekit_analytics.proto
+3 −0 livekit_egress.proto
+20 −2 livekit_models.proto
+5 −1 livekit_rtc.proto
+230 −0 livekit_sip.proto
+104 −0 logger/deferred.go
+26 −0 logger/deferred_test.go
+26 −5 logger/fieldsampler_test.go
+28 −0 logger/logger.go
+1 −1 logger/pionlogger/logger.go
+5 −0 magefile.go
+14 −6 redis/redis.go
+253 −0 rpc/agent.pb.go
+51 −0 rpc/agent.proto
+179 −0 rpc/agent.psrpc.go
+20 −19 rpc/egress.pb.go
+1 −0 rpc/egress.proto
+76 −72 rpc/egress.psrpc.go
+39 −15 rpc/egress_client.go
+2 −2 rpc/ingress.pb.go
+30 −29 rpc/ingress.psrpc.go
+5 −10 rpc/ingress_client.go
+613 −99 rpc/io.pb.go
+69 −2 rpc/io.proto
+124 −66 rpc/io.psrpc.go
+119 −0 rpc/participant.pb.go
+52 −0 rpc/participant.proto
+225 −0 rpc/participant.psrpc.go
+41 −89 rpc/room.pb.go
+0 −40 rpc/room.proto
+53 −160 rpc/room.psrpc.go
+380 −0 rpc/rpcfakes/fake_typed_participant_client.go
+0 −340 rpc/rpcfakes/fake_typed_room_client.go
+2 −2 rpc/signal.pb.go
+8 −7 rpc/signal.psrpc.go
+436 −0 rpc/sip.pb.go
+60 −0 rpc/sip.proto
+169 −0 rpc/sip.psrpc.go
+42 −0 rpc/sip_client.go
+95 −8 rpc/typed_api.go
+301 −0 sip/sip.go
+405 −0 sip/sip_test.go
+39 −0 sip/token.go
+32 −0 utils/connectionquality.go
+117 −5 utils/cpu.go
+10 −0 utils/cpu_linux.go
+10 −0 utils/cpu_nonlinux.go
+20 −17 utils/id.go
+33 −14 utils/lock_tracker.go
+83 −17 utils/lock_tracker_test.go
+61 −0 utils/multitonservice.go
+108 −0 utils/multitonservice_test.go
+22 −0 utils/must.go
+23 −2 utils/protoproxy.go
+82 −57 utils/protoproxy_test.go
+191 −0 utils/rtpstats.go
+49 −1 utils/timed_version.go
+4 −0 utils/timed_version_test.go
+1 −1 webhook/url_notifier.go
24 changes: 22 additions & 2 deletions src/api/SignalClient.ts
@@ -155,6 +155,13 @@ export class SignalClient {
);
}

private get isEstablishingConnection() {
return (
this.state === SignalConnectionState.CONNECTING ||
this.state === SignalConnectionState.RECONNECTING
);
}

private options?: SignalOptions;

private pingTimeout: ReturnType<typeof setTimeout> | undefined;
@@ -339,7 +346,10 @@ export class SignalClient {
this.startPingInterval();
}
resolve(resp.message.value);
-} else if (this.state === SignalConnectionState.RECONNECTING) {
+} else if (
+this.state === SignalConnectionState.RECONNECTING &&
+resp.message.case !== 'leave'
+) {
// in reconnecting, any message received means signal reconnected
this.state = SignalConnectionState.CONNECTED;
abortSignal?.removeEventListener('abort', abortHandler);
@@ -350,6 +360,8 @@ export class SignalClient {
resolve();
shouldProcessMessage = true;
}
} else if (this.isEstablishingConnection && resp.message.case === 'leave') {
reject(new ConnectionError('Received leave request while trying to (re)connect'));
Contributor Author

not sure if this is the intended behaviour:

This means if a user is reconnecting and the very first message they receive is not a ReconnectResponse but rather a leave request, we mark the reconnection attempt as failed, which will result in a new reconnection attempt happening.

Contributor @boks1971, Jan 17, 2024

The intended behaviour there is for the client to do a full reconnect. Server sends LeaveRequest when it cannot resume. So, a full reconnect is needed. Does the new reconnection attempt a full reconnect?

Contributor Author

yeah, next connection attempt would be full reconnect in that case!

} else if (!opts.reconnect) {
// non-reconnect case, should receive join response first
reject(
@@ -370,7 +382,15 @@ export class SignalClient {
};

this.ws.onclose = (ev: CloseEvent) => {
-this.log.warn(`websocket closed`, { ...this.logContext, reason: ev.reason });
+if (this.isEstablishingConnection) {
+reject(new ConnectionError('Websocket got closed during a (re)connection attempt'));
+}
+
+this.log.warn(`websocket closed`, {
+...this.logContext,
+reason: ev.reason,
+state: this.state,
+});
this.handleOnClose(ev.reason);
};
} finally {
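
Note on the SignalClient.ts changes above: the new branching can be summarized in a small standalone sketch. `State`, `classifyFirstMessage`, and `shouldRejectOnClose` are hypothetical names used only for illustration. The actual logic is inline in `SignalClient`, resolves or rejects the connection promise directly, and has further branches (for example the join response) that are omitted here.

```typescript
// Hypothetical stand-in for SignalConnectionState.
type State = 'CONNECTING' | 'RECONNECTING' | 'CONNECTED' | 'DISCONNECTED';
type Outcome = 'resolve' | 'reject' | 'other';

// Mirrors the new isEstablishingConnection getter.
function isEstablishingConnection(state: State): boolean {
  return state === 'CONNECTING' || state === 'RECONNECTING';
}

// Simplified view of how the first message is treated while (re)connecting.
function classifyFirstMessage(state: State, messageCase: string): Outcome {
  if (state === 'RECONNECTING' && messageCase !== 'leave') {
    // Any non-leave message while reconnecting means the signal connection is back.
    return 'resolve';
  }
  if (isEstablishingConnection(state) && messageCase === 'leave') {
    // The server cannot resume the session, so the attempt is marked as failed.
    return 'reject';
  }
  return 'other'; // handled by branches not modelled in this sketch
}

// The onclose handler now rejects the pending attempt under the same condition.
function shouldRejectOnClose(state: State): boolean {
  return isEstablishingConnection(state);
}
```

Under these assumptions a leave request received while reconnecting no longer counts as "signal reconnected". Per the review thread, the failed attempt is then followed by a full reconnect.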
18 changes: 18 additions & 0 deletions src/proto/livekit_rtc_pb.ts
@@ -2084,6 +2084,22 @@ export class SimulateScenario extends Message<SimulateScenario> {
*/
value: bigint;
case: "subscriberBandwidth";
} | {
/**
* disconnect signal on resume
*
* @generated from field: bool disconnect_signal_on_resume = 7;
*/
value: boolean;
case: "disconnectSignalOnResume";
} | {
/**
* disconnect signal on resume before sending any messages from server
*
* @generated from field: bool disconnect_signal_on_resume_no_messages = 8;
*/
value: boolean;
case: "disconnectSignalOnResumeNoMessages";
} | { case: undefined; value?: undefined } = { case: undefined };

constructor(data?: PartialMessage<SimulateScenario>) {
@@ -2100,6 +2116,8 @@ export class SimulateScenario extends Message<SimulateScenario> {
{ no: 4, name: "server_leave", kind: "scalar", T: 8 /* ScalarType.BOOL */, oneof: "scenario" },
{ no: 5, name: "switch_candidate_protocol", kind: "enum", T: proto3.getEnumType(CandidateProtocol), oneof: "scenario" },
{ no: 6, name: "subscriber_bandwidth", kind: "scalar", T: 3 /* ScalarType.INT64 */, oneof: "scenario" },
{ no: 7, name: "disconnect_signal_on_resume", kind: "scalar", T: 8 /* ScalarType.BOOL */, oneof: "scenario" },
{ no: 8, name: "disconnect_signal_on_resume_no_messages", kind: "scalar", T: 8 /* ScalarType.BOOL */, oneof: "scenario" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): SimulateScenario {
23 changes: 18 additions & 5 deletions src/room/RTCEngine.ts
@@ -845,6 +845,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
// guard for attempting reconnection multiple times while one attempt is still not finished
if (this.attemptingReconnect) {
log.warn('already attempting reconnect, returning early', this.logContext);
return;
}
if (
@@ -948,6 +949,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.emit(EngineEvent.SignalRestarted, joinResponse);

await this.waitForPCReconnected();

// re-check signal connection state before setting engine as resumed
if (this.client.currentState !== SignalConnectionState.CONNECTED) {
throw new SignalReconnectError('Signal connection got severed during reconnect');
}

this.regionUrlProvider?.resetAttempts();
// reconnect success
this.emit(EngineEvent.Restarted);
@@ -984,13 +991,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
const rtcConfig = this.makeRTCConfiguration(res);
this.pcManager.updateConfiguration(rtcConfig);
}
-} catch (e) {
+} catch (error) {
 let message = '';
-if (e instanceof Error) {
-message = e.message;
-this.log.error(e.message, this.logContext);
+if (error instanceof Error) {
+message = error.message;
+this.log.error(error.message, { ...this.logContext, error });
 }
-if (e instanceof ConnectionError && e.reason === ConnectionErrorReason.NotAllowed) {
+if (error instanceof ConnectionError && error.reason === ConnectionErrorReason.NotAllowed) {
throw new UnexpectedConnectionState('could not reconnect, token might be expired');
}
throw new SignalReconnectError(message);
@@ -1005,6 +1012,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
await this.pcManager.triggerIceRestart();

await this.waitForPCReconnected();

// re-check signal connection state before setting engine as resumed
if (this.client.currentState !== SignalConnectionState.CONNECTED) {
throw new SignalReconnectError('Signal connection got severed during reconnect');
}

this.client.setReconnected();

// recreate publish datachannel if it's id is null
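
Note on the RTCEngine.ts changes above: the re-check added after `waitForPCReconnected()` guards against the signal connection dropping while the peer connection is still being restored. A minimal sketch of that pattern follows. `SignalLike`, `finishResume`, and the local `SignalReconnectError` class are hypothetical stand-ins for illustration, not the library's real types.

```typescript
type SignalState = 'CONNECTED' | 'RECONNECTING' | 'DISCONNECTED';

// Hypothetical stand-in for the signal client; only the state is modelled.
interface SignalLike {
  currentState: SignalState;
}

// Local stand-in for the library's error class.
class SignalReconnectError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'SignalReconnectError';
  }
}

async function finishResume(
  client: SignalLike,
  waitForPCReconnected: () => Promise<void>,
): Promise<void> {
  await waitForPCReconnected();
  // The signal connection may have closed while the await above was pending,
  // so its state is checked again before the resume is reported as successful.
  if (client.currentState !== 'CONNECTED') {
    throw new SignalReconnectError('Signal connection got severed during reconnect');
  }
}
```

Without the second check, a resume could be reported as successful even though the websocket closed in the meantime.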
37 changes: 33 additions & 4 deletions src/room/Room.ts
@@ -788,6 +788,30 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
// @ts-expect-error function is private
await this.engine.client.handleOnClose('simulate resume-disconnect');
break;
case 'disconnect-signal-on-resume':
postAction = async () => {
// @ts-expect-error function is private
await this.engine.client.handleOnClose('simulate resume-disconnect');
};
req = new SimulateScenario({
scenario: {
case: 'disconnectSignalOnResume',
value: true,
},
});
break;
case 'disconnect-signal-on-resume-no-messages':
postAction = async () => {
// @ts-expect-error function is private
await this.engine.client.handleOnClose('simulate resume-disconnect');
};
req = new SimulateScenario({
scenario: {
case: 'disconnectSignalOnResumeNoMessages',
value: true,
},
});
break;
case 'full-reconnect':
this.engine.fullReconnectOnNext = true;
// @ts-expect-error function is private
@@ -824,11 +848,12 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
},
});
break;

default:
}
if (req) {
-this.engine.client.sendSimulateScenario(req);
-postAction();
+await this.engine.client.sendSimulateScenario(req);
+await postAction();
}
}
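
Note on the hunk above: awaiting `sendSimulateScenario` before running `postAction` matters for the new scenarios, because `postAction` closes the signal connection and the request should go out first. The sketch below illustrates the ordering difference. It assumes `sendSimulateScenario` returns a promise, as the added `await` suggests. `runScenario`, `sleep`, and the event strings are hypothetical.

```typescript
// Hypothetical helper that resolves after the given delay.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runScenario(awaitSend: boolean): Promise<string[]> {
  const events: string[] = [];
  const sendSimulateScenario = async () => {
    await sleep(10); // pretend the request needs time to leave the socket
    events.push('request sent');
  };
  const postAction = async () => {
    events.push('signal closed');
  };

  if (awaitSend) {
    // New behaviour: the request completes before the signal connection is closed.
    await sendSimulateScenario();
    await postAction();
  } else {
    // Old behaviour: both calls are started without waiting for the send.
    const pending = sendSimulateScenario();
    await postAction();
    await pending;
  }
  return events;
}
```

With `awaitSend` set to `false`, the close is recorded before the request has been sent, which is the race the change avoids.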

@@ -1669,6 +1694,10 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.log.warn('detected connection state mismatch', {
...this.logContext,
numFailures: consecutiveFailures,
engine: {
closed: this.engine.isClosed,
transportsConnected: this.engine.verifyTransport(),
},
});
if (consecutiveFailures >= 3) {
this.recreateEngine();
@@ -1820,7 +1849,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
await window.navigator.mediaDevices.getUserMedia({ video: true })
).getVideoTracks()[0]
: createDummyVideoStreamTrack(
-160 * participantOptions.aspectRatios[0] ?? 1,
+160 * (participantOptions.aspectRatios[0] ?? 1),
160,
true,
true,
@@ -1870,7 +1899,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
const p = this.getOrCreateParticipant(info.identity, info);
if (participantOptions.video) {
const dummyVideo = createDummyVideoStreamTrack(
-160 * participantOptions.aspectRatios[i % participantOptions.aspectRatios.length] ?? 1,
+160 * (participantOptions.aspectRatios[i % participantOptions.aspectRatios.length] ?? 1),
160,
false,
true,
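
Note on the two `aspectRatios` changes above: `*` binds more tightly than `??`, so the old expression multiplied first and only then applied the fallback. A missing ratio therefore produced `NaN`, which is not nullish, and the `?? 1` never took effect. The sketch below reproduces both forms. `widthBefore` and `widthAfter` are hypothetical helpers, and the old form is split into two statements so that it compiles without precedence warnings.

```typescript
// Old form, equivalent to `160 * aspectRatios[0] ?? 1`.
function widthBefore(aspectRatios: number[]): number {
  const product = 160 * aspectRatios[0]; // NaN when the array is empty
  return product ?? 1; // NaN is not null or undefined, so the fallback is skipped
}

// New form: the fallback applies to the ratio before multiplying.
function widthAfter(aspectRatios: number[]): number {
  return 160 * (aspectRatios[0] ?? 1);
}
```

Both helpers agree when a ratio is present. They differ only when it is missing, where the new form yields a width of 160.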
4 changes: 3 additions & 1 deletion src/room/types.ts
@@ -47,7 +47,9 @@ export type SimulationScenario =
// overrides server-side bandwidth estimator with set bandwidth
// this can be used to test application behavior when congested or
// to disable congestion control entirely (by setting bandwidth to 100Mbps)
-| 'subscriber-bandwidth';
+| 'subscriber-bandwidth'
+| 'disconnect-signal-on-resume'
+| 'disconnect-signal-on-resume-no-messages';

export type LoggerOptions = {
loggerName?: string;