Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert RTPTransceiver to separate process #18

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule ExWebRTC.PeerConnection do
DTLSTransport,
IceCandidate,
MediaStreamTrack,
RTPCodecParameters,
RTPTransceiver,
SDPUtils,
SessionDescription,
Expand Down Expand Up @@ -91,13 +92,15 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:add_ice_candidate, candidate})
end

@spec get_transceivers(peer_connection()) :: [RTPTransceiver.t()]
@spec get_transceivers(peer_connection()) :: [
{mid :: String.t(), RTPTransceiver.rtp_transceiver()}
]
def get_transceivers(peer_connection) do
GenServer.call(peer_connection, :get_transceivers)
end

@spec add_transceiver(peer_connection(), RTPTransceiver.kind(), transceiver_options()) ::
{:ok, RTPTransceiver.t()} | {:error, :TODO}
{:ok, RTPTransceiver.rtp_transceiver()} | {:error, :TODO}
def add_transceiver(peer_connection, kind, options \\ []) do
GenServer.call(peer_connection, {:add_transceiver, kind, options})
end
Expand Down Expand Up @@ -161,7 +164,7 @@ defmodule ExWebRTC.PeerConnection do
rtcp: true
]

mlines = Enum.map(transceivers, &RTPTransceiver.to_offer_mline(&1, opts))
mlines = Enum.map(transceivers, fn {_, t} -> SDPUtils.to_offer_mline(t, opts) end)

mids =
Enum.map(mlines, fn mline ->
Expand Down Expand Up @@ -218,8 +221,8 @@ defmodule ExWebRTC.PeerConnection do
mlines =
Enum.map(remote_offer.media, fn mline ->
{:mid, mid} = ExSDP.Media.get_attribute(mline, :mid)
{_ix, transceiver} = RTPTransceiver.find_by_mid(state.transceivers, mid)
RTPTransceiver.to_answer_mline(transceiver, mline, opts)
{_, transceiver} = Enum.find(state.transceivers, fn {m, _} -> m == mid end)
SDPUtils.to_answer_mline(transceiver, mline, opts)
end)

mids =
Expand Down Expand Up @@ -314,15 +317,16 @@ defmodule ExWebRTC.PeerConnection do
:video -> {state.config.video_rtp_hdr_exts, state.config.video_codecs}
end

transceiver = %RTPTransceiver{
props = %{
mid: nil,
direction: direction,
kind: kind,
codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts
}

transceivers = List.insert_at(state.transceivers, -1, transceiver)
{:ok, transceiver} = RTPTransceiver.start_link(kind, props)

transceivers = state.transceivers ++ [{nil, transceiver}]
{:reply, {:ok, transceiver}, %{state | transceivers: transceivers}}
end

Expand Down Expand Up @@ -408,7 +412,7 @@ defmodule ExWebRTC.PeerConnection do
new_transceivers
# only take new transceivers that can receive tracks
|> Enum.filter(fn tr ->
RTPTransceiver.find_by_mid(state.transceivers, tr.mid) == nil and
Enum.all?(state.transceivers, fn {m, _} -> tr.mid != m end) and
tr.direction in [:recvonly, :sendrecv]
end)
|> Enum.map(fn tr -> MediaStreamTrack.from_transceiver(tr) end)
Expand Down Expand Up @@ -442,7 +446,7 @@ defmodule ExWebRTC.PeerConnection do
Enum.reduce_while(sdp.media, {:ok, transceivers}, fn mline, {:ok, transceivers} ->
case ExSDP.Media.get_attribute(mline, :mid) do
{:mid, mid} ->
transceivers = RTPTransceiver.update_or_create(transceivers, mid, mline, config)
transceivers = update_or_create_transceiver(transceivers, mid, mline, config)
{:cont, {:ok, transceivers}}

_other ->
Expand All @@ -451,11 +455,63 @@ defmodule ExWebRTC.PeerConnection do
end)
end

# searches for transceiver for a given mline
# if it exists, updates its configuration
# if it doesn't exist, creats a new one
# returns list of updated transceivers
defp update_or_create_transceiver(transceivers, mid, mline, config) do
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
props = %{codecs: codecs, rtp_hdr_exts: rtp_hdr_exts}

Enum.find(transceivers, fn {m, _} -> m == mid end)
|> case do
{_, tr} ->
:ok = RTPTransceiver.update_properties(tr, props)
transceivers

nil ->
props = Map.merge(props, %{mid: mid, direction: :recvonly})
{:ok, tr} = RTPTransceiver.start_link(mline.type, props)
transceivers ++ [{mid, tr}]
end
end

defp get_codecs(mline, config) do
rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping)
fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP)
all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback)

rtp_mappings
|> Stream.map(fn rtp_mapping ->
fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type))

rtcp_fbs =
all_rtcp_fbs
|> Stream.filter(&(&1.pt == rtp_mapping.payload_type))
|> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1))

RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs)
end)
|> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end)
end

defp get_rtp_hdr_extensions(mline, config) do
mline
|> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap)
|> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type))
end

defp assign_mids(transceivers, next_mid) do
{new_transceivers, _next_mid} =
Enum.map_reduce(transceivers, next_mid, fn
%{mid: nil} = t, nm -> {%{t | mid: to_string(nm)}, nm + 1}
other, nm -> {other, nm}
{nil, tr}, nm ->
mid = to_string(nm)
:ok = RTPTransceiver.update_properties(tr, %{mid: mid})
{{mid, tr}, nm + 1}

other, nm ->
{other, nm}
end)

new_transceivers
Expand All @@ -476,7 +532,7 @@ defmodule ExWebRTC.PeerConnection do
end

tsc_mids =
for %RTPTransceiver{mid: mid} when mid != nil <- state.transceivers,
for {mid, _transceiver} when mid != nil <- state.transceivers,
{mid, ""} <- Integer.parse(mid) do
mid
end
Expand Down
170 changes: 34 additions & 136 deletions lib/ex_webrtc/rtp_transceiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,164 +3,62 @@ defmodule ExWebRTC.RTPTransceiver do
RTPTransceiver
"""

alias ExWebRTC.{PeerConnection.Configuration, RTPCodecParameters, RTPReceiver}
use GenServer

alias ExWebRTC.RTPCodecParameters

@type rtp_transceiver() :: GenServer.server()

@type direction() :: :sendonly | :recvonly | :sendrecv | :inactive | :stopped
@type kind() :: :audio | :video

@type t() :: %__MODULE__{
mid: String.t(),
@type properties() :: %{
mid: String.t() | nil,
direction: direction(),
kind: kind(),
rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()],
codecs: [RTPCodecParameters.t()],
rtp_receiver: nil
codecs: [RTPCodecParameters.t()]
}

@enforce_keys [:mid, :direction, :kind]
defstruct @enforce_keys ++ [codecs: [], rtp_hdr_exts: [], rtp_receiver: %RTPReceiver{}]

@doc false
def find_by_mid(transceivers, mid) do
transceivers
|> Enum.with_index(fn tr, idx -> {idx, tr} end)
|> Enum.find(fn {_idx, tr} -> tr.mid == mid end)
@spec start_link(kind(), properties()) :: GenServer.on_start()
def start_link(kind, properties) do
GenServer.start_link(__MODULE__, [kind, properties])
end

@doc false
@spec to_answer_mline(t(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t()
def to_answer_mline(transceiver, mline, opts) do
if transceiver.codecs == [] do
# reject mline and skip further processing
# see RFC 8299 sec. 5.3.1 and RFC 3264 sec. 6
%ExSDP.Media{mline | port: 0}
else
offered_direction = ExSDP.Media.get_attribute(mline, :direction)
direction = get_direction(offered_direction, transceiver.direction)
opts = Keyword.put(opts, :direction, direction)
to_mline(transceiver, opts)
end
@spec get_properties(rtp_transceiver()) :: {kind(), properties()}
def get_properties(transceiver) do
GenServer.call(transceiver, :get_properties)
end

@doc false
@spec to_offer_mline(t(), Keyword.t()) :: ExSDP.Media.t()
def to_offer_mline(transceiver, opts) do
to_mline(transceiver, opts)
@spec update_properties(rtp_transceiver(), map()) :: :ok
def update_properties(transceiver, properties) do
# properties should be a subset of properties() type, but typespecs suck
GenServer.call(transceiver, {:update_properties, properties})
end

# searches for transceiver for a given mline
# if it exists, updates its configuration
# if it doesn't exist, creats a new one
# returns list of updated transceivers
@doc false
def update_or_create(transceivers, mid, mline, config) do
case find_by_mid(transceivers, mid) do
{idx, %__MODULE__{} = tr} ->
List.replace_at(transceivers, idx, update(tr, mline, config))

nil ->
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC)
@impl true
def init([kind, props]) do
state = %{props | kind: kind, receiver: nil, sender: nil}

tr = %__MODULE__{
mid: mid,
direction: :recvonly,
kind: mline.type,
codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts,
rtp_receiver: %RTPReceiver{ssrc: ssrc}
}

transceivers ++ [tr]
end
{:ok, state}
end

defp to_mline(transceiver, opts) do
pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end)

media_formats =
Enum.flat_map(transceiver.codecs, fn codec ->
[_type, encoding] = String.split(codec.mime_type, "/")

rtp_mapping = %ExSDP.Attribute.RTPMapping{
clock_rate: codec.clock_rate,
encoding: encoding,
params: codec.channels,
payload_type: codec.payload_type
}

[rtp_mapping, codec.sdp_fmtp_line, codec.rtcp_fbs]
end)

attributes =
if(Keyword.get(opts, :rtcp, false), do: [{"rtcp", "9 IN IP4 0.0.0.0"}], else: []) ++
[
Keyword.get(opts, :direction, transceiver.direction),
{:mid, transceiver.mid},
{:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)},
{:ice_pwd, Keyword.fetch!(opts, :ice_pwd)},
{:ice_options, Keyword.fetch!(opts, :ice_options)},
{:fingerprint, Keyword.fetch!(opts, :fingerprint)},
{:setup, Keyword.fetch!(opts, :setup)},
:rtcp_mux
] ++ transceiver.rtp_hdr_exts

%ExSDP.Media{
ExSDP.Media.new(transceiver.kind, 9, "UDP/TLS/RTP/SAVPF", pt)
| # mline must be followed by a cline, which must contain
# the default value "IN IP4 0.0.0.0" (as there are no candidates yet)
connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}]
}
|> ExSDP.Media.add_attributes(attributes ++ media_formats)
end

# RFC 3264 (6.1) + RFC 8829 (5.3.1)
# AFAIK one of the cases should always match
# bc we won't assign/create an inactive transceiver to i.e. sendonly mline
# also neither of the arguments should ever be :stopped
defp get_direction(_, :inactive), do: :inactive
defp get_direction(:sendonly, t) when t in [:sendrecv, :recvonly], do: :recvonly
defp get_direction(:recvonly, t) when t in [:sendrecv, :sendonly], do: :sendonly
defp get_direction(o, other) when o in [:sendrecv, nil], do: other
defp get_direction(:inactive, _), do: :inactive

defp update(transceiver, mline, config) do
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC)
rtp_receiver = %RTPReceiver{ssrc: ssrc}

%__MODULE__{
transceiver
| codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts,
rtp_receiver: rtp_receiver
}
@impl true
def handle_call(:get_properties, _from, state) do
properties = Map.take(state, [:mid, :direction, :kind, :rtp_hrd_exts, :codecs])
{:reply, properties, state}
end

defp get_codecs(mline, config) do
rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping)
fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP)
all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback)

rtp_mappings
|> Stream.map(fn rtp_mapping ->
fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type))

rtcp_fbs =
all_rtcp_fbs
|> Stream.filter(&(&1.pt == rtp_mapping.payload_type))
|> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1))

RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs)
end)
|> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end)
end
@impl true
def handle_call({:update_properties, properties}, _from, state) do
# TODO: there's more to it that simply overriding the state's values
state =
properties
|> Map.take([:mid, :direction, :rtp_hdr_exts, :codecs])
|> then(&Map.merge(state, &1))

defp get_rtp_hdr_extensions(mline, config) do
mline
|> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap)
|> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type))
{:reply, :ok, state}
end
end
Loading
Loading