From 3ba67dc09405d14acdd1d5f278d0280014d9c9e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Tue, 14 Nov 2023 12:41:02 +0100 Subject: [PATCH] Apply requested changes --- lib/ex_webrtc/peer_connection.ex | 2 +- lib/ex_webrtc/peer_connection/demuxer.ex | 23 ++++++++++++++--- lib/ex_webrtc/sdp_utils.ex | 2 ++ test/peer_connection/demuxer_test.exs | 33 ++++++++++++------------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index b75cd48b..d021b293 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -348,7 +348,7 @@ defmodule ExWebRTC.PeerConnection do {:noreply, %__MODULE__{state | dtls_transport: dtls}} {:ok, dtls, decoded_data} -> - case Demuxer.process_data(state.demuxer, decoded_data) do + case Demuxer.demux(state.demuxer, decoded_data) do {:ok, demuxer, mid, packet} -> notify(state.owner, {:data, {mid, packet}}) {:noreply, %__MODULE__{state | dtls_transport: dtls, demuxer: demuxer}} diff --git a/lib/ex_webrtc/peer_connection/demuxer.ex b/lib/ex_webrtc/peer_connection/demuxer.ex index 3951e5c3..6109dc05 100644 --- a/lib/ex_webrtc/peer_connection/demuxer.ex +++ b/lib/ex_webrtc/peer_connection/demuxer.ex @@ -1,13 +1,27 @@ defmodule ExWebRTC.PeerConnection.Demuxer do @moduledoc false + # RTP demuxing flow: + # 1. if packet has sdes mid extension, add it to mapping ssrc => mid + # - if mapping already exista, raise (temporary) + # 2. if ssrc => mid mapping exists, route the packet accordingly + # 3. if not, check payload_type => mid mapping + # - if exists, route accordingly + # - otherwise, drop the packet alias ExRTP.Packet alias ExRTP.Packet.Extension alias ExRTP.Packet.Extension.SourceDescription + @type t() :: %__MODULE__{ + ssrc_to_mid: %{non_neg_integer() => {non_neg_integer(), binary()}}, + extensions: %{non_neg_integer() => {module(), atom()}}, + pt_to_mid: %{non_neg_integer() => binary()} + } + defstruct ssrc_to_mid: %{}, extensions: %{}, pt_to_mid: %{} - def process_data(demuxer, data) do + @spec demux(t(), binary()) :: {:ok, t(), binary(), ExRTP.Packet.t()} | {:error, atom()} + def demux(demuxer, data) do with {:ok, %Packet{} = packet} <- decode(data), {:ok, demuxer, mid} <- match_to_mid(demuxer, packet) do {:ok, demuxer, mid, packet} @@ -39,8 +53,9 @@ defmodule ExWebRTC.PeerConnection.Demuxer do end) case Map.get(demuxer.ssrc_to_mid, ssrc) do - {_last_mid, last_sn} when mid != nil and sn > last_sn -> - put_in(demuxer.ssrc_to_mid[ssrc], {mid, sn}) + {last_mid, last_sn} when mid != nil and mid != last_mid and sn > last_sn -> + # temporary, as we belive this case shouldn't occur + raise "Received new MID for already mapped SSRC" nil when mid != nil -> put_in(demuxer.ssrc_to_mid[ssrc], {mid, sn}) @@ -52,7 +67,7 @@ defmodule ExWebRTC.PeerConnection.Demuxer do defp match_by_payload_type(demuxer, %Packet{ssrc: ssrc, payload_type: pt, sequence_number: sn}) do case Map.get(demuxer.pt_to_mid, pt) do - nil -> :error + nil -> {:error, :no_matching_mid} mid -> {:ok, put_in(demuxer.ssrc_to_mid[ssrc], {mid, sn}), mid} end end diff --git a/lib/ex_webrtc/sdp_utils.ex b/lib/ex_webrtc/sdp_utils.ex index 270c4766..1ec9633a 100644 --- a/lib/ex_webrtc/sdp_utils.ex +++ b/lib/ex_webrtc/sdp_utils.ex @@ -101,6 +101,7 @@ defmodule ExWebRTC.SDPUtils do end end + @spec get_extensions(ExSDP.t()) :: %{non_neg_integer() => {module(), atom()}} def get_extensions(sdp) do # we assume that, if extension is present in multiple mlines, the IDs are the same (RFC 8285) sdp.media @@ -113,6 +114,7 @@ defmodule ExWebRTC.SDPUtils do |> Map.new() end + @spec get_payload_types(ExSDP.t()) :: %{non_neg_integer() => binary()} def get_payload_types(sdp) do # if payload type is used in more than 1 mline, it cannot be used to identify the mline # thus, it is not placed in the returned map diff --git a/test/peer_connection/demuxer_test.exs b/test/peer_connection/demuxer_test.exs index ab2a56b0..b2e1476b 100644 --- a/test/peer_connection/demuxer_test.exs +++ b/test/peer_connection/demuxer_test.exs @@ -10,7 +10,7 @@ defmodule ExWebRTC.PeerConnection.DemuxerTest do @sequence_number 500 @payload_type 111 @ssrc 333_333 - @raw_packet %Packet{ + @deserialized_packet %Packet{ payload_type: @payload_type, sequence_number: @sequence_number, timestamp: 0, @@ -18,25 +18,27 @@ defmodule ExWebRTC.PeerConnection.DemuxerTest do payload: <<>> } - @packet Packet.encode(@raw_packet) - @packet_mid Packet.encode( - Packet.set_extension(@raw_packet, :two_byte, [%Extension{id: 15, data: @mid}]) - ) + @packet Packet.encode(@deserialized_packet) + @packet_mid @deserialized_packet + |> Packet.set_extension(:two_byte, [%Extension{id: 15, data: @mid}]) + |> Packet.encode() + @demuxer %Demuxer{extensions: %{15 => {Extension.SourceDescription, :mid}}} - test "ssrc already mapped and, without extension" do + test "ssrc already mapped, without extension" do seq_num = 1 demuxer = %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {@mid, seq_num}}} - assert {:ok, new_demuxer, @mid, _packet} = Demuxer.process_data(demuxer, @packet) + assert {:ok, new_demuxer, @mid, _packet} = Demuxer.demux(demuxer, @packet) assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {@mid, seq_num}}} end test "ssrc already mapped, with extension with the same mid and bigger sequence number" do - demuxer = %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {@mid, 1}}} + seq_num = 1 + demuxer = %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {@mid, seq_num}}} - assert {:ok, new_demuxer, @mid, _packet} = Demuxer.process_data(demuxer, @packet_mid) - assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {@mid, @sequence_number}}} + assert {:ok, new_demuxer, @mid, _packet} = Demuxer.demux(demuxer, @packet_mid) + assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {@mid, seq_num}}} end test "ssrc already mapped, with extension with new mid and smaller sequence number" do @@ -44,7 +46,7 @@ defmodule ExWebRTC.PeerConnection.DemuxerTest do mid = "2" demuxer = %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {mid, seq_num}}} - assert {:ok, new_demuxer, ^mid, _packet} = Demuxer.process_data(demuxer, @packet_mid) + assert {:ok, new_demuxer, ^mid, _packet} = Demuxer.demux(demuxer, @packet_mid) assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {mid, seq_num}}} end @@ -53,12 +55,11 @@ defmodule ExWebRTC.PeerConnection.DemuxerTest do mid = "2" demuxer = %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {mid, seq_num}}} - assert {:ok, new_demuxer, @mid, _packet} = Demuxer.process_data(demuxer, @packet_mid) - assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {@mid, @sequence_number}}} + assert_raise(RuntimeError, fn -> Demuxer.demux(demuxer, @packet_mid) end) end test "ssrc not mapped, with extension" do - assert {:ok, new_demuxer, @mid, _packet} = Demuxer.process_data(@demuxer, @packet_mid) + assert {:ok, new_demuxer, @mid, _packet} = Demuxer.demux(@demuxer, @packet_mid) assert new_demuxer == %Demuxer{@demuxer | ssrc_to_mid: %{@ssrc => {@mid, @sequence_number}}} end @@ -66,11 +67,11 @@ defmodule ExWebRTC.PeerConnection.DemuxerTest do mid = "2" demuxer = %Demuxer{@demuxer | pt_to_mid: %{@payload_type => mid}} - assert {:ok, new_demuxer, ^mid, _packet} = Demuxer.process_data(demuxer, @packet) + assert {:ok, new_demuxer, ^mid, _packet} = Demuxer.demux(demuxer, @packet) assert new_demuxer == %Demuxer{demuxer | ssrc_to_mid: %{@ssrc => {mid, @sequence_number}}} end test "unmatchable ssrc" do - assert :error = Demuxer.process_data(@demuxer, @packet) + assert {:error, :no_matching_mid} = Demuxer.demux(@demuxer, @packet) end end