From cd13e3cfc11249526e2f435bdb0815e91e0446a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Fri, 10 Nov 2023 14:51:39 +0100 Subject: [PATCH] Add basic RTP demuxing --- lib/ex_webrtc/demuxer.ex | 73 ++++++++++++++++++++++++++++++++ lib/ex_webrtc/peer_connection.ex | 23 ++++++++-- lib/ex_webrtc/sdp_utils.ex | 32 ++++++++++++++ mix.exs | 2 + mix.lock | 3 ++ 5 files changed, 129 insertions(+), 4 deletions(-) create mode 100644 lib/ex_webrtc/demuxer.ex diff --git a/lib/ex_webrtc/demuxer.ex b/lib/ex_webrtc/demuxer.ex new file mode 100644 index 00000000..bf59c8a3 --- /dev/null +++ b/lib/ex_webrtc/demuxer.ex @@ -0,0 +1,73 @@ +defmodule ExWebRTC.Demuxer do + @moduledoc false + + alias ExRTP.Packet + alias ExRTP.Packet.Extension + alias ExRTP.Packet.Extension.SourceDescription + + defstruct ssrcs: %{}, extensions: %{}, payload_types: %{} + + def process_data(demuxer, data) do + with {:ok, %Packet{} = packet} <- decode(data), + {:ok, demuxer, mid} <- match_to_mid(demuxer, packet) do + {:ok, demuxer, mid, packet} + end + end + + # RFC 8843, 9.2 + defp match_to_mid(demuxer, packet) do + with demuxer <- update_mapping(demuxer, packet), + :error <- match_by_extension(demuxer, packet), + :error <- match_by_payload_type(demuxer, packet) do + {:error, :unmatched_stream} + else + {:ok, mid} -> {:ok, demuxer, mid} + {:ok, _demuxer, _mid} = res -> res + end + end + + defp update_mapping(demuxer, %Packet{ssrc: ssrc, sequence_number: sn} = packet) do + mid = + packet.extensions + |> Enum.find_value(fn %Extension{id: id} = ext -> + case demuxer.extensions[id] do + {SourceDescription, :mid} -> + {:ok, decoded_ext} = SourceDescription.from_raw(ext) + decoded_ext.text + + _other -> + nil + end + end) + + case Map.get(demuxer.ssrcs, ssrc) do + {_last_mid, last_sn} when mid != nil and sn > last_sn -> + put_in(demuxer.ssrcs[ssrc], {mid, sn}) + + nil when mid != nil -> + put_in(demuxer.ssrcs[ssrc], {mid, sn}) + + _other -> + demuxer + end + end + + defp match_by_extension(demuxer, %Packet{ssrc: ssrc}) do + case Map.get(demuxer.ssrcs, ssrc) do + {last_mid, _last_sn} -> {:ok, last_mid} + nil -> :error + end + end + + defp match_by_payload_type(demuxer, %Packet{ssrc: ssrc, payload_type: pt}) do + case Map.get(demuxer.payload_types, pt) do + nil -> :error + mid -> {:ok, put_in(demuxer.ssrcs[ssrc], mid), mid} + end + end + + # RTP & RTCP demuxing, see RFC 6761 + # TODO: handle RTCP + defp decode(<<_, s, _::binary>>) when s in 192..223, do: {:error, :rtcp} + defp decode(data), do: Packet.decode(data) +end diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 6a6d91bf..598dcf8d 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -11,6 +11,7 @@ defmodule ExWebRTC.PeerConnection do alias ExICE.ICEAgent alias ExWebRTC.{ + Demuxer, DTLSTransport, IceCandidate, MediaStreamTrack, @@ -41,6 +42,7 @@ defmodule ExWebRTC.PeerConnection do :ice_agent, :ice_state, :dtls_transport, + demuxer: %Demuxer{}, transceivers: [], signaling_state: :stable, last_offer: nil, @@ -346,9 +348,16 @@ defmodule ExWebRTC.PeerConnection do {:ok, dtls} -> {:noreply, %__MODULE__{state | dtls_transport: dtls}} - {:ok, dtls, payload} -> - notify(state.owner, {:data, payload}) - {:noreply, %__MODULE__{state | dtls_transport: dtls}} + {:ok, dtls, decoded_data} -> + case Demuxer.process_data(state.demuxer, decoded_data) do + {:ok, demuxer, mid, packet} -> + notify(state.owner, {:data, {mid, packet}}) + {:noreply, %__MODULE__{state | dtls_transport: dtls, demuxer: demuxer}} + + {:error, reason} -> + Logger.error("Unable to decode/demux RTP, reason: #{inspect(reason)}") + {:noreply, %__MODULE__{state | dtls_transport: dtls}} + end {:error, reason} -> Logger.error("Unable to process data, reason: #{inspect(reason)}") @@ -372,6 +381,12 @@ defmodule ExWebRTC.PeerConnection do new_transceivers = update_local_transceivers(type, state.transceivers, sdp) state = set_description(:local, type, sdp, state) + demuxer = %{ + state.demuxer + | extensions: SDPUtils.get_extensions(sdp), + payload_types: SDPUtils.get_payload_types(sdp) + } + dtls = if type == :answer do {:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup) @@ -380,7 +395,7 @@ defmodule ExWebRTC.PeerConnection do state.dtls_transport end - {:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls}} + {:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls, demuxer: demuxer}} end defp update_local_transceivers(:offer, transceivers, _sdp) do diff --git a/lib/ex_webrtc/sdp_utils.ex b/lib/ex_webrtc/sdp_utils.ex index 2eb751ad..270c4766 100644 --- a/lib/ex_webrtc/sdp_utils.ex +++ b/lib/ex_webrtc/sdp_utils.ex @@ -1,6 +1,8 @@ defmodule ExWebRTC.SDPUtils do @moduledoc false + alias ExRTP.Packet.Extension.SourceDescription + @spec get_media_direction(ExSDP.Media.t()) :: :sendrecv | :sendonly | :recvonly | :inactive | nil def get_media_direction(media) do @@ -99,6 +101,36 @@ defmodule ExWebRTC.SDPUtils do end end + def get_extensions(sdp) do + # we assume that, if extension is present in multiple mlines, the IDs are the same (RFC 8285) + sdp.media + |> Enum.flat_map(&ExSDP.Media.get_attributes(&1, :extmap)) + |> Enum.flat_map(fn + %{id: id, uri: "urn:ietf:params:rtp-hdrext:sdes:mid"} -> [{id, {SourceDescription, :mid}}] + # TODO: handle other types of extensions + _ -> [] + end) + |> Map.new() + end + + def get_payload_types(sdp) do + # if payload type is used in more than 1 mline, it cannot be used to identify the mline + # thus, it is not placed in the returned map + sdp.media + |> Enum.flat_map(fn mline -> + {:mid, mid} = ExSDP.Media.get_attribute(mline, :mid) + encodings = ExSDP.Media.get_attributes(mline, :rtpmap) + + Enum.map(encodings, &{&1.payload_type, mid}) + end) + |> Enum.reduce(%{}, fn + {pt, _mid}, acc when is_map_key(acc, pt) -> Map.put(acc, pt, nil) + {pt, mid}, acc -> Map.put(acc, pt, mid) + end) + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Map.new() + end + defp do_get_ice_credentials(sdp_or_mline) do get_attr = case sdp_or_mline do diff --git a/mix.exs b/mix.exs index 46d56344..3d424487 100644 --- a/mix.exs +++ b/mix.exs @@ -49,6 +49,8 @@ defmodule ExWebRTC.MixProject do {:ex_ice, "~> 0.1"}, {:ex_dtls, "~> 0.13"}, {:ex_libsrtp, "~> 0.6"}, + {:ex_rtp, git: "https://github.com/elixir-webrtc/ex_rtp.git"}, + {:ex_rtcp, git: "https://github.com/elixir-webrtc/ex_rtcp.git"}, # dev/test {:excoveralls, "~> 0.14", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 3b274569..9f99295f 100644 --- a/mix.lock +++ b/mix.lock @@ -12,6 +12,9 @@ "ex_dtls": {:hex, :ex_dtls, "0.13.0", "4d7631eefc19a8820d4f79883f379ff2ad642976bda55493d4ec4e5d10d6c078", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "3ece30967006ec12a4088e60514cb08847814fba8b8a21aca3862e5d1fd4a6bc"}, "ex_ice": {:hex, :ex_ice, "0.1.0", "2653c884872d8769cf9fc655c74002a63ed6c21be1b3c2badfa42bdc74de2355", [:mix], [{:ex_stun, "~> 0.1.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "e2539a321f87f31997ba974d532d00511e5828f2f113b550b1ef6aa799dd2ffe"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.6.0", "d96cd7fc1780157614f0bf47d31587e5eab953b43067f4885849f8177ec452a9", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "e9ce8a507a658f7e2df72fae82a4b3ba0a056c175f0bc490e79ab03058e094d5"}, + "ex_rctp": {:git, "https://github.com/elixir-webrtc/ex_rtcp.git", "c0cf2b7f995e34d13cee4cbb228376a55700fb6a", []}, + "ex_rtcp": {:git, "https://github.com/elixir-webrtc/ex_rtcp.git", "c0cf2b7f995e34d13cee4cbb228376a55700fb6a", []}, + "ex_rtp": {:git, "https://github.com/elixir-webrtc/ex_rtp.git", "83f088fb6d8448d6d705d26f9ff6e6a35fa8cfd6", []}, "ex_sdp": {:hex, :ex_sdp, "0.13.0", "b464cf5f6b70433159be243115857599f82b07234ee022997868c85ae1f225f7", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "975ca4d274240c51ee85909bc0618bd4dd940e69f7d8c8f0d701f1524eaffaad"}, "ex_stun": {:hex, :ex_stun, "0.1.0", "252474bf4c8519fbf4bc0fbfc6a1b846a634b1478c65dbbfb4b6ab4e33c2a95a", [:mix], [], "hexpm", "629fc8be45b624a92522f81d85ba001877b1f0745889a2419bdb678790d7480c"}, "excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},