Skip to content

Commit

Permalink
Add basic RTP demuxing
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Nov 10, 2023
1 parent 49278e8 commit 867df70
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 5 deletions.
24 changes: 19 additions & 5 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule ExWebRTC.PeerConnection do

require Logger

alias __MODULE__.Configuration
alias __MODULE__.{Configuration, Demuxer}
alias ExICE.ICEAgent

alias ExWebRTC.{
Expand Down Expand Up @@ -41,6 +41,7 @@ defmodule ExWebRTC.PeerConnection do
:ice_agent,
:ice_state,
:dtls_transport,
demuxer: %Demuxer{},
transceivers: [],
signaling_state: :stable,
last_offer: nil,
Expand Down Expand Up @@ -346,9 +347,16 @@ defmodule ExWebRTC.PeerConnection do
{:ok, dtls} ->
{:noreply, %__MODULE__{state | dtls_transport: dtls}}

{:ok, dtls, payload} ->
notify(state.owner, {:data, payload})
{:noreply, %__MODULE__{state | dtls_transport: dtls}}
{:ok, dtls, decoded_data} ->
case Demuxer.process_data(state.demuxer, decoded_data) do
{:ok, demuxer, mid, packet} ->
notify(state.owner, {:data, {mid, packet}})
{:noreply, %__MODULE__{state | dtls_transport: dtls, demuxer: demuxer}}

{:error, reason} ->
Logger.error("Unable to decode/demux RTP, reason: #{inspect(reason)}")
{:noreply, %__MODULE__{state | dtls_transport: dtls}}
end

{:error, reason} ->
Logger.error("Unable to process data, reason: #{inspect(reason)}")
Expand All @@ -372,6 +380,12 @@ defmodule ExWebRTC.PeerConnection do
new_transceivers = update_local_transceivers(type, state.transceivers, sdp)
state = set_description(:local, type, sdp, state)

demuxer = %{
state.demuxer
| extensions: SDPUtils.get_extensions(sdp),
payload_types: SDPUtils.get_payload_types(sdp)
}

dtls =
if type == :answer do
{:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup)
Expand All @@ -380,7 +394,7 @@ defmodule ExWebRTC.PeerConnection do
state.dtls_transport
end

{:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls}}
{:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls, demuxer: demuxer}}
end

defp update_local_transceivers(:offer, transceivers, _sdp) do
Expand Down
73 changes: 73 additions & 0 deletions lib/ex_webrtc/peer_connection/demuxer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule ExWebRTC.PeerConnection.Demuxer do
@moduledoc false

alias ExRTP.Packet
alias ExRTP.Packet.Extension
alias ExRTP.Packet.Extension.SourceDescription

defstruct ssrcs: %{}, extensions: %{}, payload_types: %{}

def process_data(demuxer, data) do
with {:ok, %Packet{} = packet} <- decode(data),
{:ok, demuxer, mid} <- match_to_mid(demuxer, packet) do
{:ok, demuxer, mid, packet}
end
end

# RFC 8843, 9.2
defp match_to_mid(demuxer, packet) do
with demuxer <- update_mapping(demuxer, packet),
:error <- match_by_extension(demuxer, packet),
:error <- match_by_payload_type(demuxer, packet) do
{:error, :unmatched_stream}
else
{:ok, mid} -> {:ok, demuxer, mid}
{:ok, _demuxer, _mid} = res -> res
end
end

defp update_mapping(demuxer, %Packet{ssrc: ssrc, sequence_number: sn} = packet) do
mid =
packet.extensions
|> Enum.find_value(fn %Extension{id: id} = ext ->
case demuxer.extensions[id] do
{SourceDescription, :mid} ->
{:ok, decoded_ext} = SourceDescription.from_raw(ext)
decoded_ext.text

_other ->
nil
end
end)

case Map.get(demuxer.ssrcs, ssrc) do
{_last_mid, last_sn} when mid != nil and sn > last_sn ->
put_in(demuxer.ssrcs[ssrc], {mid, sn})

nil when mid != nil ->
put_in(demuxer.ssrcs[ssrc], {mid, sn})

_other ->
demuxer
end
end

defp match_by_extension(demuxer, %Packet{ssrc: ssrc}) do
case Map.get(demuxer.ssrcs, ssrc) do
{last_mid, _last_sn} -> {:ok, last_mid}
nil -> :error
end
end

defp match_by_payload_type(demuxer, %Packet{ssrc: ssrc, payload_type: pt}) do
case Map.get(demuxer.payload_types, pt) do
nil -> :error
mid -> {:ok, put_in(demuxer.ssrcs[ssrc], mid), mid}
end
end

# RTP & RTCP demuxing, see RFC 6761
# TODO: handle RTCP
defp decode(<<_, s, _::binary>>) when s in 192..223, do: {:error, :rtcp}
defp decode(data), do: Packet.decode(data)
end
32 changes: 32 additions & 0 deletions lib/ex_webrtc/sdp_utils.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule ExWebRTC.SDPUtils do
@moduledoc false

alias ExRTP.Packet.Extension.SourceDescription

@spec get_media_direction(ExSDP.Media.t()) ::
:sendrecv | :sendonly | :recvonly | :inactive | nil
def get_media_direction(media) do
Expand Down Expand Up @@ -99,6 +101,36 @@ defmodule ExWebRTC.SDPUtils do
end
end

def get_extensions(sdp) do
# we assume that, if extension is present in multiple mlines, the IDs are the same (RFC 8285)
sdp.media
|> Enum.flat_map(&ExSDP.Media.get_attributes(&1, :extmap))
|> Enum.flat_map(fn
%{id: id, uri: "urn:ietf:params:rtp-hdrext:sdes:mid"} -> [{id, {SourceDescription, :mid}}]
# TODO: handle other types of extensions
_ -> []
end)
|> Map.new()
end

def get_payload_types(sdp) do
# if payload type is used in more than 1 mline, it cannot be used to identify the mline
# thus, it is not placed in the returned map
sdp.media
|> Enum.flat_map(fn mline ->
{:mid, mid} = ExSDP.Media.get_attribute(mline, :mid)
encodings = ExSDP.Media.get_attributes(mline, :rtpmap)

Enum.map(encodings, &{&1.payload_type, mid})
end)
|> Enum.reduce(%{}, fn
{pt, _mid}, acc when is_map_key(acc, pt) -> Map.put(acc, pt, nil)
{pt, mid}, acc -> Map.put(acc, pt, mid)
end)
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Map.new()
end

defp do_get_ice_credentials(sdp_or_mline) do
get_attr =
case sdp_or_mline do
Expand Down
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ defmodule ExWebRTC.MixProject do
{:ex_ice, "~> 0.1"},
{:ex_dtls, "~> 0.13"},
{:ex_libsrtp, "~> 0.6"},
{:ex_rtp, git: "https://github.com/elixir-webrtc/ex_rtp.git"},
{:ex_rtcp, git: "https://github.com/elixir-webrtc/ex_rtcp.git"},

# dev/test
{:excoveralls, "~> 0.14", only: [:dev, :test], runtime: false},
Expand Down
3 changes: 3 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"ex_dtls": {:hex, :ex_dtls, "0.13.0", "4d7631eefc19a8820d4f79883f379ff2ad642976bda55493d4ec4e5d10d6c078", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "3ece30967006ec12a4088e60514cb08847814fba8b8a21aca3862e5d1fd4a6bc"},
"ex_ice": {:hex, :ex_ice, "0.1.0", "2653c884872d8769cf9fc655c74002a63ed6c21be1b3c2badfa42bdc74de2355", [:mix], [{:ex_stun, "~> 0.1.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "e2539a321f87f31997ba974d532d00511e5828f2f113b550b1ef6aa799dd2ffe"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.6.0", "d96cd7fc1780157614f0bf47d31587e5eab953b43067f4885849f8177ec452a9", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "e9ce8a507a658f7e2df72fae82a4b3ba0a056c175f0bc490e79ab03058e094d5"},
"ex_rctp": {:git, "https://github.com/elixir-webrtc/ex_rtcp.git", "c0cf2b7f995e34d13cee4cbb228376a55700fb6a", []},
"ex_rtcp": {:git, "https://github.com/elixir-webrtc/ex_rtcp.git", "c0cf2b7f995e34d13cee4cbb228376a55700fb6a", []},
"ex_rtp": {:git, "https://github.com/elixir-webrtc/ex_rtp.git", "83f088fb6d8448d6d705d26f9ff6e6a35fa8cfd6", []},
"ex_sdp": {:hex, :ex_sdp, "0.13.0", "b464cf5f6b70433159be243115857599f82b07234ee022997868c85ae1f225f7", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "975ca4d274240c51ee85909bc0618bd4dd940e69f7d8c8f0d701f1524eaffaad"},
"ex_stun": {:hex, :ex_stun, "0.1.0", "252474bf4c8519fbf4bc0fbfc6a1b846a634b1478c65dbbfb4b6ab4e33c2a95a", [:mix], [], "hexpm", "629fc8be45b624a92522f81d85ba001877b1f0745889a2419bdb678790d7480c"},
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
Expand Down

0 comments on commit 867df70

Please sign in to comment.