Skip to content

Commit

Permalink
RSDK-8779: Remove additional cases where signaling server errors over…
Browse files Browse the repository at this point in the history
…ride successful PeerConnection attempts.
  • Loading branch information
dgottlieb committed Nov 11, 2024
1 parent e8de541 commit ea214b4
Showing 1 changed file with 43 additions and 47 deletions.
90 changes: 43 additions & 47 deletions rpc/wrtc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ func dialWebRTC(
return nil, err
}
defer func() {
err = multierr.Combine(err, conn.Close())
// Ignore any errors closing the signaling server connection. That step has no bearing on
// whether the PeerConnection was successfully made.
utils.UncheckedError(conn.Close())
}()

logger.Debugw("connected to signaling server", "signaling_server", signalingServer)
Expand All @@ -142,11 +144,13 @@ func dialWebRTC(
if dOpts.webrtcOpts.Config != nil {
config = *dOpts.webrtcOpts.Config
}

extendedConfig := extendWebRTCConfig(&config, configResp.Config, false)
peerConn, dataChannel, err := newPeerConnectionForClient(ctx, extendedConfig, dOpts.webrtcOpts.DisableTrickleICE, logger)
if err != nil {
return nil, err
}

var successful bool
defer func() {
if !successful {
Expand Down Expand Up @@ -187,27 +191,12 @@ func dialWebRTC(
// bool representing whether initial sdp exchange has occurred
haveInit := false

errCh := make(chan error)
sendErr := func(err error) {
if haveInit && isEOF(err) {
logger.Warnf("caller swallowing err %v", err)
return
}
if s, ok := status.FromError(err); ok && strings.Contains(s.Message(), noActiveOfferStr) {
return
}
logger.Warnf("caller received err %v of type %T", err, err)
select {
case <-exchangeCtx.Done():
case errCh <- err:
}
}
var uuid string
// only send once since exchange may end or ICE may end
var sendDoneErrorOnce sync.Once
var sendDoneOnce sync.Once
sendDone := func() error {
var err error
sendDoneErrorOnce.Do(func() {
sendDoneOnce.Do(func() {
_, err = signalingClient.CallUpdate(exchangeCtx, &webrtcpb.CallUpdateRequest{
Uuid: uuid,
Update: &webrtcpb.CallUpdateRequest_Done{
Expand All @@ -230,37 +219,51 @@ func dialWebRTC(
}

var pendingCandidates sync.WaitGroup

// waitOneHost is closed when the first ICE candidate of type `Host` (e.g: 127.0.0.1) is
// found.
waitOneHost := make(chan struct{})
var waitOneHostOnce sync.Once
peerConn.OnICECandidate(func(icecandidate *webrtc.ICECandidate) {
if exchangeCtx.Err() != nil {
// We've decided to bail.
return
}

if icecandidate != nil {
// The last `icecandidate` called from pion will be nil. `nil` signifies that all
// candidates were created. We will still create a goroutine for this "empty"
// candidate to wait for all other candidates to complete. Thus we only increment
// `pendingCandidates` for non-nil values.
pendingCandidates.Add(1)
if icecandidate.Typ == webrtc.ICECandidateTypeHost {
waitOneHostOnce.Do(func() {
close(waitOneHost)
})
}
}

// must spin off to unblock the ICE gatherer
utils.PanicCapturingGo(func() {
if icecandidate != nil {
defer pendingCandidates.Done()
}
select {
case <-remoteDescSet:
// We've sent the `init` offer. We can now proceed with sending individual
// candidates.
case <-exchangeCtx.Done():
return
}

if icecandidate == nil {
// There are no more candidates to generate. Wait for all existing
// candidates/CallUpdate's to complete. Then "sendDone".
pendingCandidates.Wait()
if err := sendDone(); err != nil {
sendErr(err)
}
sendDone()

Check failure on line 263 in rpc/wrtc_client.go

View workflow job for this annotation

GitHub Actions / Build and Test

Error return value is not checked (errcheck)
return
}

iProto := iceCandidateToProto(icecandidate)
callUpdateStart := time.Now()
if _, err := signalingClient.CallUpdate(exchangeCtx, &webrtcpb.CallUpdateRequest{
Expand All @@ -269,9 +272,10 @@ func dialWebRTC(
Candidate: iProto,
},
}); err != nil {
sendErr(err)
logger.Warnw("Error sending a CallUpdate", "err", err)
return
}

statsMu.Lock()
callUpdates++
callUpdateDuration := time.Since(callUpdateStart)
Expand All @@ -285,23 +289,27 @@ func dialWebRTC(

err = peerConn.SetLocalDescription(offer)
if err != nil {
logger.Errorw("Error setting local description with offer", "err", err)
return nil, err
}

select {
case <-exchangeCtx.Done():
return nil, exchangeCtx.Err()
// nolint
return nil, fmt.Errorf("Failed while waiting for first host to be generated. Err: %w", exchangeCtx.Err())
case <-waitOneHost:
}
}

encodedSDP, err := EncodeSDP(peerConn.LocalDescription())
if err != nil {
logger.Errorw("Error encoding local description", "err", err)
return nil, err
}

callClient, err := signalingClient.Call(signalCtx, &webrtcpb.CallRequest{Sdp: encodedSDP})
if err != nil {
logger.Errorw("Error calling with initial SDP", "err", err)
return nil, err
}

Expand Down Expand Up @@ -365,41 +373,29 @@ func dialWebRTC(
}
}

utils.PanicCapturingGoWithCallback(func() {
utils.PanicCapturingGo(func() {
if err := exchangeCandidates(); err != nil {
sendErr(err)
logger.Warn("Failed to exchange candidates", "err", err)
}
}, func(err interface{}) {
sendErr(fmt.Errorf("%v", err))
})

doCall := func() error {
select {
case <-exchangeCtx.Done():
return multierr.Combine(exchangeCtx.Err(), clientCh.Close())
case <-clientCh.Ready():
return nil
case err := <-errCh:
return multierr.Combine(err, clientCh.Close())
}
}

if callErr := doCall(); callErr != nil {
var err error
sendDoneErrorOnce.Do(func() {
select {
case <-clientCh.Ready():
// Happy path
sendDone()

Check failure on line 385 in rpc/wrtc_client.go

View workflow job for this annotation

GitHub Actions / Build and Test

Error return value is not checked (errcheck)
successful = true
case <-exchangeCtx.Done():
sendDoneOnce.Do(func() {
_, err = signalingClient.CallUpdate(exchangeCtx, &webrtcpb.CallUpdateRequest{
Uuid: uuid,
Update: &webrtcpb.CallUpdateRequest_Error{
Error: ErrorToStatus(callErr).Proto(),
Error: ErrorToStatus(exchangeCtx.Err()).Proto(),
},
})
})
return nil, multierr.Combine(callErr, err)
return nil, exchangeCtx.Err()
}
if err := sendDone(); err != nil {
return nil, err
}
successful = true

return clientCh, nil
}

Expand Down

0 comments on commit ea214b4

Please sign in to comment.