From 415110580cb27672e0b4eb2fc8286c27278f2a29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 23 Nov 2023 20:15:54 +0100 Subject: [PATCH] Add PeerConnection state machine --- lib/ex_webrtc/dtls_transport.ex | 59 +++++++++++----- lib/ex_webrtc/peer_connection.ex | 68 +++++++++++++++++-- mix.lock | 2 +- test/ex_webrtc/dtls_transport_test.exs | 5 ++ test/ex_webrtc/peer_connection_test.exs | 90 +++++++++++++++++++++++++ 5 files changed, 203 insertions(+), 21 deletions(-) diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 9734005a..6a4931c9 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -11,6 +11,24 @@ defmodule ExWebRTC.DTLSTransport do @type dtls_transport() :: GenServer.server() + # Messages sent by the DTLSTransport + @typedoc false + @type signal() :: {:dtls_transport, pid(), state_change() | rtp_data()} + + # Message sent when DTLSTransport changes its state + @typedoc false + @type state_change() :: {:state_change, dtls_state()} + + # Message sent when a new RTP packet arrives. + # Packet is decrypted. + @typedoc false + @type rtp_data() :: {:rtp_data, binary()} + + # Possible DTLSTransport states. + # For the exact meaning, refer to the [WebRTC W3C, sec. 5.5.1](https://www.w3.org/TR/webrtc/#rtcdtlstransportstate-enum) + @typedoc false + @type dtls_state() :: :new | :connecting | :connected | :closed | :failed + @doc false @spec start_link(ExICE.ICEAgent.opts(), GenServer.server()) :: GenServer.on_start() def start_link(ice_config, ice_module \\ ICEAgent) do @@ -66,6 +84,8 @@ defmodule ExWebRTC.DTLSTransport do mode: nil } + notify(state.owner, {:state_change, :new}) + {:ok, state} end @@ -163,7 +183,7 @@ defmodule ExWebRTC.DTLSTransport do {:handshake_packets, packets, timeout} when state.ice_state in [:connected, :completed] -> :ok = ICEAgent.send_data(state.ice_agent, packets) Process.send_after(self(), :dtls_timeout, timeout) - %{state | dtls_state: :connecting} + update_dtls_state(state, :connecting) {:handshake_packets, packets, timeout} -> Logger.debug(""" @@ -172,19 +192,20 @@ defmodule ExWebRTC.DTLSTransport do """) Process.send_after(self(), :dtls_timeout, timeout) - %{state | dtls_state: :connecting, buffered_packets: packets} + state = %{state | buffered_packets: packets} + update_dtls_state(state, :connecting) {:handshake_finished, _, remote_keying_material, profile, packets} -> Logger.debug("DTLS handshake finished") ICEAgent.send_data(state.ice_agent, packets) # TODO: validate fingerprint state = setup_srtp(state, remote_keying_material, profile) - %{state | dtls_state: :connected} + update_dtls_state(state, :connected) {:handshake_finished, _, remote_keying_material, profile} -> Logger.debug("DTLS handshake finished") state = setup_srtp(state, remote_keying_material, profile) - %{state | dtls_state: :connected} + update_dtls_state(state, :connected) :handshake_want_read -> state @@ -196,7 +217,7 @@ defmodule ExWebRTC.DTLSTransport do case ExLibSRTP.unprotect(state.srtp, data) do {:ok, payload} -> # TODO: temporarily, everything goes to peer connection process - send(state.owner, {:rtp_data, payload}) + notify(state.owner, {:rtp_data, payload}) {:error, reason} -> Logger.warning("Failed to decrypt SRTP, reason: #{inspect(reason)}") @@ -216,17 +237,16 @@ defmodule ExWebRTC.DTLSTransport do # I hope ExICE will be refactord so new state is a tuple defp handle_ice(new_state, %{dtls_state: :new} = state) when new_state in [:connected, :completed] do - state = - if state.mode == :active do - {packets, timeout} = ExDTLS.do_handshake(state.dtls) - Process.send_after(self(), :dtls_timeout, timeout) - :ok = ICEAgent.send_data(state.ice_agent, packets) - %{state | dtls_state: :connecting} - else - state - end + state = %{state | ice_state: new_state} - %{state | ice_state: new_state} + if state.mode == :active do + {packets, timeout} = ExDTLS.do_handshake(state.dtls) + Process.send_after(self(), :dtls_timeout, timeout) + :ok = ICEAgent.send_data(state.ice_agent, packets) + update_dtls_state(state, :connecting) + else + state + end end defp handle_ice(new_state, state) @@ -260,4 +280,13 @@ defmodule ExWebRTC.DTLSTransport do :ok = ExLibSRTP.add_stream(state.srtp, policy) state end + + defp update_dtls_state(%{dtls_state: dtls_state} = state, dtls_state), do: state + + defp update_dtls_state(state, new_dtls_state) do + notify(state.owner, {:state_change, new_dtls_state}) + %{state | dtls_state: new_dtls_state} + end + + defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg}) end diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 9c0f127c..c4c969df 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -31,6 +31,20 @@ defmodule ExWebRTC.PeerConnection do streams: [:TODO] ] + @typedoc """ + Messages sent by the ExWebRTC. + """ + @type signal() :: {:ex_webrtc, pid(), connection_state_change()} + + @type connection_state_change() :: {:connection_state_change, connection_state()} + + @typedoc """ + Possible PeerConnection states. + + For the exact meaning, refer to the [WebRTC W3C, section 4.3.3](https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum) + """ + @type connection_state() :: :closed | :failed | :disconnected | :new | :connecting | :connected + @enforce_keys [:config, :owner] defstruct @enforce_keys ++ [ @@ -39,11 +53,13 @@ defmodule ExWebRTC.PeerConnection do :current_remote_desc, :pending_remote_desc, :ice_agent, - :ice_state, :dtls_transport, demuxer: %Demuxer{}, transceivers: [], + ice_state: nil, + dtls_state: nil, signaling_state: :stable, + conn_state: :new, last_offer: nil, last_answer: nil ] @@ -119,9 +135,13 @@ defmodule ExWebRTC.PeerConnection do owner: owner, config: config, ice_agent: ice_agent, - dtls_transport: dtls_transport + dtls_transport: dtls_transport, + ice_state: :new, + dtls_state: :new } + notify(state.owner, {:connection_state_change, :new}) + {:ok, state} end @@ -327,8 +347,12 @@ defmodule ExWebRTC.PeerConnection do end @impl true - def handle_info({:ex_ice, _from, :connected}, state) do - {:noreply, %__MODULE__{state | ice_state: :connected}} + def handle_info({:ex_ice, _from, msg}, state) + when msg in [:checking, :connected, :completed, :failed] do + state = %__MODULE__{state | ice_state: msg} + next_conn_state = next_conn_state(msg, state.dtls_state) + state = update_conn_state(state, next_conn_state) + {:noreply, state} end @impl true @@ -346,7 +370,15 @@ defmodule ExWebRTC.PeerConnection do end @impl true - def handle_info({:rtp_data, data}, state) do + def handle_info({:dtls_transport, _pid, {:state_change, new_dtls_state}}, state) do + state = %__MODULE__{state | dtls_state: new_dtls_state} + next_conn_state = next_conn_state(state.ice_state, new_dtls_state) + state = update_conn_state(state, next_conn_state) + {:noreply, state} + end + + @impl true + def handle_info({:dtls_transport, _pid, {:rtp_data, data}}, state) do case Demuxer.demux(state.demuxer, data) do {:ok, demuxer, mid, packet} -> notify(state.owner, {:data, {mid, packet}}) @@ -514,6 +546,32 @@ defmodule ExWebRTC.PeerConnection do defp maybe_next_state(:have_remote_pranswer, :remote, :answer), do: {:ok, :stable} defp maybe_next_state(:have_remote_pranswer, _, _), do: {:error, :invalid_transition} + # TODO support :disconnected state - our ICE doesn't provide disconnected state for now + # TODO support :closed state + # the order of these clauses is important + defp next_conn_state(ice_state, dtls_state) + + defp next_conn_state(ice_state, dtls_state) when ice_state == :failed or dtls_state == :failed, + do: :failed + + defp next_conn_state(ice_state, dtls_state) when ice_state == :new and dtls_state == :new, + do: :new + + defp next_conn_state(ice_state, dtls_state) + when ice_state in [:new, :checking] or dtls_state in [:new, :connecting], + do: :connecting + + defp next_conn_state(ice_state, dtls_state) + when ice_state in [:connected, :completed] and dtls_state in [:connected], + do: :connected + + defp update_conn_state(%{conn_state: conn_state} = state, conn_state), do: state + + defp update_conn_state(state, new_conn_state) do + notify(state.owner, {:connection_state_change, new_conn_state}) + %{state | conn_state: new_conn_state} + end + defp set_description(:local, :answer, sdp, state) do # NOTICE: internaly, we don't create SessionDescription # as it would require serialization of sdp diff --git a/mix.lock b/mix.lock index 12c45908..8e749370 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, - "bundlex": {:hex, :bundlex, "1.2.0", "a89869208a019376a38e8a10e1bd573dcbeae8addd381c2cd74e2817010bef8f", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "d2182b91a2a53847baadf4745ad2291853e786ad28671f474a611e7703dbca9b"}, + "bundlex": {:hex, :bundlex, "1.3.1", "5791b4037df961f092eac9a51d8df91030a80381e442e580a3f4d82c9e5d34f0", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "9651ddc7e627dd1bd0eed9aaaba3de8b4bbc06c10980089f7276cdb82bb3fc51"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "castore": {:hex, :castore, "1.0.4", "ff4d0fb2e6411c0479b1d965a814ea6d00e51eb2f58697446e9c41a97d940b28", [:mix], [], "hexpm", "9418c1b8144e11656f0be99943db4caf04612e3eaecefb5dae9a2a87565584f8"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, diff --git a/test/ex_webrtc/dtls_transport_test.exs b/test/ex_webrtc/dtls_transport_test.exs index 9763fc9e..a4741279 100644 --- a/test/ex_webrtc/dtls_transport_test.exs +++ b/test/ex_webrtc/dtls_transport_test.exs @@ -37,6 +37,7 @@ defmodule ExWebRTC.DTLSTransportTest do setup do assert {:ok, dtls} = DTLSTransport.start_link([tester: self()], FakeICEAgent) + assert_receive {:dtls_transport, ^dtls, {:state_change, :new}} ice = DTLSTransport.get_ice_agent(dtls) assert is_pid(ice) @@ -111,6 +112,8 @@ defmodule ExWebRTC.DTLSTransportTest do FakeICEAgent.send_dtls(ice, :connected) assert :ok = check_handshake(dtls, ice, remote_dtls) + assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}} + assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}} end test "finishes handshake in passive mode", %{dtls: dtls, ice: ice} do @@ -122,6 +125,8 @@ defmodule ExWebRTC.DTLSTransportTest do FakeICEAgent.send_dtls(ice, {:data, packets}) assert :ok == check_handshake(dtls, ice, remote_dtls) + assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}} + assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}} end defp check_handshake(dtls, ice, remote_dtls) do diff --git a/test/ex_webrtc/peer_connection_test.exs b/test/ex_webrtc/peer_connection_test.exs index dfdeb876..2f0aa21c 100644 --- a/test/ex_webrtc/peer_connection_test.exs +++ b/test/ex_webrtc/peer_connection_test.exs @@ -42,6 +42,96 @@ defmodule ExWebRTC.PeerConnectionTest do :ok = PeerConnection.set_remote_description(pc1, answer) end + test "connection state change" do + {:ok, pc1} = PeerConnection.start_link() + assert_receive {:ex_webrtc, ^pc1, {:connection_state_change, :new}} + {:ok, _} = PeerConnection.add_transceiver(pc1, :audio) + {:ok, offer} = PeerConnection.create_offer(pc1) + :ok = PeerConnection.set_local_description(pc1, offer) + + {:ok, pc2} = PeerConnection.start_link() + assert_receive {:ex_webrtc, ^pc2, {:connection_state_change, :new}} + :ok = PeerConnection.set_remote_description(pc2, offer) + {:ok, answer} = PeerConnection.create_answer(pc2) + :ok = PeerConnection.set_local_description(pc2, answer) + + :ok = PeerConnection.set_remote_description(pc1, answer) + + assert :ok == + check_connection_state_change( + pc1, + pc2, + %{ + connecting_recv: false, + connected_recv: false + }, + %{ + connecting_recv: false, + connected_recv: false + } + ) + end + + defp check_connection_state_change( + _pc1, + _pc2, + %{ + connecting_recv: true, + connected_recv: true + }, + %{ + connecting_recv: true, + connected_recv: true + } + ), + do: :ok + + defp check_connection_state_change(pc1, pc2, pc1_states, pc2_states) do + receive do + {:ex_webrtc, ^pc1, {:ice_candidate, cand}} -> + :ok = PeerConnection.add_ice_candidate(pc2, cand) + check_connection_state_change(pc1, pc2, pc1_states, pc2_states) + + {:ex_webrtc, ^pc2, {:ice_candidate, cand}} -> + :ok = PeerConnection.add_ice_candidate(pc1, cand) + check_connection_state_change(pc1, pc2, pc1_states, pc2_states) + + {:ex_webrtc, ^pc1, {:connection_state_change, :connecting}} + when pc1_states.connecting_recv == false and pc1_states.connected_recv == false -> + check_connection_state_change(pc1, pc2, %{pc1_states | connecting_recv: true}, pc2_states) + + {:ex_webrtc, ^pc1, {:connection_state_change, :connecting}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc1_states)}" + + {:ex_webrtc, ^pc2, {:connection_state_change, :connecting}} + when pc2_states.connecting_recv == false and pc2_states.connected_recv == false -> + check_connection_state_change(pc1, pc2, pc1_states, %{pc2_states | connecting_recv: true}) + + {:ex_webrtc, ^pc2, {:connection_state_change, :connecting}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc2_states)}" + + {:ex_webrtc, ^pc1, {:connection_state_change, :connected}} + when pc1_states.connecting_recv == true and pc1_states.connected_recv == false -> + check_connection_state_change(pc1, pc2, %{pc1_states | connected_recv: true}, pc2_states) + + {:ex_webrtc, ^pc1, {:connection_state_change, :connected}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc1_states)}" + + {:ex_webrtc, ^pc2, {:connection_state_change, :connected}} + when pc2_states.connecting_recv == true and pc2_states.connected_recv == false -> + check_connection_state_change(pc1, pc2, pc1_states, %{pc2_states | connected_recv: true}) + + {:ex_webrtc, ^pc2, {:connection_state_change, :connected}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc2_states)}" + + {:ex_webrtc, ^pc1, {:connection_state_change, _state}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc1_states)}" + + {:ex_webrtc, ^pc2, {:connection_state_change, _state}} = msg -> + raise "Unexpectedly received: #{inspect(msg)}, when pc_states is: #{inspect(pc2_states)}" + end + end + describe "set_remote_description/2" do test "MID" do {:ok, pc} = PeerConnection.start_link()