From 7dadcc2fcf24c270d900fa0a8a30227c9016c786 Mon Sep 17 00:00:00 2001 From: Mateusz Front Date: Tue, 17 Sep 2024 09:58:54 +0200 Subject: [PATCH] Ex webrtc 0.4 (#7) * bump ex_webrtc to 0.4 * Keyframe interval (#6) * Add a keyframe requester * Fix * Update last keyframe request timestamp * Request keyframes on a timer * Satisfy credo * Bump version * Start timers on first received packet * Fix not updating state * Apply reviewers suggestion * send keyframe request events on relevant pads, add tests * fixes for CR --------- Co-authored-by: Jakub Pryc <94321002+Noarkhh@users.noreply.github.com> --- README.md | 2 +- lib/membrane_webrtc/ex_webrtc/sink.ex | 56 ++++---- lib/membrane_webrtc/ex_webrtc/source.ex | 113 ++++++++++++---- lib/membrane_webrtc/ex_webrtc/utils.ex | 8 +- lib/membrane_webrtc/source.ex | 31 +++-- mix.exs | 4 +- mix.lock | 10 +- test/membrane_webrtc/integration_test.exs | 154 ++++++++++++++++++++++ 8 files changed, 302 insertions(+), 76 deletions(-) diff --git a/README.md b/README.md index 432e6af..011abb2 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_webrtc_plugin` to your list of ```elixir def deps do [ - {:membrane_webrtc_plugin, "~> 0.21.0"} + {:membrane_webrtc_plugin, "~> 0.22.0"} ] end ``` diff --git a/lib/membrane_webrtc/ex_webrtc/sink.ex b/lib/membrane_webrtc/ex_webrtc/sink.ex index 105295b..1487316 100644 --- a/lib/membrane_webrtc/ex_webrtc/sink.ex +++ b/lib/membrane_webrtc/ex_webrtc/sink.ex @@ -25,6 +25,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do @max_rtp_timestamp 2 ** 32 - 1 @max_rtp_seq_num 2 ** 16 - 1 + @keyframe_request_throttle_time Membrane.Time.milliseconds(500) @impl true def handle_init(_ctx, opts) do @@ -40,8 +41,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do audio_params: ExWebRTCUtils.codec_params(:opus), video_params: ExWebRTCUtils.codec_params(opts.video_codec), video_codec: opts.video_codec, - ice_servers: opts.ice_servers, - last_keyframe_request_ts: nil + ice_servers: opts.ice_servers }} end @@ -85,12 +85,14 @@ defmodule Membrane.WebRTC.ExWebRTCSink do negotiated_tracks = List.delete(negotiated_tracks, track) params = %{ + kind: track.kind, clock_rate: case track.kind do :audio -> ExWebRTCUtils.codec_clock_rate(:opus) :video -> ExWebRTCUtils.codec_clock_rate(state.video_codec) end, - seq_num: Enum.random(0..@max_rtp_seq_num) + seq_num: Enum.random(0..@max_rtp_seq_num), + last_keyframe_request_ts: Membrane.Time.monotonic_time() - @keyframe_request_throttle_time } input_tracks = Map.put(input_tracks, pad, {track.id, params}) @@ -137,39 +139,35 @@ defmodule Membrane.WebRTC.ExWebRTCSink do end @impl true - def handle_info({:ex_webrtc, _from, {:rtcp, rtcp_packets}}, ctx, state) do - pli? = + def handle_info({:ex_webrtc, _from, {:rtcp, rtcp_packets}}, _ctx, state) do + time = Membrane.Time.monotonic_time() + + {keyframe_requests, input_tracks} = rtcp_packets - |> Enum.reduce(false, fn - %PLI{} = packet, _pli? -> + |> Enum.filter(fn + {_track_id, %PLI{} = packet} -> Membrane.Logger.debug("Keyframe request received: #{inspect(packet)}") true - packet, pli? -> + packet -> Membrane.Logger.debug_verbose("Ignoring RTCP packet: #{inspect(packet)}") - pli? + false + end) + |> Enum.flat_map_reduce(state.input_tracks, fn {track_id, _pli}, input_tracks -> + {pad, {_id, props}} = + Enum.find(input_tracks, fn {_pad, {id, _props}} -> track_id == id end) + + if props.kind == :video and + time - props.last_keyframe_request_ts > @keyframe_request_throttle_time do + event = [event: {pad, %Membrane.KeyframeRequestEvent{}}] + props = %{props | last_keyframe_request_ts: time} + {event, %{input_tracks | pad => {track_id, props}}} + else + {[], input_tracks} + end end) - now = System.os_time(:millisecond) |> Membrane.Time.milliseconds() - then = state.last_keyframe_request_ts - - request_keyframe? = pli? and (then == nil or now - then >= Membrane.Time.second()) - state = if request_keyframe?, do: %{state | last_keyframe_request_ts: now}, else: state - - actions = - if request_keyframe? do - ctx.pads - |> Enum.flat_map(fn {pad_ref, pad_data} -> - case pad_data.options.kind do - :video -> [event: {pad_ref, %Membrane.KeyframeRequestEvent{}}] - :audio -> [] - end - end) - else - [] - end - - {actions, state} + {keyframe_requests, %{state | input_tracks: input_tracks}} end @impl true diff --git a/lib/membrane_webrtc/ex_webrtc/source.ex b/lib/membrane_webrtc/ex_webrtc/source.ex index b350eda..e9a60f6 100644 --- a/lib/membrane_webrtc/ex_webrtc/source.ex +++ b/lib/membrane_webrtc/ex_webrtc/source.ex @@ -8,7 +8,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do alias ExWebRTC.{ICECandidate, PeerConnection, SessionDescription} alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer} - def_options signaling: [], video_codec: [], ice_servers: [] + def_options signaling: [], video_codec: [], ice_servers: [], keyframe_interval: [] def_output_pad :output, accepted_format: Membrane.RTP, @@ -16,19 +16,49 @@ defmodule Membrane.WebRTC.ExWebRTCSource do flow_control: :push, options: [kind: [default: nil]] + defmodule State do + @moduledoc false + + @type output_track :: %{ + status: :awaiting | :connected, + pad: Membrane.Pad.ref() | nil, + track: ExWebRTC.MediaStreamTrack.t(), + first_packet_received: boolean() + } + + @type t :: %__MODULE__{ + pc: pid() | nil, + output_tracks: %{(pad_id :: term()) => output_track()}, + awaiting_outputs: [{:video | :audio, Membrane.Pad.ref()}], + awaiting_candidates: [ExWebRTC.ICECandidate.t()], + signaling: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()}, + status: :init | :connecting | :connected | :closed, + audio_params: [ExWebRTC.RTPCodecParameters.t()], + video_params: [ExWebRTC.RTPCodecParameters.t()], + ice_servers: [ExWebRTC.PeerConnection.Configuration.ice_server()], + keyframe_interval: Membrane.Time.t() | nil + } + + @enforce_keys [:signaling, :audio_params, :video_params, :ice_servers, :keyframe_interval] + defstruct @enforce_keys ++ + [ + pc: nil, + output_tracks: %{}, + awaiting_outputs: [], + awaiting_candidates: [], + status: :init + ] + end + @impl true def handle_init(_ctx, opts) do {[], - %{ - pc: nil, - output_tracks: %{}, - awaiting_outputs: [], - awaiting_candidates: [], + %State{ signaling: opts.signaling, - status: :init, audio_params: ExWebRTCUtils.codec_params(:opus), video_params: ExWebRTCUtils.codec_params(opts.video_codec), - ice_servers: opts.ice_servers + ice_servers: opts.ice_servers, + keyframe_interval: opts.keyframe_interval }} end @@ -69,7 +99,9 @@ defmodule Membrane.WebRTC.ExWebRTCSource do def handle_pad_added(Pad.ref(:output, pad_id) = pad, _ctx, state) do state = state - |> update_in([:output_tracks, pad_id], fn {:awaiting, _track} -> {:connected, pad} end) + |> Bunch.Struct.update_in([:output_tracks, pad_id], fn output_track -> + %{output_track | status: :connected, pad: pad} + end) |> maybe_answer() {[stream_format: {pad, %Membrane.RTP{}}], state} @@ -80,7 +112,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do track_id = state.output_tracks |> Enum.find_value(fn - {track_id, {:connected, ^pad}} -> track_id + {track_id, %{status: :connected, pad: ^pad}} -> track_id _other -> false end) @@ -110,19 +142,32 @@ defmodule Membrane.WebRTC.ExWebRTCSource do metadata: %{rtp: packet |> Map.from_struct() |> Map.delete(:payload)} } - %{output_tracks: output_tracks} = state - - case output_tracks[id] do - {:connected, pad} -> - {[buffer: {pad, buffer}], state} - - {:awaiting, track} -> + case state.output_tracks[id] do + %{status: :awaiting, track: track} -> Membrane.Logger.warning(""" Dropping packet of track #{inspect(id)}, kind #{inspect(track.kind)} \ that arrived before the SDP answer was sent. """) {[], state} + + %{status: :connected, pad: pad, track: %{kind: kind}, first_packet_received: false} -> + timer_action = + if kind == :video and state.keyframe_interval != nil do + [start_timer: {{:request_keyframe, id}, state.keyframe_interval}] + else + [] + end + + state = + Bunch.Struct.update_in(state, [:output_tracks, id], fn output_track -> + %{output_track | first_packet_received: true} + end) + + {[buffer: {pad, buffer}] ++ timer_action, state} + + %{status: :connected, pad: pad} -> + {[buffer: {pad, buffer}], state} end end @@ -152,8 +197,15 @@ defmodule Membrane.WebRTC.ExWebRTCSource do receive_new_tracks() |> Enum.map_reduce(state.awaiting_outputs, fn track, awaiting_outputs -> case List.keytake(awaiting_outputs, track.kind, 0) do - nil -> {{track.id, {:awaiting, track}}, awaiting_outputs} - {{_kind, pad}, awaiting_outputs} -> {{track.id, {:connected, pad}}, awaiting_outputs} + nil -> + {{track.id, + %{status: :awaiting, track: track, pad: nil, first_packet_received: false}}, + awaiting_outputs} + + {{_kind, pad}, awaiting_outputs} -> + {{track.id, + %{status: :connected, track: track, pad: pad, first_packet_received: false}}, + awaiting_outputs} end end) @@ -165,8 +217,8 @@ defmodule Membrane.WebRTC.ExWebRTCSource do tracks_notification = Enum.flat_map(new_tracks, fn - {_id, {:awaiting, track}} -> [track] - _other -> [] + {_id, %{status: :awaiting, track: track}} -> [track] + _connected_track -> [] end) |> case do [] -> [] @@ -175,8 +227,11 @@ defmodule Membrane.WebRTC.ExWebRTCSource do stream_formats = Enum.flat_map(new_tracks, fn - {_id, {:connected, pad}} -> [stream_format: {pad, %Membrane.RTP{}}] - _other -> [] + {_id, %{status: :connected, pad: pad}} -> + [stream_format: {pad, %Membrane.RTP{}}] + + _other -> + [] end) {tracks_notification ++ stream_formats, state} @@ -212,11 +267,14 @@ defmodule Membrane.WebRTC.ExWebRTCSource do handle_close(ctx, state) end + @impl true + def handle_tick({:request_keyframe, track_id}, _ctx, state) do + :ok = PeerConnection.send_pli(state.pc, track_id) + {[], state} + end + defp maybe_answer(state) do - if Enum.all?(state.output_tracks, fn - {_id, {:connected, _pad}} -> true - _track -> false - end) do + if Enum.all?(state.output_tracks, fn {_id, %{status: status}} -> status == :connected end) do %{pc: pc} = state {:ok, answer} = PeerConnection.create_answer(pc) :ok = PeerConnection.set_local_description(pc, answer) @@ -232,6 +290,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do end end + @spec receive_new_tracks() :: [ExWebRTC.MediaStreamTrack.t()] defp receive_new_tracks(), do: do_receive_new_tracks([]) defp do_receive_new_tracks(acc) do diff --git a/lib/membrane_webrtc/ex_webrtc/utils.ex b/lib/membrane_webrtc/ex_webrtc/utils.ex index a4fc4c0..8052acc 100644 --- a/lib/membrane_webrtc/ex_webrtc/utils.ex +++ b/lib/membrane_webrtc/ex_webrtc/utils.ex @@ -19,7 +19,13 @@ defmodule Membrane.WebRTC.ExWebRTCUtils do %RTPCodecParameters{ payload_type: 96, mime_type: "video/H264", - clock_rate: codec_clock_rate(:h264) + clock_rate: codec_clock_rate(:h264), + sdp_fmtp_line: %ExSDP.Attribute.FMTP{ + pt: 96, + level_asymmetry_allowed: 1, + packetization_mode: 1, + profile_level_id: 0x42E01F + } } ] end diff --git a/lib/membrane_webrtc/source.ex b/lib/membrane_webrtc/source.ex index d0be07a..f9951c8 100644 --- a/lib/membrane_webrtc/source.ex +++ b/lib/membrane_webrtc/source.ex @@ -40,6 +40,14 @@ defmodule Membrane.WebRTC.Source do spec: :vp8 | :h264, default: :vp8 ], + keyframe_interval: [ + spec: Membrane.Time.t() | nil, + default: nil, + description: """ + If set, a keyframe will be requested as often as specified on each video + track. + """ + ], ice_servers: [ spec: [ExWebRTC.PeerConnection.Configuration.ice_server()], default: [%{urls: "stun:stun.l.google.com:19302"}] @@ -68,7 +76,8 @@ defmodule Membrane.WebRTC.Source do child(:webrtc, %ExWebRTCSource{ signaling: signaling, video_codec: opts.video_codec, - ice_servers: opts.ice_servers + ice_servers: opts.ice_servers, + keyframe_interval: opts.keyframe_interval }) state = %{tracks: %{}} |> Map.merge(opts) @@ -93,7 +102,7 @@ defmodule Membrane.WebRTC.Source do spec = get_child(:webrtc) |> via_out(pad_ref, options: [kind: kind]) - |> then(if state.depayload_rtp, do: &child(&1, get_depayloader(kind, state)), else: & &1) + |> then(if state.depayload_rtp, do: &get_depayloader(&1, kind, state), else: & &1) |> bin_output(pad_ref) {[spec: spec], state} @@ -106,24 +115,24 @@ defmodule Membrane.WebRTC.Source do {[notify_parent: {:new_tracks, tracks}], state} end - defp get_depayloader(:audio, _state) do - %Membrane.RTP.DepayloaderBin{ + defp get_depayloader(builder, :audio, _state) do + child(builder, %Membrane.RTP.DepayloaderBin{ depayloader: Membrane.RTP.Opus.Depayloader, clock_rate: ExWebRTCUtils.codec_clock_rate(:opus) - } + }) end - defp get_depayloader(:video, %{video_codec: :vp8}) do - %Membrane.RTP.DepayloaderBin{ + defp get_depayloader(builder, :video, %{video_codec: :vp8}) do + child(builder, %Membrane.RTP.DepayloaderBin{ depayloader: Membrane.RTP.VP8.Depayloader, clock_rate: ExWebRTCUtils.codec_clock_rate(:vp8) - } + }) end - defp get_depayloader(:video, %{video_codec: :h264}) do - %Membrane.RTP.DepayloaderBin{ + defp get_depayloader(builder, :video, %{video_codec: :h264}) do + child(builder, %Membrane.RTP.DepayloaderBin{ depayloader: Membrane.RTP.H264.Depayloader, clock_rate: ExWebRTCUtils.codec_clock_rate(:h264) - } + }) end end diff --git a/mix.exs b/mix.exs index 2ed543a..8c8ec47 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.WebRTC.Plugin.Mixfile do use Mix.Project - @version "0.21.0" + @version "0.22.0" @github_url "https://github.com/membraneframework/membrane_webrtc_plugin" def project do @@ -37,7 +37,7 @@ defmodule Membrane.WebRTC.Plugin.Mixfile do defp deps do [ {:membrane_core, "~> 1.0"}, - {:ex_webrtc, "~> 0.3.0"}, + {:ex_webrtc, "~> 0.4.0"}, {:membrane_rtp_plugin, "~> 0.29.0"}, {:membrane_rtp_h264_plugin, "~> 0.19.0"}, {:membrane_rtp_vp8_plugin, "~> 0.9.1"}, diff --git a/mix.lock b/mix.lock index 2641f68..03d0202 100644 --- a/mix.lock +++ b/mix.lock @@ -16,14 +16,14 @@ "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "ex_dtls": {:hex, :ex_dtls, "0.15.2", "6c8c0f8eb67525216551bd3e0322ab33c9d851d56ef3e065efab4fd277a8fbb9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "6b852bc926bbdc9c1b9c4ecc6cfc73a89d4e106042802cefea2c1503072a9f2a"}, - "ex_ice": {:hex, :ex_ice, "0.7.1", "3ad14f7281ece304dfee227e332b8a67d93d5857602a8a4300a826c250af136e", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "78e6bc4abb5294dcf0a474d0a91e78a829916291d846a0e255867dc5db8733e7"}, + "ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"}, "ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"}, "ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"}, - "ex_sdp": {:hex, :ex_sdp, "0.17.0", "4c50e7814f01f149c0ccf258fba8428f8567dffecf1c416ec3f6aaaac607a161", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "c7fe0625902be2a835b5fe6834a189f7db7639d2625c8e9d8b3564e6d704145f"}, + "ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"}, "ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"}, "ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"}, - "ex_webrtc": {:hex, :ex_webrtc, "0.3.0", "283f5b31d539f65238596793aabcefe32d221618ceb751ae68951712a486cac2", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.15.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.7.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.17.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "a8a4f38cdcacae170615d6abb83d8c42220b6ac0133d84b900f4994d5eff7143"}, + "ex_webrtc": {:hex, :ex_webrtc, "0.4.1", "834054e38630f91a1c62e5d77ecf3394c0be64e9fd94be45e3021ea53baef6f9", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.15.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.8.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "be81984cf1624ccd1b50fa257558829db967764786e47ecc3da0cacbdcf1448d"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"}, @@ -62,9 +62,9 @@ "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, - "req": {:hex, :req, "0.5.4", "e375e4812adf83ffcf787871d7a124d873e983e3b77466e6608b973582f7f837", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "a17998ffe2ef54f79bfdd782ef9f4cbf987d93851e89444cbc466a6a25eee494"}, + "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"}, "thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"}, "unifex": {:hex, :unifex, "1.2.0", "90d1ec5e6d788350e07e474f7bd8b0ee866d6606beb9ca4e20dbb26328712a84", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "7a8395aabc3ba6cff04bbe5b995de7f899a38eb57f189e49927d6b8b6ccb6883"}, diff --git a/test/membrane_webrtc/integration_test.exs b/test/membrane_webrtc/integration_test.exs index 030671d..fe62950 100644 --- a/test/membrane_webrtc/integration_test.exs +++ b/test/membrane_webrtc/integration_test.exs @@ -11,6 +11,61 @@ defmodule Membrane.WebRTC.IntegrationTest do alias Membrane.WebRTC alias Membrane.WebRTC.SignalingChannel + defmodule KeyframeTestSource do + use Membrane.Source + + def_output_pad :output, flow_control: :manual, accepted_format: _any + + def_options stream_format: [spec: Membrane.StreamFormat.t()] + + @impl true + def handle_playing(_ctx, state) do + buffers = + Bunch.Enum.repeated( + %Membrane.Buffer{payload: "mock" <> <<0::1000*8>>, pts: 0, dts: 0}, + 10 + ) + + {[stream_format: {:output, state.stream_format}, buffer: {:output, buffers}], state} + end + + @impl true + def handle_demand(:output, _size, _unit, _ctx, state) do + {[], state} + end + + @impl true + def handle_event(:output, %Membrane.KeyframeRequestEvent{}, _ctx, state) do + {[notify_parent: :keyframe_requested], state} + end + + @impl true + def handle_event(:output, _event, _ctx, state) do + {[], state} + end + end + + defmodule KeyframeTestSink do + use Membrane.Sink + + def_input_pad :input, accepted_format: _any + + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :playing], state} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {[notify_parent: {:buffer, buffer}], state} + end + + @impl true + def handle_parent_notification(:request_keyframe, _ctx, state) do + {[event: {:input, %Membrane.KeyframeRequestEvent{}}], state} + end + end + defmodule Utils do import ExUnit.Assertions @@ -73,6 +128,59 @@ defmodule Membrane.WebRTC.IntegrationTest do ] ) end + + def run_keyframe_testing_pipelines(opts \\ []) do + signaling = SignalingChannel.new() + + send_pipeline = Testing.Pipeline.start_link_supervised!() + + video_src = %KeyframeTestSource{ + stream_format: %Membrane.RemoteStream{content_format: Membrane.VP8, type: :packetized} + } + + audio_src = %KeyframeTestSource{stream_format: %Membrane.Opus{channels: 2}} + + Testing.Pipeline.execute_actions(send_pipeline, + spec: [ + child(:vid1, video_src) + |> via_in(:input, options: [kind: :video]) + |> get_child(:webrtc), + child(:vid2, video_src) + |> via_in(:input, options: [kind: :video]) + |> get_child(:webrtc), + child(:audio, audio_src) + |> via_in(:input, options: [kind: :audio]) + |> get_child(:webrtc), + child(:webrtc, %WebRTC.Sink{signaling: signaling, tracks: [:audio, :video, :video]}) + ] + ) + + receive_pipeline = Testing.Pipeline.start_link_supervised!() + + Testing.Pipeline.execute_actions(receive_pipeline, + spec: [ + child(:webrtc, %WebRTC.Source{ + signaling: signaling, + keyframe_interval: opts[:keyframe_interval] + }), + get_child(:webrtc) + |> via_out(:output, options: [kind: :video]) + |> child(:vid1, KeyframeTestSink), + get_child(:webrtc) + |> via_out(:output, options: [kind: :video]) + |> child(:vid2, KeyframeTestSink), + get_child(:webrtc) + |> via_out(:output, options: [kind: :audio]) + |> child(:audio, KeyframeTestSink) + ] + ) + + assert_pipeline_notified(receive_pipeline, :vid1, {:buffer, _buffer}) + assert_pipeline_notified(receive_pipeline, :vid2, {:buffer, _buffer}) + assert_pipeline_notified(receive_pipeline, :audio, {:buffer, _buffer}) + + {send_pipeline, receive_pipeline} + end end defmodule SendRecv do @@ -177,4 +285,50 @@ defmodule Membrane.WebRTC.IntegrationTest do assert File.read!("#{tmp_dir}/out_video2") == File.read!("test/fixtures/ref_video") end end + + defmodule KeyframeRequestEvents do + use ExUnit.Case, async: true + + import Utils + + test "keyframe request events" do + {send_pipeline, receive_pipeline} = run_keyframe_testing_pipelines() + + Testing.Pipeline.notify_child(receive_pipeline, :vid1, :request_keyframe) + Testing.Pipeline.notify_child(receive_pipeline, :vid2, :request_keyframe) + Testing.Pipeline.notify_child(receive_pipeline, :audio, :request_keyframe) + + assert_pipeline_notified(send_pipeline, :vid1, :keyframe_requested) + assert_pipeline_notified(send_pipeline, :vid2, :keyframe_requested) + refute_pipeline_notified(send_pipeline, :vid1, :keyframe_requested) + refute_pipeline_notified(send_pipeline, :vid2, :keyframe_requested) + refute_pipeline_notified(send_pipeline, :audio, :keyframe_requested) + + Testing.Pipeline.terminate(send_pipeline) + Testing.Pipeline.terminate(receive_pipeline) + end + end + + defmodule KeyframeInterval do + use ExUnit.Case, async: true + + import Utils + + test "keyframe request events" do + {send_pipeline, receive_pipeline} = + run_keyframe_testing_pipelines(keyframe_interval: Membrane.Time.seconds(1)) + + Enum.each(1..3, fn _i -> + assert_pipeline_notified(send_pipeline, :vid1, :keyframe_requested) + assert_pipeline_notified(send_pipeline, :vid2, :keyframe_requested) + refute_pipeline_notified(send_pipeline, :vid1, :keyframe_requested, 800) + refute_pipeline_notified(send_pipeline, :vid2, :keyframe_requested, 0) + end) + + refute_pipeline_notified(send_pipeline, :audio, :keyframe_requested) + + Testing.Pipeline.terminate(send_pipeline) + Testing.Pipeline.terminate(receive_pipeline) + end + end end