From b732ece20b349fb9f848ca98aeea5acf146fb5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Tue, 21 Nov 2023 13:33:37 +0100 Subject: [PATCH] Convert `RTPTransceiver` to `GenServer` --- lib/ex_webrtc/peer_connection.ex | 82 ++++++++++++--- lib/ex_webrtc/rtp_transceiver.ex | 170 +++++++------------------------ lib/ex_webrtc/sdp_utils.ex | 74 ++++++++++++++ 3 files changed, 177 insertions(+), 149 deletions(-) diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 9c0f127c..f3bdffb9 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -14,6 +14,7 @@ defmodule ExWebRTC.PeerConnection do DTLSTransport, IceCandidate, MediaStreamTrack, + RTPCodecParameters, RTPTransceiver, SDPUtils, SessionDescription, @@ -91,13 +92,15 @@ defmodule ExWebRTC.PeerConnection do GenServer.call(peer_connection, {:add_ice_candidate, candidate}) end - @spec get_transceivers(peer_connection()) :: [RTPTransceiver.t()] + @spec get_transceivers(peer_connection()) :: [ + {mid :: String.t(), RTPTransceiver.rtp_transceiver()} + ] def get_transceivers(peer_connection) do GenServer.call(peer_connection, :get_transceivers) end @spec add_transceiver(peer_connection(), RTPTransceiver.kind(), transceiver_options()) :: - {:ok, RTPTransceiver.t()} | {:error, :TODO} + {:ok, RTPTransceiver.rtp_transceiver()} | {:error, :TODO} def add_transceiver(peer_connection, kind, options \\ []) do GenServer.call(peer_connection, {:add_transceiver, kind, options}) end @@ -161,7 +164,7 @@ defmodule ExWebRTC.PeerConnection do rtcp: true ] - mlines = Enum.map(transceivers, &RTPTransceiver.to_offer_mline(&1, opts)) + mlines = Enum.map(transceivers, fn {_, t} -> SDPUtils.to_offer_mline(t, opts) end) mids = Enum.map(mlines, fn mline -> @@ -218,8 +221,8 @@ defmodule ExWebRTC.PeerConnection do mlines = Enum.map(remote_offer.media, fn mline -> {:mid, mid} = ExSDP.Media.get_attribute(mline, :mid) - {_ix, transceiver} = RTPTransceiver.find_by_mid(state.transceivers, mid) - RTPTransceiver.to_answer_mline(transceiver, mline, opts) + {_, transceiver} = Enum.find(state.transceivers, fn {m, _} -> m == mid end) + SDPUtils.to_answer_mline(transceiver, mline, opts) end) mids = @@ -314,15 +317,16 @@ defmodule ExWebRTC.PeerConnection do :video -> {state.config.video_rtp_hdr_exts, state.config.video_codecs} end - transceiver = %RTPTransceiver{ + props = %{ mid: nil, direction: direction, - kind: kind, codecs: codecs, rtp_hdr_exts: rtp_hdr_exts } - transceivers = List.insert_at(state.transceivers, -1, transceiver) + {:ok, transceiver} = RTPTransceiver.start_link(kind, props) + + transceivers = state.transceivers ++ [{nil, transceiver}] {:reply, {:ok, transceiver}, %{state | transceivers: transceivers}} end @@ -408,7 +412,7 @@ defmodule ExWebRTC.PeerConnection do new_transceivers # only take new transceivers that can receive tracks |> Enum.filter(fn tr -> - RTPTransceiver.find_by_mid(state.transceivers, tr.mid) == nil and + Enum.all?(state.transceivers, fn {m, _} -> tr.mid != m end) and tr.direction in [:recvonly, :sendrecv] end) |> Enum.map(fn tr -> MediaStreamTrack.from_transceiver(tr) end) @@ -442,7 +446,7 @@ defmodule ExWebRTC.PeerConnection do Enum.reduce_while(sdp.media, {:ok, transceivers}, fn mline, {:ok, transceivers} -> case ExSDP.Media.get_attribute(mline, :mid) do {:mid, mid} -> - transceivers = RTPTransceiver.update_or_create(transceivers, mid, mline, config) + transceivers = update_or_create_transceiver(transceivers, mid, mline, config) {:cont, {:ok, transceivers}} _other -> @@ -451,11 +455,63 @@ defmodule ExWebRTC.PeerConnection do end) end + # searches for transceiver for a given mline + # if it exists, updates its configuration + # if it doesn't exist, creats a new one + # returns list of updated transceivers + defp update_or_create_transceiver(transceivers, mid, mline, config) do + codecs = get_codecs(mline, config) + rtp_hdr_exts = get_rtp_hdr_extensions(mline, config) + props = %{codecs: codecs, rtp_hdr_exts: rtp_hdr_exts} + + Enum.find(transceivers, fn {m, _} -> m == mid end) + |> case do + {_, tr} -> + :ok = RTPTransceiver.update_properties(tr, props) + transceivers + + nil -> + props = Map.merge(props, %{mid: mid, direction: :recvonly}) + {:ok, tr} = RTPTransceiver.start_link(mline.type, props) + transceivers ++ [{mid, tr}] + end + end + + defp get_codecs(mline, config) do + rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping) + fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP) + all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback) + + rtp_mappings + |> Stream.map(fn rtp_mapping -> + fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type)) + + rtcp_fbs = + all_rtcp_fbs + |> Stream.filter(&(&1.pt == rtp_mapping.payload_type)) + |> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1)) + + RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs) + end) + |> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end) + end + + defp get_rtp_hdr_extensions(mline, config) do + mline + |> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap) + |> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type)) + end + defp assign_mids(transceivers, next_mid) do {new_transceivers, _next_mid} = Enum.map_reduce(transceivers, next_mid, fn - %{mid: nil} = t, nm -> {%{t | mid: to_string(nm)}, nm + 1} - other, nm -> {other, nm} + {nil, tr}, nm -> + mid = to_string(nm) + :ok = RTPTransceiver.update_properties(tr, %{mid: mid}) + {{mid, tr}, nm + 1} + + other, nm -> + {other, nm} end) new_transceivers @@ -476,7 +532,7 @@ defmodule ExWebRTC.PeerConnection do end tsc_mids = - for %RTPTransceiver{mid: mid} when mid != nil <- state.transceivers, + for {mid, _transceiver} when mid != nil <- state.transceivers, {mid, ""} <- Integer.parse(mid) do mid end diff --git a/lib/ex_webrtc/rtp_transceiver.ex b/lib/ex_webrtc/rtp_transceiver.ex index 82d24c4a..f1a9cf69 100644 --- a/lib/ex_webrtc/rtp_transceiver.ex +++ b/lib/ex_webrtc/rtp_transceiver.ex @@ -3,164 +3,62 @@ defmodule ExWebRTC.RTPTransceiver do RTPTransceiver """ - alias ExWebRTC.{PeerConnection.Configuration, RTPCodecParameters, RTPReceiver} + use GenServer + + alias ExWebRTC.RTPCodecParameters + + @type rtp_transceiver() :: GenServer.server() @type direction() :: :sendonly | :recvonly | :sendrecv | :inactive | :stopped @type kind() :: :audio | :video - @type t() :: %__MODULE__{ - mid: String.t(), + @type properties() :: %{ + mid: String.t() | nil, direction: direction(), - kind: kind(), rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()], - codecs: [RTPCodecParameters.t()], - rtp_receiver: nil + codecs: [RTPCodecParameters.t()] } - @enforce_keys [:mid, :direction, :kind] - defstruct @enforce_keys ++ [codecs: [], rtp_hdr_exts: [], rtp_receiver: %RTPReceiver{}] - @doc false - def find_by_mid(transceivers, mid) do - transceivers - |> Enum.with_index(fn tr, idx -> {idx, tr} end) - |> Enum.find(fn {_idx, tr} -> tr.mid == mid end) + @spec start_link(kind(), properties()) :: GenServer.on_start() + def start_link(kind, properties) do + GenServer.start_link(__MODULE__, [kind, properties]) end @doc false - @spec to_answer_mline(t(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t() - def to_answer_mline(transceiver, mline, opts) do - if transceiver.codecs == [] do - # reject mline and skip further processing - # see RFC 8299 sec. 5.3.1 and RFC 3264 sec. 6 - %ExSDP.Media{mline | port: 0} - else - offered_direction = ExSDP.Media.get_attribute(mline, :direction) - direction = get_direction(offered_direction, transceiver.direction) - opts = Keyword.put(opts, :direction, direction) - to_mline(transceiver, opts) - end + @spec get_properties(rtp_transceiver()) :: {kind(), properties()} + def get_properties(transceiver) do + GenServer.call(transceiver, :get_properties) end @doc false - @spec to_offer_mline(t(), Keyword.t()) :: ExSDP.Media.t() - def to_offer_mline(transceiver, opts) do - to_mline(transceiver, opts) + @spec update_properties(rtp_transceiver(), map()) :: :ok + def update_properties(transceiver, properties) do + # properties should be a subset of properties() type, but typespecs suck + GenServer.call(transceiver, {:update_properties, properties}) end - # searches for transceiver for a given mline - # if it exists, updates its configuration - # if it doesn't exist, creats a new one - # returns list of updated transceivers - @doc false - def update_or_create(transceivers, mid, mline, config) do - case find_by_mid(transceivers, mid) do - {idx, %__MODULE__{} = tr} -> - List.replace_at(transceivers, idx, update(tr, mline, config)) - - nil -> - codecs = get_codecs(mline, config) - rtp_hdr_exts = get_rtp_hdr_extensions(mline, config) - ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC) + @impl true + def init([kind, props]) do + state = %{props | kind: kind, receiver: nil, sender: nil} - tr = %__MODULE__{ - mid: mid, - direction: :recvonly, - kind: mline.type, - codecs: codecs, - rtp_hdr_exts: rtp_hdr_exts, - rtp_receiver: %RTPReceiver{ssrc: ssrc} - } - - transceivers ++ [tr] - end + {:ok, state} end - defp to_mline(transceiver, opts) do - pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end) - - media_formats = - Enum.flat_map(transceiver.codecs, fn codec -> - [_type, encoding] = String.split(codec.mime_type, "/") - - rtp_mapping = %ExSDP.Attribute.RTPMapping{ - clock_rate: codec.clock_rate, - encoding: encoding, - params: codec.channels, - payload_type: codec.payload_type - } - - [rtp_mapping, codec.sdp_fmtp_line, codec.rtcp_fbs] - end) - - attributes = - if(Keyword.get(opts, :rtcp, false), do: [{"rtcp", "9 IN IP4 0.0.0.0"}], else: []) ++ - [ - Keyword.get(opts, :direction, transceiver.direction), - {:mid, transceiver.mid}, - {:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)}, - {:ice_pwd, Keyword.fetch!(opts, :ice_pwd)}, - {:ice_options, Keyword.fetch!(opts, :ice_options)}, - {:fingerprint, Keyword.fetch!(opts, :fingerprint)}, - {:setup, Keyword.fetch!(opts, :setup)}, - :rtcp_mux - ] ++ transceiver.rtp_hdr_exts - - %ExSDP.Media{ - ExSDP.Media.new(transceiver.kind, 9, "UDP/TLS/RTP/SAVPF", pt) - | # mline must be followed by a cline, which must contain - # the default value "IN IP4 0.0.0.0" (as there are no candidates yet) - connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}] - } - |> ExSDP.Media.add_attributes(attributes ++ media_formats) - end - - # RFC 3264 (6.1) + RFC 8829 (5.3.1) - # AFAIK one of the cases should always match - # bc we won't assign/create an inactive transceiver to i.e. sendonly mline - # also neither of the arguments should ever be :stopped - defp get_direction(_, :inactive), do: :inactive - defp get_direction(:sendonly, t) when t in [:sendrecv, :recvonly], do: :recvonly - defp get_direction(:recvonly, t) when t in [:sendrecv, :sendonly], do: :sendonly - defp get_direction(o, other) when o in [:sendrecv, nil], do: other - defp get_direction(:inactive, _), do: :inactive - - defp update(transceiver, mline, config) do - codecs = get_codecs(mline, config) - rtp_hdr_exts = get_rtp_hdr_extensions(mline, config) - ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC) - rtp_receiver = %RTPReceiver{ssrc: ssrc} - - %__MODULE__{ - transceiver - | codecs: codecs, - rtp_hdr_exts: rtp_hdr_exts, - rtp_receiver: rtp_receiver - } + @impl true + def handle_call(:get_properties, _from, state) do + properties = Map.take(state, [:mid, :direction, :kind, :rtp_hrd_exts, :codecs]) + {:reply, properties, state} end - defp get_codecs(mline, config) do - rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping) - fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP) - all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback) - - rtp_mappings - |> Stream.map(fn rtp_mapping -> - fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type)) - - rtcp_fbs = - all_rtcp_fbs - |> Stream.filter(&(&1.pt == rtp_mapping.payload_type)) - |> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1)) - - RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs) - end) - |> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end) - end + @impl true + def handle_call({:update_properties, properties}, _from, state) do + # TODO: there's more to it that simply overriding the state's values + state = + properties + |> Map.take([:mid, :direction, :rtp_hdr_exts, :codecs]) + |> then(&Map.merge(state, &1)) - defp get_rtp_hdr_extensions(mline, config) do - mline - |> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap) - |> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type)) + {:reply, :ok, state} end end diff --git a/lib/ex_webrtc/sdp_utils.ex b/lib/ex_webrtc/sdp_utils.ex index ad3f2c8d..30fa7c8b 100644 --- a/lib/ex_webrtc/sdp_utils.ex +++ b/lib/ex_webrtc/sdp_utils.ex @@ -2,6 +2,7 @@ defmodule ExWebRTC.SDPUtils do @moduledoc false alias ExRTP.Packet.Extension.SourceDescription + alias ExWebRTC.RTPTransceiver @spec get_media_direction(ExSDP.Media.t()) :: :sendrecv | :sendonly | :recvonly | :inactive | nil @@ -134,6 +135,79 @@ defmodule ExWebRTC.SDPUtils do |> Map.new() end + @spec to_answer_mline(RTPTransceiver.rtp_transceiver(), ExSDP.Media.t(), Keyword.t()) :: + ExSDP.Media.t() + def to_answer_mline(transceiver, mline, opts) do + {kind, props} = RTPTransceiver.get_properties(transceiver) + + if props.codecs == [] do + # reject mline and skip further processing + # see RFC 8299 sec. 5.3.1 and RFC 3264 sec. 6 + %ExSDP.Media{mline | port: 0} + else + offered_direction = ExSDP.Media.get_attribute(mline, :direction) + direction = get_direction(offered_direction, props.direction) + opts = Keyword.put(opts, :direction, direction) + to_mline(kind, props, opts) + end + end + + @spec to_offer_mline(RTPTransceiver.rtp_transceiver(), Keyword.t()) :: + ExSDP.Media.t() + def to_offer_mline(transceiver, opts) do + {kind, props} = RTPTransceiver.get_properties(transceiver) + to_mline(kind, props, opts) + end + + defp to_mline(kind, props, opts) do + pt = Enum.map(props.codecs, fn codec -> codec.payload_type end) + + media_formats = + Enum.flat_map(props.codecs, fn codec -> + [_type, encoding] = String.split(codec.mime_type, "/") + + rtp_mapping = %ExSDP.Attribute.RTPMapping{ + clock_rate: codec.clock_rate, + encoding: encoding, + params: codec.channels, + payload_type: codec.payload_type + } + + [rtp_mapping, codec.sdp_fmtp_line, codec.rtcp_fbs] + end) + + attributes = + if(Keyword.get(opts, :rtcp, false), do: [{"rtcp", "9 IN IP4 0.0.0.0"}], else: []) ++ + [ + Keyword.get(opts, :direction, props.direction), + {:mid, props.mid}, + {:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)}, + {:ice_pwd, Keyword.fetch!(opts, :ice_pwd)}, + {:ice_options, Keyword.fetch!(opts, :ice_options)}, + {:fingerprint, Keyword.fetch!(opts, :fingerprint)}, + {:setup, Keyword.fetch!(opts, :setup)}, + :rtcp_mux + ] ++ props.rtp_hdr_exts + + %ExSDP.Media{ + ExSDP.Media.new(kind, 9, "UDP/TLS/RTP/SAVPF", pt) + | # mline must be followed by a cline, which must contain + # the default value "IN IP4 0.0.0.0" (as there are no candidates yet) + connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}] + } + |> ExSDP.Media.add_attributes(attributes ++ media_formats) + end + + # RFC 3264 (6.1) + RFC 8829 (5.3.1) + # AFAIK one of the cases should always match + # bc we won't assign/create an inactive transceiver to i.e. sendonly mline + # also neither of the arguments should ever be :stopped + defp get_direction(_, :inactive), do: :inactive + defp get_direction(:sendonly, t) when t in [:sendrecv, :recvonly], do: :recvonly + defp get_direction(:recvonly, t) when t in [:sendrecv, :sendonly], do: :sendonly + defp get_direction(o, other) when o in [:sendrecv, nil], do: other + defp get_direction(:inactive, _), do: :inactive + defp do_get_ice_credentials(sdp_or_mline) do get_attr = case sdp_or_mline do