Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PeerConnection state machine #20

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 51 additions & 23 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ defmodule ExWebRTC.DTLSTransport do

@type dtls_transport() :: GenServer.server()

# Messages sent by the DTLSTransport
@typedoc false
@type signal() :: {:dtls_transport, pid(), state_change() | rtp_data()}

# Message sent when DTLSTransport changes its state
@typedoc false
@type state_change() :: {:state_change, dtls_state()}

# Message sent when a new RTP packet arrives.
# Packet is decrypted.
@typedoc false
@type rtp_data() :: {:rtp_data, binary()}

# Possible DTLSTransport states.
# For the exact meaning, refer to the [WebRTC W3C, sec. 5.5.1](https://www.w3.org/TR/webrtc/#rtcdtlstransportstate-enum)
@typedoc false
@type dtls_state() :: :new | :connecting | :connected | :closed | :failed

@doc false
@spec start_link(ExICE.ICEAgent.opts(), GenServer.server()) :: GenServer.on_start()
def start_link(ice_config, ice_module \\ ICEAgent) do
Expand Down Expand Up @@ -66,6 +84,8 @@ defmodule ExWebRTC.DTLSTransport do
mode: nil
}

notify(state.owner, {:state_change, :new})

{:ok, state}
end

Expand Down Expand Up @@ -163,7 +183,7 @@ defmodule ExWebRTC.DTLSTransport do
{:handshake_packets, packets, timeout} when state.ice_state in [:connected, :completed] ->
:ok = ICEAgent.send_data(state.ice_agent, packets)
Process.send_after(self(), :dtls_timeout, timeout)
%{state | dtls_state: :connecting}
update_dtls_state(state, :connecting)

{:handshake_packets, packets, timeout} ->
Logger.debug("""
Expand All @@ -172,19 +192,20 @@ defmodule ExWebRTC.DTLSTransport do
""")

Process.send_after(self(), :dtls_timeout, timeout)
%{state | dtls_state: :connecting, buffered_packets: packets}
state = %{state | buffered_packets: packets}
update_dtls_state(state, :connecting)

{:handshake_finished, _, remote_keying_material, profile, packets} ->
Logger.debug("DTLS handshake finished")
ICEAgent.send_data(state.ice_agent, packets)
# TODO: validate fingerprint
state = setup_srtp(state, remote_keying_material, profile)
%{state | dtls_state: :connected}
update_dtls_state(state, :connected)

{:handshake_finished, _, remote_keying_material, profile} ->
Logger.debug("DTLS handshake finished")
state = setup_srtp(state, remote_keying_material, profile)
%{state | dtls_state: :connected}
update_dtls_state(state, :connected)

:handshake_want_read ->
state
Expand All @@ -196,7 +217,7 @@ defmodule ExWebRTC.DTLSTransport do
case ExLibSRTP.unprotect(state.srtp, data) do
{:ok, payload} ->
# TODO: temporarily, everything goes to peer connection process
send(state.owner, {:rtp_data, payload})
notify(state.owner, {:rtp_data, payload})

{:error, reason} ->
Logger.warning("Failed to decrypt SRTP, reason: #{inspect(reason)}")
Expand All @@ -213,35 +234,33 @@ defmodule ExWebRTC.DTLSTransport do
state
end

# I hope ExICE will be refactord so new state is a tuple
defp handle_ice(new_state, %{dtls_state: :new} = state)
when new_state in [:connected, :completed] do
state =
if state.mode == :active do
{packets, timeout} = ExDTLS.do_handshake(state.dtls)
Process.send_after(self(), :dtls_timeout, timeout)
:ok = ICEAgent.send_data(state.ice_agent, packets)
%{state | dtls_state: :connecting}
else
state
end
defp handle_ice({:connection_state_change, new_ice_state}, %{dtls_state: :new} = state)
when new_ice_state in [:connected, :completed] do
state = %{state | ice_state: new_ice_state}

%{state | ice_state: new_state}
if state.mode == :active do
{packets, timeout} = ExDTLS.do_handshake(state.dtls)
Process.send_after(self(), :dtls_timeout, timeout)
:ok = ICEAgent.send_data(state.ice_agent, packets)
update_dtls_state(state, :connecting)
else
state
end
end

defp handle_ice(new_state, state)
when new_state in [:connected, :completed] do
defp handle_ice({:connection_state_change, new_ice_state}, state)
when new_ice_state in [:connected, :completed] do
if state.buffered_packets do
Logger.debug("Sending buffered DTLS packets")
:ok = ICEAgent.send_data(state.ice_agent, state.buffered_packets)
%{state | ice_state: new_state, buffered_packets: nil}
%{state | ice_state: new_ice_state, buffered_packets: nil}
else
state
end
end

defp handle_ice(new_state, state) when is_atom(new_state) do
%{state | ice_state: new_state}
defp handle_ice({:connection_state_change, new_ice_state}, state) do
%{state | ice_state: new_ice_state}
end

defp handle_ice(_msg, state), do: state
Expand All @@ -260,4 +279,13 @@ defmodule ExWebRTC.DTLSTransport do
:ok = ExLibSRTP.add_stream(state.srtp, policy)
state
end

defp update_dtls_state(%{dtls_state: dtls_state} = state, dtls_state), do: state

defp update_dtls_state(state, new_dtls_state) do
notify(state.owner, {:state_change, new_dtls_state})
%{state | dtls_state: new_dtls_state}
end

defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg})
end
69 changes: 64 additions & 5 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ defmodule ExWebRTC.PeerConnection do
streams: [:TODO]
]

@typedoc """
Messages sent by the ExWebRTC.
"""
@type signal() :: {:ex_webrtc, pid(), connection_state_change()}

@type connection_state_change() :: {:connection_state_change, connection_state()}

@typedoc """
Possible PeerConnection states.

For the exact meaning, refer to the [WebRTC W3C, section 4.3.3](https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum)
"""
@type connection_state() :: :closed | :failed | :disconnected | :new | :connecting | :connected

@enforce_keys [:config, :owner]
defstruct @enforce_keys ++
[
Expand All @@ -39,11 +53,13 @@ defmodule ExWebRTC.PeerConnection do
:current_remote_desc,
:pending_remote_desc,
:ice_agent,
:ice_state,
:dtls_transport,
demuxer: %Demuxer{},
transceivers: [],
ice_state: nil,
dtls_state: nil,
signaling_state: :stable,
conn_state: :new,
last_offer: nil,
last_answer: nil
]
Expand Down Expand Up @@ -119,9 +135,13 @@ defmodule ExWebRTC.PeerConnection do
owner: owner,
config: config,
ice_agent: ice_agent,
dtls_transport: dtls_transport
dtls_transport: dtls_transport,
ice_state: :new,
dtls_state: :new
}

notify(state.owner, {:connection_state_change, :new})

{:ok, state}
end

Expand Down Expand Up @@ -327,8 +347,11 @@ defmodule ExWebRTC.PeerConnection do
end

@impl true
def handle_info({:ex_ice, _from, :connected}, state) do
{:noreply, %__MODULE__{state | ice_state: :connected}}
def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do
state = %__MODULE__{state | ice_state: new_ice_state}
next_conn_state = next_conn_state(new_ice_state, state.dtls_state)
state = update_conn_state(state, next_conn_state)
{:noreply, state}
end

@impl true
Expand All @@ -346,7 +369,15 @@ defmodule ExWebRTC.PeerConnection do
end

@impl true
def handle_info({:rtp_data, data}, state) do
def handle_info({:dtls_transport, _pid, {:state_change, new_dtls_state}}, state) do
state = %__MODULE__{state | dtls_state: new_dtls_state}
next_conn_state = next_conn_state(state.ice_state, new_dtls_state)
state = update_conn_state(state, next_conn_state)
{:noreply, state}
end

@impl true
def handle_info({:dtls_transport, _pid, {:rtp_data, data}}, state) do
case Demuxer.demux(state.demuxer, data) do
{:ok, demuxer, mid, packet} ->
notify(state.owner, {:data, {mid, packet}})
Expand Down Expand Up @@ -514,6 +545,34 @@ defmodule ExWebRTC.PeerConnection do
defp maybe_next_state(:have_remote_pranswer, :remote, :answer), do: {:ok, :stable}
defp maybe_next_state(:have_remote_pranswer, _, _), do: {:error, :invalid_transition}

# TODO support :disconnected state - our ICE doesn't provide disconnected state for now
# TODO support :closed state
# the order of these clauses is important
defp next_conn_state(ice_state, dtls_state)

defp next_conn_state(ice_state, dtls_state) when ice_state == :failed or dtls_state == :failed,
do: :failed

defp next_conn_state(:failed, _dtls_state), do: :failed

defp next_conn_state(_ice_state, :failed), do: :failed

defp next_conn_state(:new, :new), do: :new

defp next_conn_state(ice_state, dtls_state)
when ice_state in [:new, :checking] or dtls_state in [:new, :connecting],
do: :connecting

defp next_conn_state(ice_state, :connected) when ice_state in [:connected, :completed],
do: :connected

defp update_conn_state(%{conn_state: conn_state} = state, conn_state), do: state

defp update_conn_state(state, new_conn_state) do
notify(state.owner, {:connection_state_change, new_conn_state})
%{state | conn_state: new_conn_state}
end

defp set_description(:local, :answer, sdp, state) do
# NOTICE: internaly, we don't create SessionDescription
# as it would require serialization of sdp
Expand Down
16 changes: 8 additions & 8 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ defmodule ExWebRTC.MixProject do

defp deps do
[
{:ex_sdp, "~> 0.13"},
{:ex_ice, "~> 0.1"},
{:ex_dtls, "~> 0.14"},
{:ex_libsrtp, "~> 0.6"},
{:ex_rtp, "~> 0.2"},
{:ex_rtcp, "~> 0.1"},
{:ex_sdp, "~> 0.13.0"},
{:ex_ice, "~> 0.2.0"},
{:ex_dtls, "~> 0.14.0"},
{:ex_libsrtp, "~> 0.6.0"},
{:ex_rtp, "~> 0.2.0"},
{:ex_rtcp, "~> 0.1.0"},

# dev/test
{:excoveralls, "~> 0.14", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.30", only: :dev, runtime: false},
{:excoveralls, "~> 0.17.0", only: [:dev, :test], runtime: false},
{:ex_doc, "~> 0.30.0", only: :dev, runtime: false},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}
]
Expand Down
Loading
Loading