From 37ebbf0fcca5bdc1635910e98200d6a35eb143bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Fri, 24 Nov 2023 17:18:50 +0100 Subject: [PATCH 1/6] Add functionality to loop media back to the browser --- examples/example.exs | 44 ++++---- examples/example.js | 23 ++-- lib/ex_webrtc/dtls_transport.ex | 68 ++++++++---- lib/ex_webrtc/media_stream_track.ex | 9 +- lib/ex_webrtc/peer_connection.ex | 162 +++++++++++++++++++--------- lib/ex_webrtc/rtp_receiver.ex | 9 +- lib/ex_webrtc/rtp_sender.ex | 13 +++ lib/ex_webrtc/rtp_transceiver.ex | 25 +++-- 8 files changed, 230 insertions(+), 123 deletions(-) create mode 100644 lib/ex_webrtc/rtp_sender.ex diff --git a/examples/example.exs b/examples/example.exs index 40d58cb2..37db0c2b 100644 --- a/examples/example.exs +++ b/examples/example.exs @@ -15,12 +15,12 @@ defmodule Peer do %{urls: "stun:stun.l.google.com:19302"} ] - def start_link(mode \\ :passive) do - GenServer.start_link(__MODULE__, mode) + def start_link() do + GenServer.start_link(__MODULE__, nil) end @impl true - def init(mode) do + def init(_) do {:ok, conn} = :gun.open({127, 0, 0, 1}, 4000) {:ok, _protocol} = :gun.await_up(conn) :gun.ws_upgrade(conn, "/websocket") @@ -32,16 +32,7 @@ defmodule Peer do {:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers) - if mode == :active do - {:ok, _transceiver} = PeerConnection.add_transceiver(pc, :audio) - {:ok, offer} = PeerConnection.create_offer(pc) - :ok = PeerConnection.set_local_description(pc, offer) - msg = %{"type" => "offer", "sdp" => offer.sdp} - :gun.ws_send(conn, stream, {:text, Jason.encode!(msg)}) - Logger.info("Send SDP offer: #{offer.sdp}") - end - - {:ok, %{conn: conn, stream: stream, peer_connection: pc, mode: mode}} + {:ok, %{conn: conn, stream: stream, peer_connection: pc}} other -> Logger.error("Couldn't connect to the signalling server: #{inspect(other)}") @@ -79,9 +70,7 @@ defmodule Peer do @impl true def handle_info({:ex_webrtc, _pid, msg}, state) do - Logger.info("Received ExWebRTC message: #{inspect(msg)}") handle_webrtc_message(msg, state) - {:noreply, state} end @@ -91,17 +80,24 @@ defmodule Peer do {:noreply, state} end - defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{mode: :passive} = state) do + defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{peer_connection: pc} = state) do Logger.info("Received SDP offer: #{inspect(sdp)}") offer = %SessionDescription{type: :offer, sdp: sdp} - :ok = PeerConnection.set_remote_description(state.peer_connection, offer) - {:ok, answer} = PeerConnection.create_answer(state.peer_connection) - :ok = PeerConnection.set_local_description(state.peer_connection, answer) + :ok = PeerConnection.set_remote_description(pc, offer) + {:ok, answer} = PeerConnection.create_answer(pc) + :ok = PeerConnection.set_local_description(pc, answer) msg = %{"type" => "answer", "sdp" => answer.sdp} :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) + + track = ExWebRTC.MediaStreamTrack.new(:video) + {:ok, transceiver} = PeerConnection.add_transceiver(pc, track) + {:ok, offer} = PeerConnection.create_offer(pc) + :ok = PeerConnection.set_local_description(pc, offer) + msg = %{"type" => "offer", "sdp" => offer.sdp} + :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) end - defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, %{mode: :active} = state) do + defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do Logger.info("Received SDP answer: #{inspect(sdp)}") answer = %SessionDescription{type: :answer, sdp: sdp} :ok = PeerConnection.set_remote_description(state.peer_connection, answer) @@ -136,13 +132,17 @@ defmodule Peer do :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) end + defp handle_webrtc_message({:rtp, mid, packet}, state) do + Logger.info("Received RTP: #{inspect(packet)}") + PeerConnection.send_rtp(state.peer_connection, "1", packet) + end + defp handle_webrtc_message(msg, _state) do Logger.warning("Received unknown ex_webrtc message: #{inspect(msg)}") end end -mode = :active -{:ok, pid} = Peer.start_link(mode) +{:ok, pid} = Peer.start_link() ref = Process.monitor(pid) receive do diff --git a/examples/example.js b/examples/example.js index ad63e977..2cb579f0 100644 --- a/examples/example.js +++ b/examples/example.js @@ -13,7 +13,14 @@ const start_connection = async (ws) => { pc.oniceconnectionstatechange = _ => console.log("ICE connection state changed:", pc.iceConnectionState); pc.onicegatheringstatechange = _ => console.log("ICE gathering state changed:", pc.iceGatheringState); pc.onsignalingstatechange = _ => console.log("Signaling state changed:", pc.signalingState); - pc.ontrack = event => console.log("New track:", event); + pc.ontrack = event => { + const videoPlayer = document.createElement("video"); + videoPlayer.srcObject = event.streams[0]; + videoPlayer.onloadedmetadata = () => { + videoPlayer.play(); + }; + document.body.appendChild(videoPlayer); + }; pc.onicecandidate = event => { console.log("New local ICE candidate:", event.candidate); @@ -22,7 +29,7 @@ const start_connection = async (ws) => { } }; - const localStream = await navigator.mediaDevices.getUserMedia({audio: true}); + const localStream = await navigator.mediaDevices.getUserMedia({video: true}); for (const track of localStream.getTracks()) { pc.addTrack(track, localStream); } @@ -48,16 +55,12 @@ const start_connection = async (ws) => { } }; - if (mode === "active") { - const desc = await pc.createOffer(); - console.log("Generated SDP offer:", desc); - await pc.setLocalDescription(desc); - ws.send(JSON.stringify(desc)) - } + const desc = await pc.createOffer(); + console.log("Generated SDP offer:", desc); + await pc.setLocalDescription(desc); + ws.send(JSON.stringify(desc)) }; -const mode = "passive" - const ws = new WebSocket("ws://127.0.0.1:4000/websocket"); ws.onclose = event => console.log("WebSocket was closed", event); ws.onopen = _ => start_connection(ws); diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 9734005a..0c4ea4f3 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -36,9 +36,15 @@ defmodule ExWebRTC.DTLSTransport do end @doc false - @spec send_data(dtls_transport(), binary()) :: :ok - def send_data(dtls_transport, data) do - GenServer.cast(dtls_transport, {:send_data, data}) + @spec send_rtp(dtls_transport(), binary()) :: :ok + def send_rtp(dtls_transport, data) do + GenServer.cast(dtls_transport, {:send_rtp, data}) + end + + @doc false + @spec send_rtcp(dtls_transport(), binary()) :: :ok + def send_rtcp(dtls_transport, data) do + GenServer.cast(dtls_transport, {:send_rtcp, data}) end @impl true @@ -50,7 +56,6 @@ defmodule ExWebRTC.DTLSTransport do fingerprint = ExDTLS.get_cert_fingerprint(dtls) {:ok, ice_agent} = ice_module.start_link(:controlled, ice_config) - srtp = ExLibSRTP.new() state = %{ owner: owner, @@ -60,7 +65,8 @@ defmodule ExWebRTC.DTLSTransport do cert: cert, pkey: pkey, fingerprint: fingerprint, - srtp: srtp, + in_srtp: ExLibSRTP.new(), + out_srtp: ExLibSRTP.new(), dtls_state: :new, dtls: nil, mode: nil @@ -101,18 +107,19 @@ defmodule ExWebRTC.DTLSTransport do end @impl true - def handle_cast({:send_data, _data}, %{dtls_state: :connected, ice_state: ice_state} = state) + def handle_cast({:send_rtp, data}, %{dtls_state: :connected, ice_state: ice_state} = state) when ice_state in [:connected, :completed] do - # TODO + case ExLibSRTP.protect(state.out_srtp, data) do + {:ok, protected} -> ICEAgent.send_data(state.ice_agent, protected) + {:error, reason} -> Logger.error("Unable to protect RTP: #{inspect(reason)}") + end + {:noreply, state} end @impl true - def handle_cast({:send_data, _data}, state) do - Logger.error( - "Attempted to send data when DTLS handshake was not finished or ICE Transport is unavailable" - ) - + def handle_cast({:send_rtcp, _data}, state) do + # TODO: implement {:noreply, state} end @@ -174,16 +181,16 @@ defmodule ExWebRTC.DTLSTransport do Process.send_after(self(), :dtls_timeout, timeout) %{state | dtls_state: :connecting, buffered_packets: packets} - {:handshake_finished, _, remote_keying_material, profile, packets} -> + {:handshake_finished, lkm, rkm, profile, packets} -> Logger.debug("DTLS handshake finished") ICEAgent.send_data(state.ice_agent, packets) # TODO: validate fingerprint - state = setup_srtp(state, remote_keying_material, profile) + state = setup_srtp(state, lkm, rkm, profile) %{state | dtls_state: :connected} - {:handshake_finished, _, remote_keying_material, profile} -> + {:handshake_finished, lkm, rkm, profile} -> Logger.debug("DTLS handshake finished") - state = setup_srtp(state, remote_keying_material, profile) + state = setup_srtp(state, lkm, rkm, profile) %{state | dtls_state: :connected} :handshake_want_read -> @@ -193,13 +200,18 @@ defmodule ExWebRTC.DTLSTransport do defp handle_ice({:data, <> = data}, %{dtls_state: :connected} = state) when f in 128..191 do - case ExLibSRTP.unprotect(state.srtp, data) do + {type, unprotect} = + case data do + <<_, s, _::binary>> when s in 192..223 -> {:rtcp, &ExLibSRTP.unprotect_rtcp/2} + _ -> {:rtp, &ExLibSRTP.unprotect/2} + end + + case unprotect.(state.in_srtp, data) do {:ok, payload} -> - # TODO: temporarily, everything goes to peer connection process - send(state.owner, {:rtp_data, payload}) + send(state.owner, {type, payload}) {:error, reason} -> - Logger.warning("Failed to decrypt SRTP, reason: #{inspect(reason)}") + Logger.error("Failed to decrypt SRTP/SRTCP, reason: #{inspect(reason)}") end state @@ -246,18 +258,28 @@ defmodule ExWebRTC.DTLSTransport do defp handle_ice(_msg, state), do: state - defp setup_srtp(state, remote_keying_material, profile) do + defp setup_srtp(state, local_keying_material, remote_keying_material, profile) do {:ok, crypto_profile} = ExLibSRTP.Policy.crypto_profile_from_dtls_srtp_protection_profile(profile) - policy = %ExLibSRTP.Policy{ + inbound_policy = %ExLibSRTP.Policy{ ssrc: :any_inbound, key: remote_keying_material, rtp: crypto_profile, rtcp: crypto_profile } - :ok = ExLibSRTP.add_stream(state.srtp, policy) + :ok = ExLibSRTP.add_stream(state.in_srtp, inbound_policy) + + outbound_policy = %ExLibSRTP.Policy{ + ssrc: :any_outbound, + key: local_keying_material, + rtp: crypto_profile, + rtcp: crypto_profile + } + + :ok = ExLibSRTP.add_stream(state.out_srtp, outbound_policy) + state end end diff --git a/lib/ex_webrtc/media_stream_track.ex b/lib/ex_webrtc/media_stream_track.ex index d68cf249..49351980 100644 --- a/lib/ex_webrtc/media_stream_track.ex +++ b/lib/ex_webrtc/media_stream_track.ex @@ -5,15 +5,14 @@ defmodule ExWebRTC.MediaStreamTrack do @type t() :: %__MODULE__{ kind: :audio | :video, - id: integer(), - mid: String.t() + id: integer() } @enforce_keys [:id, :kind] - defstruct @enforce_keys ++ [:mid] + defstruct @enforce_keys - def from_transceiver(tr) do - %__MODULE__{kind: tr.kind, id: generate_id(), mid: tr.mid} + def new(kind) do + %__MODULE__{kind: kind, id: generate_id()} end defp generate_id() do diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 9c0f127c..a7161a4f 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -15,6 +15,8 @@ defmodule ExWebRTC.PeerConnection do IceCandidate, MediaStreamTrack, RTPTransceiver, + RTPReceiver, + RTPSender, SDPUtils, SessionDescription, Utils @@ -96,7 +98,11 @@ defmodule ExWebRTC.PeerConnection do GenServer.call(peer_connection, :get_transceivers) end - @spec add_transceiver(peer_connection(), RTPTransceiver.kind(), transceiver_options()) :: + @spec add_transceiver( + peer_connection(), + RTPTransceiver.kind() | MediaStreamTrack.t(), + transceiver_options() + ) :: {:ok, RTPTransceiver.t()} | {:error, :TODO} def add_transceiver(peer_connection, kind, options \\ []) do GenServer.call(peer_connection, {:add_transceiver, kind, options}) @@ -107,6 +113,11 @@ defmodule ExWebRTC.PeerConnection do GenServer.stop(peer_connection) end + @spec send_rtp(peer_connection(), String.t(), ExRTP.Packet.t()) :: :ok + def send_rtp(peer_connection, mid, packet) do + GenServer.cast(peer_connection, {:send_rtp, mid, packet}) + end + #### CALLBACKS #### @impl true @@ -306,26 +317,41 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_call({:add_transceiver, kind, options}, _from, state) when kind in [:audio, :video] do - direction = Keyword.get(options, :direction, :sendrcv) + transceiver = new_transceiver(kind, nil, options, state.config) + transceivers = state.transceivers ++ [transceiver] - {rtp_hdr_exts, codecs} = - case kind do - :audio -> {state.config.audio_rtp_hdr_exts, state.config.audio_codecs} - :video -> {state.config.video_rtp_hdr_exts, state.config.video_codecs} - end + {:reply, {:ok, transceiver}, %{state | transceivers: transceivers}} + end - transceiver = %RTPTransceiver{ - mid: nil, - direction: direction, - kind: kind, - codecs: codecs, - rtp_hdr_exts: rtp_hdr_exts - } + @impl true + def handle_call({:add_transceiver, %MediaStreamTrack{} = track, options}, _from, state) do + transceiver = new_transceiver(track.kind, track, options, state.config) + transceivers = state.transceivers ++ [transceiver] - transceivers = List.insert_at(state.transceivers, -1, transceiver) {:reply, {:ok, transceiver}, %{state | transceivers: transceivers}} end + @impl true + def handle_cast({:send_rtp, mid, packet}, state) do + # TODO: this is very temporary + id = + Enum.find_value(state.demuxer.extensions, fn + {ext_id, {ExRTP.Packet.Extension.SourceDescription, :mid}} -> ext_id + _ -> nil + end) + + mid_ext = + %ExRTP.Packet.Extension.SourceDescription{text: mid} + |> ExRTP.Packet.Extension.SourceDescription.to_raw(id) + + packet + |> ExRTP.Packet.set_extension(:two_byte, [mid_ext]) + |> ExRTP.Packet.encode() + |> then(&DTLSTransport.send_rtp(state.dtls_transport, &1)) + + {:noreply, state} + end + @impl true def handle_info({:ex_ice, _from, :connected}, state) do {:noreply, %__MODULE__{state | ice_state: :connected}} @@ -346,10 +372,10 @@ defmodule ExWebRTC.PeerConnection do end @impl true - def handle_info({:rtp_data, data}, state) do + def handle_info({:rtp, data}, state) do case Demuxer.demux(state.demuxer, data) do {:ok, demuxer, mid, packet} -> - notify(state.owner, {:data, {mid, packet}}) + notify(state.owner, {:rtp, mid, packet}) {:noreply, %__MODULE__{state | demuxer: demuxer}} {:error, reason} -> @@ -361,9 +387,32 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_info(msg, state) do Logger.info("OTHER MSG #{inspect(msg)}") + {:noreply, state} end + defp new_transceiver(kind, sender_track, options, config) do + direction = Keyword.get(options, :direction, :sendrcv) + + {rtp_hdr_exts, codecs} = + case kind do + :audio -> {config.audio_rtp_hdr_exts, config.audio_codecs} + :video -> {config.video_rtp_hdr_exts, config.video_codecs} + end + + track = MediaStreamTrack.new(kind) + + %RTPTransceiver{ + mid: nil, + direction: direction, + kind: kind, + codecs: codecs, + rtp_hdr_exts: rtp_hdr_exts, + receiver: %RTPReceiver{track: track}, + sender: %RTPSender{track: sender_track} + } + end + defp apply_local_description(type, sdp, state) do new_transceivers = update_local_transceivers(type, state.transceivers, sdp) state = set_description(:local, type, sdp, state) @@ -374,15 +423,12 @@ defmodule ExWebRTC.PeerConnection do pt_to_mid: SDPUtils.get_payload_types(sdp) } - dtls = - if type == :answer do - {:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup) - :ok = DTLSTransport.start_dtls(state.dtls_transport, setup) - else - state.dtls_transport - end + if type == :answer do + {:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup) + DTLSTransport.start_dtls(state.dtls_transport, setup) + end - {:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls, demuxer: demuxer}} + {:ok, %{state | transceivers: new_transceivers, demuxer: demuxer}} end defp update_local_transceivers(:offer, transceivers, _sdp) do @@ -411,30 +457,30 @@ defmodule ExWebRTC.PeerConnection do RTPTransceiver.find_by_mid(state.transceivers, tr.mid) == nil and tr.direction in [:recvonly, :sendrecv] end) - |> Enum.map(fn tr -> MediaStreamTrack.from_transceiver(tr) end) + |> Enum.map(fn tr -> MediaStreamTrack.new(tr.kind) end) for track <- new_remote_tracks do notify(state.owner, {:track, track}) end + tracks_map = Map.new(new_remote_tracks, &{&1.id, &1}) + state = %{state | tracks: Map.merge(state.tracks, tracks_map)} + state = set_description(:remote, type, sdp, state) - dtls = - if type == :answer do - {:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup) + if type == :answer do + {:setup, setup} = ExSDP.Media.get_attribute(hd(sdp.media), :setup) - setup = - case setup do - :active -> :passive - :passive -> :active - end + setup = + case setup do + :active -> :passive + :passive -> :active + end - :ok = DTLSTransport.start_dtls(state.dtls_transport, setup) - else - state.dtls_transport - end + DTLSTransport.start_dtls(state.dtls_transport, setup) + end - {:ok, %{state | transceivers: new_transceivers, dtls_transport: dtls}} + {:ok, %{state | transceivers: new_transceivers}} end end @@ -464,24 +510,34 @@ defmodule ExWebRTC.PeerConnection do defp find_next_mid(state) do # next mid must be unique, it's acomplished by looking for values # greater than any mid in remote description or our own transceivers - crd_mids = - if is_nil(state.current_remote_desc) do - [] + crd_mids = get_desc_mids(state.current_remote_desc) + tsc_mids = get_transceiver_mids(state.transceivers) + + Enum.max(crd_mids ++ tsc_mids, &>=/2, fn -> -1 end) + 1 + end + + defp get_desc_mids(nil), do: [] + + defp get_desc_mids({_, remote_desc}) do + Enum.flat_map(remote_desc.media, fn mline -> + with {:mid, mid} <- ExSDP.Media.get_attribute(mline, :mid), + {mid, ""} <- Integer.parse(mid) do + [mid] else - for mline <- state.current_remote_desc.media, - {:mid, mid} <- ExSDP.Media.get_attribute(mline, :mid), - {mid, ""} <- Integer.parse(mid) do - mid - end + _ -> [] end + end) + end - tsc_mids = - for %RTPTransceiver{mid: mid} when mid != nil <- state.transceivers, - {mid, ""} <- Integer.parse(mid) do - mid + defp get_transceiver_mids(transceivers) do + Enum.flat_map(transceivers, fn transceiver -> + with mid when mid != nil <- transceiver.mid, + {mid, ""} <- Integer.parse(mid) do + [mid] + else + _ -> [] end - - Enum.max(crd_mids ++ tsc_mids, &>=/2, fn -> -1 end) + 1 + end) end defp check_desc_altered(:offer, sdp, %{last_offer: offer}) when sdp == offer, do: :ok diff --git a/lib/ex_webrtc/rtp_receiver.ex b/lib/ex_webrtc/rtp_receiver.ex index fd1a8338..5356cf88 100644 --- a/lib/ex_webrtc/rtp_receiver.ex +++ b/lib/ex_webrtc/rtp_receiver.ex @@ -2,5 +2,12 @@ defmodule ExWebRTC.RTPReceiver do @moduledoc """ RTPReceiver """ - defstruct [:ssrc] + + alias ExWebRTC.MediaStreamTrack + + @type t() :: %__MODULE__{ + track: MediaStreamTrack.t() | nil + } + + defstruct [:track] end diff --git a/lib/ex_webrtc/rtp_sender.ex b/lib/ex_webrtc/rtp_sender.ex new file mode 100644 index 00000000..a497b2f9 --- /dev/null +++ b/lib/ex_webrtc/rtp_sender.ex @@ -0,0 +1,13 @@ +defmodule ExWebRTC.RTPSender do + @moduledoc """ + RTPSender + """ + + alias ExWebRTC.MediaStreamTrack + + @type t() :: %__MODULE__{ + track: MediaStreamTrack.t() | nil + } + + defstruct [:track] +end diff --git a/lib/ex_webrtc/rtp_transceiver.ex b/lib/ex_webrtc/rtp_transceiver.ex index 82d24c4a..da012589 100644 --- a/lib/ex_webrtc/rtp_transceiver.ex +++ b/lib/ex_webrtc/rtp_transceiver.ex @@ -3,7 +3,13 @@ defmodule ExWebRTC.RTPTransceiver do RTPTransceiver """ - alias ExWebRTC.{PeerConnection.Configuration, RTPCodecParameters, RTPReceiver} + alias ExWebRTC.{ + PeerConnection.Configuration, + RTPCodecParameters, + RTPReceiver, + RTPSender, + MediaStreamTrack + } @type direction() :: :sendonly | :recvonly | :sendrecv | :inactive | :stopped @type kind() :: :audio | :video @@ -14,11 +20,13 @@ defmodule ExWebRTC.RTPTransceiver do kind: kind(), rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()], codecs: [RTPCodecParameters.t()], - rtp_receiver: nil + receiver: nil, + sender: nil } @enforce_keys [:mid, :direction, :kind] - defstruct @enforce_keys ++ [codecs: [], rtp_hdr_exts: [], rtp_receiver: %RTPReceiver{}] + defstruct @enforce_keys ++ + [codecs: [], rtp_hdr_exts: [], receiver: %RTPReceiver{}, sender: %RTPSender{}] @doc false def find_by_mid(transceivers, mid) do @@ -61,7 +69,8 @@ defmodule ExWebRTC.RTPTransceiver do nil -> codecs = get_codecs(mline, config) rtp_hdr_exts = get_rtp_hdr_extensions(mline, config) - ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC) + + track = MediaStreamTrack.new(mline.type) tr = %__MODULE__{ mid: mid, @@ -69,7 +78,7 @@ defmodule ExWebRTC.RTPTransceiver do kind: mline.type, codecs: codecs, rtp_hdr_exts: rtp_hdr_exts, - rtp_receiver: %RTPReceiver{ssrc: ssrc} + receiver: %RTPReceiver{track: track} } transceivers ++ [tr] @@ -128,14 +137,12 @@ defmodule ExWebRTC.RTPTransceiver do defp update(transceiver, mline, config) do codecs = get_codecs(mline, config) rtp_hdr_exts = get_rtp_hdr_extensions(mline, config) - ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC) - rtp_receiver = %RTPReceiver{ssrc: ssrc} + # TODO: potentially update tracks %__MODULE__{ transceiver | codecs: codecs, - rtp_hdr_exts: rtp_hdr_exts, - rtp_receiver: rtp_receiver + rtp_hdr_exts: rtp_hdr_exts } end From 1672d8d611e398cc4f240a11218766461cfa18e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Fri, 24 Nov 2023 17:51:50 +0100 Subject: [PATCH 2/6] Fix tests --- lib/ex_webrtc/dtls_transport.ex | 6 ++++++ lib/ex_webrtc/peer_connection.ex | 6 ++---- test/ex_webrtc/dtls_transport_test.exs | 2 +- test/ex_webrtc/peer_connection_test.exs | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 0c4ea4f3..04968e7e 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -117,6 +117,12 @@ defmodule ExWebRTC.DTLSTransport do {:noreply, state} end + @impl true + def handle_cast({:send_rtp, _data}, state) do + Logger.warning("Attemped to send RTP before DTLS handshake has been finished") + {:noreply, state} + end + @impl true def handle_cast({:send_rtcp, _data}, state) do # TODO: implement diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index a7161a4f..64d5508b 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -387,7 +387,6 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_info(msg, state) do Logger.info("OTHER MSG #{inspect(msg)}") - {:noreply, state} end @@ -450,6 +449,8 @@ defmodule ExWebRTC.PeerConnection do :ok = ICEAgent.set_remote_credentials(state.ice_agent, ice_ufrag, ice_pwd) :ok = ICEAgent.gather_candidates(state.ice_agent) + # TODO: this needs a look + new_remote_tracks = new_transceivers # only take new transceivers that can receive tracks @@ -463,9 +464,6 @@ defmodule ExWebRTC.PeerConnection do notify(state.owner, {:track, track}) end - tracks_map = Map.new(new_remote_tracks, &{&1.id, &1}) - state = %{state | tracks: Map.merge(state.tracks, tracks_map)} - state = set_description(:remote, type, sdp, state) if type == :answer do diff --git a/test/ex_webrtc/dtls_transport_test.exs b/test/ex_webrtc/dtls_transport_test.exs index 9763fc9e..9bdd0cf5 100644 --- a/test/ex_webrtc/dtls_transport_test.exs +++ b/test/ex_webrtc/dtls_transport_test.exs @@ -54,7 +54,7 @@ defmodule ExWebRTC.DTLSTransportTest do end test "cannot send data when handshake not finished", %{dtls: dtls} do - DTLSTransport.send_data(dtls, <<1, 2, 3>>) + DTLSTransport.send_rtp(dtls, <<1, 2, 3>>) refute_receive {:fake_ice, _data} end diff --git a/test/ex_webrtc/peer_connection_test.exs b/test/ex_webrtc/peer_connection_test.exs index dfdeb876..f08cb600 100644 --- a/test/ex_webrtc/peer_connection_test.exs +++ b/test/ex_webrtc/peer_connection_test.exs @@ -15,7 +15,7 @@ defmodule ExWebRTC.PeerConnectionTest do offer = %SessionDescription{type: :offer, sdp: File.read!("test/fixtures/audio_sdp.txt")} :ok = PeerConnection.set_remote_description(pc, offer) - assert_receive {:ex_webrtc, ^pc, {:track, %MediaStreamTrack{mid: "0", kind: :audio}}} + assert_receive {:ex_webrtc, ^pc, {:track, %MediaStreamTrack{kind: :audio}}} offer = %SessionDescription{ type: :offer, @@ -24,7 +24,7 @@ defmodule ExWebRTC.PeerConnectionTest do :ok = PeerConnection.set_remote_description(pc, offer) - assert_receive {:ex_webrtc, ^pc, {:track, %MediaStreamTrack{mid: "1", kind: :video}}} + assert_receive {:ex_webrtc, ^pc, {:track, %MediaStreamTrack{kind: :video}}} refute_receive {:ex_webrtc, ^pc, {:track, %MediaStreamTrack{}}} end From 72d592437f5d516d3e5ae17aabd4cb576069254f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Mon, 27 Nov 2023 09:58:52 +0100 Subject: [PATCH 3/6] Improve the elixir example --- examples/example.exs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/examples/example.exs b/examples/example.exs index 37db0c2b..cf39b7ac 100644 --- a/examples/example.exs +++ b/examples/example.exs @@ -8,7 +8,7 @@ defmodule Peer do require Logger - alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription} + alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription, RTPTransceiver} @ice_servers [ # %{urls: "stun:stun.stunprotocol.org:3478"}, @@ -32,7 +32,7 @@ defmodule Peer do {:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers) - {:ok, %{conn: conn, stream: stream, peer_connection: pc}} + {:ok, %{conn: conn, stream: stream, peer_connection: pc, other_mid: nil}} other -> Logger.error("Couldn't connect to the signalling server: #{inspect(other)}") @@ -55,9 +55,10 @@ defmodule Peer do @impl true def handle_info({:gun_ws, _, _, {:text, msg}}, state) do - msg - |> Jason.decode!() - |> handle_ws_message(state) + state = + msg + |> Jason.decode!() + |> handle_ws_message(state) {:noreply, state} end @@ -90,17 +91,22 @@ defmodule Peer do :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) track = ExWebRTC.MediaStreamTrack.new(:video) - {:ok, transceiver} = PeerConnection.add_transceiver(pc, track) + {:ok, _} = PeerConnection.add_transceiver(pc, track) {:ok, offer} = PeerConnection.create_offer(pc) :ok = PeerConnection.set_local_description(pc, offer) msg = %{"type" => "offer", "sdp" => offer.sdp} :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) + + [_, %RTPTransceiver{mid: mid}] = PeerConnection.get_transceivers(pc) + + %{state | other_mid: mid} end defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do Logger.info("Received SDP answer: #{inspect(sdp)}") answer = %SessionDescription{type: :answer, sdp: sdp} :ok = PeerConnection.set_remote_description(state.peer_connection, answer) + state end defp handle_ws_message(%{"type" => "ice", "data" => data}, state) do @@ -114,10 +120,13 @@ defmodule Peer do } :ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate) + + state end - defp handle_ws_message(msg, _state) do + defp handle_ws_message(msg, state) do Logger.info("Received unexpected message: #{inspect(msg)}") + state end defp handle_webrtc_message({:ice_candidate, candidate}, state) do @@ -132,9 +141,13 @@ defmodule Peer do :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) end - defp handle_webrtc_message({:rtp, mid, packet}, state) do + defp handle_webrtc_message({:rtp, _mid, _packet}, %{other_mid: nil}) do + Logger.warning("Received RTP, but out transceiver has not beed created") + end + + defp handle_webrtc_message({:rtp, _mid, packet}, state) do Logger.info("Received RTP: #{inspect(packet)}") - PeerConnection.send_rtp(state.peer_connection, "1", packet) + PeerConnection.send_rtp(state.peer_connection, state.other_mid, packet) end defp handle_webrtc_message(msg, _state) do From 10e4231bb31b61c04e45a736ec348cdb7eef4b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Mon, 27 Nov 2023 11:49:16 +0100 Subject: [PATCH 4/6] Use `track_id` instead of `mid` when sending RTP --- examples/example.exs | 10 +++--- lib/ex_webrtc/dtls_transport.ex | 1 + lib/ex_webrtc/peer_connection.ex | 54 ++++++++++++++++++++++---------- lib/ex_webrtc/rtp_transceiver.ex | 4 +-- 4 files changed, 44 insertions(+), 25 deletions(-) diff --git a/examples/example.exs b/examples/example.exs index cf39b7ac..39265717 100644 --- a/examples/example.exs +++ b/examples/example.exs @@ -32,7 +32,7 @@ defmodule Peer do {:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers) - {:ok, %{conn: conn, stream: stream, peer_connection: pc, other_mid: nil}} + {:ok, %{conn: conn, stream: stream, peer_connection: pc, track_id: nil}} other -> Logger.error("Couldn't connect to the signalling server: #{inspect(other)}") @@ -97,9 +97,7 @@ defmodule Peer do msg = %{"type" => "offer", "sdp" => offer.sdp} :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) - [_, %RTPTransceiver{mid: mid}] = PeerConnection.get_transceivers(pc) - - %{state | other_mid: mid} + %{state | track_id: track.id} end defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do @@ -141,13 +139,13 @@ defmodule Peer do :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) end - defp handle_webrtc_message({:rtp, _mid, _packet}, %{other_mid: nil}) do + defp handle_webrtc_message({:rtp, _mid, _packet}, %{track_id: nil}) do Logger.warning("Received RTP, but out transceiver has not beed created") end defp handle_webrtc_message({:rtp, _mid, packet}, state) do Logger.info("Received RTP: #{inspect(packet)}") - PeerConnection.send_rtp(state.peer_connection, state.other_mid, packet) + PeerConnection.send_rtp(state.peer_connection, state.track_id, packet) end defp handle_webrtc_message(msg, _state) do diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 04968e7e..077eaaf6 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -172,6 +172,7 @@ defmodule ExWebRTC.DTLSTransport do end defp handle_ice({:data, <> = data}, state) when f in 20..64 do + # TODO: handle {:connection_closed, _} case ExDTLS.handle_data(state.dtls, data) do {:handshake_packets, packets, timeout} when state.ice_state in [:connected, :completed] -> :ok = ICEAgent.send_data(state.ice_agent, packets) diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 64d5508b..9dac60d0 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -114,8 +114,8 @@ defmodule ExWebRTC.PeerConnection do end @spec send_rtp(peer_connection(), String.t(), ExRTP.Packet.t()) :: :ok - def send_rtp(peer_connection, mid, packet) do - GenServer.cast(peer_connection, {:send_rtp, mid, packet}) + def send_rtp(peer_connection, track_id, packet) do + GenServer.cast(peer_connection, {:send_rtp, track_id, packet}) end #### CALLBACKS #### @@ -332,22 +332,37 @@ defmodule ExWebRTC.PeerConnection do end @impl true - def handle_cast({:send_rtp, mid, packet}, state) do - # TODO: this is very temporary - id = + def handle_cast({:send_rtp, track_id, packet}, state) do + sdes_id = Enum.find_value(state.demuxer.extensions, fn {ext_id, {ExRTP.Packet.Extension.SourceDescription, :mid}} -> ext_id _ -> nil end) - mid_ext = - %ExRTP.Packet.Extension.SourceDescription{text: mid} - |> ExRTP.Packet.Extension.SourceDescription.to_raw(id) + # TODO: iterating over transceivers is not optimal + # but this is, most likely, going to be refactored anyways + maybe_transceiver = + Enum.find(state.transceivers, fn + %{sender: %{track: %{id: id}}} -> id == track_id + _ -> false + end) - packet - |> ExRTP.Packet.set_extension(:two_byte, [mid_ext]) - |> ExRTP.Packet.encode() - |> then(&DTLSTransport.send_rtp(state.dtls_transport, &1)) + case maybe_transceiver do + %RTPTransceiver{mid: mid} -> + mid_ext = + %ExRTP.Packet.Extension.SourceDescription{text: mid} + |> ExRTP.Packet.Extension.SourceDescription.to_raw(sdes_id) + + packet + |> ExRTP.Packet.set_extension(:two_byte, [mid_ext]) + |> ExRTP.Packet.encode() + |> then(&DTLSTransport.send_rtp(state.dtls_transport, &1)) + + nil -> + Logger.warning( + "Attempted to send packet to track with unrecognized id: #{inspect(track_id)}" + ) + end {:noreply, state} end @@ -373,10 +388,15 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_info({:rtp, data}, state) do - case Demuxer.demux(state.demuxer, data) do - {:ok, demuxer, mid, packet} -> - notify(state.owner, {:rtp, mid, packet}) - {:noreply, %__MODULE__{state | demuxer: demuxer}} + with {:ok, demuxer, mid, packet} <- Demuxer.demux(state.demuxer, data), + %RTPTransceiver{} = t <- Enum.find(state.transceivers, &(&1.mid == mid)) do + track_id = t.receiver.track.id + notify(state.owner, {:rtp, track_id, packet}) + {:noreply, %__MODULE__{state | demuxer: demuxer}} + else + nil -> + Logger.warning("Received RTP with unrecognized MID: #{inspect(data)}") + {:noreply, state} {:error, reason} -> Logger.error("Unable to demux RTP, reason: #{inspect(reason)}") @@ -458,7 +478,7 @@ defmodule ExWebRTC.PeerConnection do RTPTransceiver.find_by_mid(state.transceivers, tr.mid) == nil and tr.direction in [:recvonly, :sendrecv] end) - |> Enum.map(fn tr -> MediaStreamTrack.new(tr.kind) end) + |> Enum.map(fn tr -> tr.receiver.track end) for track <- new_remote_tracks do notify(state.owner, {:track, track}) diff --git a/lib/ex_webrtc/rtp_transceiver.ex b/lib/ex_webrtc/rtp_transceiver.ex index da012589..e8397c37 100644 --- a/lib/ex_webrtc/rtp_transceiver.ex +++ b/lib/ex_webrtc/rtp_transceiver.ex @@ -20,8 +20,8 @@ defmodule ExWebRTC.RTPTransceiver do kind: kind(), rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()], codecs: [RTPCodecParameters.t()], - receiver: nil, - sender: nil + receiver: RTPReceiver.t(), + sender: RTPSender.t() } @enforce_keys [:mid, :direction, :kind] From a4a77f672e1b56c4bbe7bacad84aaacf7d03d4dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Tue, 28 Nov 2023 10:06:42 +0100 Subject: [PATCH 5/6] Apply requested changes --- lib/ex_webrtc/dtls_transport.ex | 2 +- lib/ex_webrtc/peer_connection.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index 93154228..a9e7e2b7 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -139,7 +139,7 @@ defmodule ExWebRTC.DTLSTransport do @impl true def handle_cast({:send_rtp, _data}, state) do - Logger.warning("Attemped to send RTP before DTLS handshake has been finished") + Logger.warning("Attemped to send RTP before DTLS handshake has been finished. Ignoring.") {:noreply, state} end diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 3439f331..3ba3ff28 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -361,13 +361,13 @@ defmodule ExWebRTC.PeerConnection do # TODO: iterating over transceivers is not optimal # but this is, most likely, going to be refactored anyways - maybe_transceiver = + transceiver = Enum.find(state.transceivers, fn %{sender: %{track: %{id: id}}} -> id == track_id _ -> false end) - case maybe_transceiver do + case transceiver do %RTPTransceiver{mid: mid} -> mid_ext = %ExRTP.Packet.Extension.SourceDescription{text: mid} From e60c5e0a191cfcb7aa95d739a0de831eded3a95c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Tue, 28 Nov 2023 10:28:21 +0100 Subject: [PATCH 6/6] Change return value of `setup_srtp` --- lib/ex_webrtc/dtls_transport.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ex_webrtc/dtls_transport.ex b/lib/ex_webrtc/dtls_transport.ex index a9e7e2b7..a34bb8e7 100644 --- a/lib/ex_webrtc/dtls_transport.ex +++ b/lib/ex_webrtc/dtls_transport.ex @@ -213,12 +213,12 @@ defmodule ExWebRTC.DTLSTransport do Logger.debug("DTLS handshake finished") ICEAgent.send_data(state.ice_agent, packets) # TODO: validate fingerprint - state = setup_srtp(state, lkm, rkm, profile) + :ok = setup_srtp(state, lkm, rkm, profile) update_dtls_state(state, :connected) {:handshake_finished, lkm, rkm, profile} -> Logger.debug("DTLS handshake finished") - state = setup_srtp(state, lkm, rkm, profile) + :ok = setup_srtp(state, lkm, rkm, profile) update_dtls_state(state, :connected) :handshake_want_read -> @@ -306,7 +306,7 @@ defmodule ExWebRTC.DTLSTransport do :ok = ExLibSRTP.add_stream(state.out_srtp, outbound_policy) - state + :ok end defp update_dtls_state(%{dtls_state: dtls_state} = state, dtls_state), do: state