diff --git a/README.md b/README.md index 011abb2..04d8f95 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_webrtc_plugin` to your list of ```elixir def deps do [ - {:membrane_webrtc_plugin, "~> 0.22.0"} + {:membrane_webrtc_plugin, "~> 0.22.1"} ] end ``` diff --git a/lib/membrane_webrtc/ex_webrtc/sink.ex b/lib/membrane_webrtc/ex_webrtc/sink.ex index 1487316..6103e9b 100644 --- a/lib/membrane_webrtc/ex_webrtc/sink.ex +++ b/lib/membrane_webrtc/ex_webrtc/sink.ex @@ -188,10 +188,19 @@ defmodule Membrane.WebRTC.ExWebRTCSink do %{negotiating_tracks: negotiating_tracks, negotiated_tracks: negotiated_tracks} = state + video_codecs = get_negotiated_video_codecs(sdp) + to_notify = - negotiating_tracks |> Enum.filter(& &1.notify) |> Enum.map(&Map.take(&1, [:id, :kind])) + negotiating_tracks + |> Enum.filter(& &1.notify) + |> Enum.map(&Map.take(&1, [:id, :kind])) + |> Enum.map(fn + %{kind: :audio} = track -> Map.put(track, :codec, :opus) + %{kind: :video} = track -> Map.put(track, :codec, video_codecs) + end) - actions = if to_notify == [], do: [], else: [notify_parent: {:new_tracks, to_notify}] + actions = + if to_notify == [], do: [], else: [notify_parent: {:new_tracks, to_notify}] negotiated_tracks = negotiated_tracks ++ negotiating_tracks @@ -265,4 +274,20 @@ defmodule Membrane.WebRTC.ExWebRTCSink do seq_num = rem(params.seq_num + 1, @max_rtp_seq_num + 1) put_in(state.input_tracks[pad], {id, %{params | seq_num: seq_num}}) end + + defp get_negotiated_video_codecs(sdp_answer) do + ex_sdp = ExSDP.parse!(sdp_answer.sdp) + + ex_sdp.media + |> Enum.flat_map(fn + %{type: :video, attributes: attributes} -> attributes + _media -> [] + end) + |> Enum.flat_map(fn + %ExSDP.Attribute.RTPMapping{encoding: "H264"} -> [:h264] + %ExSDP.Attribute.RTPMapping{encoding: "VP8"} -> [:vp8] + _attribute -> [] + end) + |> Enum.uniq() + end end diff --git a/lib/membrane_webrtc/ex_webrtc/utils.ex b/lib/membrane_webrtc/ex_webrtc/utils.ex index 8052acc..e140b07 100644 --- a/lib/membrane_webrtc/ex_webrtc/utils.ex +++ b/lib/membrane_webrtc/ex_webrtc/utils.ex @@ -3,7 +3,10 @@ defmodule Membrane.WebRTC.ExWebRTCUtils do alias ExWebRTC.RTPCodecParameters - @spec codec_params(:opus | :h264 | :vp8) :: [RTPCodecParameters.t()] + @type codec :: :opus | :h264 | :vp8 + @type codec_or_codecs :: codec() | [codec()] + + @spec codec_params(codec_or_codecs()) :: [RTPCodecParameters.t()] def codec_params(:opus), do: [ %RTPCodecParameters{ @@ -40,8 +43,22 @@ defmodule Membrane.WebRTC.ExWebRTCUtils do ] end - @spec codec_clock_rate(:opus | :h264 | :vp8) :: pos_integer() + def codec_params(codecs) when is_list(codecs) do + codecs |> Enum.flat_map(&codec_params/1) + end + + @spec codec_clock_rate(codec_or_codecs()) :: pos_integer() def codec_clock_rate(:opus), do: 48_000 def codec_clock_rate(:vp8), do: 90_000 def codec_clock_rate(:h264), do: 90_000 + + def codec_clock_rate(codecs) when is_list(codecs) do + cond do + codecs == [:opus] -> + 48_000 + + codecs != [] and Enum.all?(codecs, &(&1 in [:vp8, :h264])) -> + 90_000 + end + end end diff --git a/lib/membrane_webrtc/sink.ex b/lib/membrane_webrtc/sink.ex index 84c535e..83e902d 100644 --- a/lib/membrane_webrtc/sink.ex +++ b/lib/membrane_webrtc/sink.ex @@ -17,6 +17,11 @@ defmodule Membrane.WebRTC.Sink do """ use Membrane.Bin + alias __MODULE__.ForwardingFilter + + alias Membrane.H264 + alias Membrane.RemoteStream + alias Membrane.VP8 alias Membrane.WebRTC.{ExWebRTCSink, SignalingChannel, SimpleWebSocketServer} @typedoc """ @@ -53,8 +58,18 @@ defmodule Membrane.WebRTC.Sink do """ ], video_codec: [ - spec: :vp8 | :h264, - default: :vp8 + spec: :vp8 | :h264 | [:vp8 | :h264], + default: [:vp8, :h264], + description: """ + Video codecs, that #{inspect(__MODULE__)} will try to negotiatie in SDP + message exchange. Even if `[:vp8, :h264]` is passed to this option, there + is a chance, that one of these codecs won't be approved by the second + WebRTC peer. + + After SDP messages exchange, #{inspect(__MODULE__)} will send a parent + notification `{:new_tracks, tracks}`, where every track in `tracks` + contains info about supported codecs. + """ ], ice_servers: [ spec: [ExWebRTC.PeerConnection.Configuration.ice_server()], @@ -104,13 +119,46 @@ defmodule Membrane.WebRTC.Sink do end @impl true - def handle_pad_added(Pad.ref(:input, _pid) = pad_ref, ctx, state) do - %{kind: kind} = ctx.pad_options + def handle_pad_added(Pad.ref(:input, pid) = pad_ref, %{pad_options: %{kind: kind}}, state) do + spec = + cond do + not state.payload_rtp -> + bin_input(pad_ref) + |> via_in(pad_ref, options: [kind: kind]) + |> get_child(:webrtc) + + kind == :audio -> + bin_input(pad_ref) + |> child({:rtp_opus_payloader, pid}, Membrane.RTP.Opus.Payloader) + |> via_in(pad_ref, options: [kind: :audio]) + |> get_child(:webrtc) + + kind == :video -> + bin_input(pad_ref) + |> child({:forwarding_filter, pad_ref}, ForwardingFilter) + end + + {[spec: spec], state} + end + + @impl true + def handle_child_notification( + {:stream_format, stream_format}, + {:forwarding_filter, pad_ref}, + _ctx, + state + ) do + payoader = + case stream_format do + %H264{} -> %Membrane.RTP.H264.Payloader{max_payload_size: 1000} + %VP8{} -> Membrane.RTP.VP8.Payloader + %RemoteStream{content_format: VP8} -> Membrane.RTP.VP8.Payloader + end spec = - bin_input(pad_ref) - |> then(if state.payload_rtp, do: &child(&1, get_payloader(kind, state)), else: & &1) - |> via_in(pad_ref, options: [kind: kind]) + get_child({:forwarding_filter, pad_ref}) + |> child({:rtp_payloader, pad_ref}, payoader) + |> via_in(pad_ref, options: [kind: :video]) |> get_child(:webrtc) {[spec: spec], state} @@ -126,6 +174,11 @@ defmodule Membrane.WebRTC.Sink do {[notify_parent: {:new_tracks, tracks}], state} end + @impl true + def handle_child_notification({:negotiated_video_codecs, codecs}, :webrtc, _ctx, state) do + {[notify_parent: {:negotiated_video_codecs, codecs}], state} + end + @impl true def handle_parent_notification({:add_tracks, tracks}, _ctx, state) do {[notify_child: {:webrtc, {:add_tracks, tracks}}], state} @@ -140,11 +193,4 @@ defmodule Membrane.WebRTC.Sink do def handle_element_end_of_stream(_name, _pad, _ctx, state) do {[], state} end - - defp get_payloader(:audio, _state), do: Membrane.RTP.Opus.Payloader - - defp get_payloader(:video, %{video_codec: :h264}), - do: %Membrane.RTP.H264.Payloader{max_payload_size: 1000} - - defp get_payloader(:video, %{video_codec: :vp8}), do: Membrane.RTP.VP8.Payloader end diff --git a/lib/membrane_webrtc/sink/forwarding_filter.ex b/lib/membrane_webrtc/sink/forwarding_filter.ex new file mode 100644 index 0000000..463e4f9 --- /dev/null +++ b/lib/membrane_webrtc/sink/forwarding_filter.ex @@ -0,0 +1,117 @@ +defmodule Membrane.WebRTC.Sink.ForwardingFilter do + @moduledoc false + use Membrane.Filter + + alias Membrane.TimestampQueue + + def_input_pad :input, + accepted_format: _any, + availability: :on_request + + def_output_pad :output, + accepted_format: _any, + availability: :on_request + + defguardp is_input_linked(state) when state.input_pad_ref != nil + defguardp is_output_linked(state) when state.output_pad_ref != nil + + @impl true + def handle_init(_ctx, _opts) do + state = %{queue: TimestampQueue.new(), output_pad_ref: nil, input_pad_ref: nil} + {[], state} + end + + @impl true + def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state) + + @impl true + def handle_pad_added(Pad.ref(direction, _id) = pad_ref, ctx, state) do + same_direction_pads_number = + ctx.pads + |> Enum.count(fn {_pad_ref, pad_data} -> pad_data.direction == direction end) + + if same_direction_pads_number > 1 do + raise """ + #{inspect(__MODULE__)} can have only one #{inspect(direction)} pad, but it has \ + #{same_direction_pads_number} + """ + end + + state = + case direction do + :input -> %{state | input_pad_ref: pad_ref} + :output -> %{state | output_pad_ref: pad_ref} + end + + maybe_flush_queue(ctx, state) + end + + @impl true + def handle_stream_format(_input_pad_ref, stream_format, _ctx, state) + when is_output_linked(state) do + {[ + stream_format: {state.output_pad_ref, stream_format}, + notify_parent: {:stream_format, stream_format} + ], state} + end + + @impl true + def handle_stream_format(input_pad_ref, stream_format, _ctx, state) do + queue = TimestampQueue.push_stream_format(state.queue, input_pad_ref, stream_format) + {[notify_parent: {:stream_format, stream_format}], %{state | queue: queue}} + end + + @impl true + def handle_buffer(_input_pad_ref, buffer, _ctx, state) when is_output_linked(state) do + {[buffer: {state.output_pad_ref, buffer}], state} + end + + @impl true + def handle_buffer(input_pad_ref, buffer, _ctx, state) do + {_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, input_pad_ref, buffer) + {[], %{state | queue: queue}} + end + + @impl true + def handle_event(Pad.ref(:input, _id), event, _ctx, state) when is_output_linked(state) do + {[forward: event], state} + end + + @impl true + def handle_event(Pad.ref(:output, _id), event, _ctx, state) when is_input_linked(state) do + {[forward: event], state} + end + + @impl true + def handle_event(pad_ref, event, _ctx, state) do + queue = TimestampQueue.push_event(state.queue, pad_ref, event) + {[], %{state | queue: queue}} + end + + @impl true + def handle_end_of_stream(_input_pad_ref, _ctx, state) when is_output_linked(state) do + {[end_of_stream: state.output_pad_ref], state} + end + + @impl true + def handle_end_of_stream(input_pad_ref, _ctx, state) do + queue = TimestampQueue.push_end_of_stream(state.queue, input_pad_ref) + {[], %{state | queue: queue}} + end + + defp maybe_flush_queue(ctx, state) + when ctx.playback == :playing and is_input_linked(state) and is_output_linked(state) do + {_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue) + + actions = + Enum.map(items, fn + {Pad.ref(:input, _id), {item_type, item}} -> {item_type, {state.output_pad_ref, item}} + {Pad.ref(:input, _id), :end_of_stream} -> {:end_of_stream, state.output_pad_ref} + {Pad.ref(:output, _id), {:event, item}} -> {:event, {state.input_pad_ref, item}} + end) + + {actions, %{state | queue: queue}} + end + + defp maybe_flush_queue(_ctx, state), do: {[], state} +end diff --git a/mix.exs b/mix.exs index 8c8ec47..3f91527 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.WebRTC.Plugin.Mixfile do use Mix.Project - @version "0.22.0" + @version "0.22.1" @github_url "https://github.com/membraneframework/membrane_webrtc_plugin" def project do @@ -42,6 +42,7 @@ defmodule Membrane.WebRTC.Plugin.Mixfile do {:membrane_rtp_h264_plugin, "~> 0.19.0"}, {:membrane_rtp_vp8_plugin, "~> 0.9.1"}, {:membrane_rtp_opus_plugin, "~> 0.9.0"}, + {:membrane_timestamp_queue, "~> 0.2.0"}, {:bandit, "~> 1.2"}, {:websock_adapter, "~> 0.5.0"}, {:membrane_matroska_plugin, "~> 0.5.0", only: :test}, diff --git a/mix.lock b/mix.lock index 03d0202..ab7b570 100644 --- a/mix.lock +++ b/mix.lock @@ -50,6 +50,7 @@ "membrane_rtp_plugin": {:hex, :membrane_rtp_plugin, "0.29.0", "0277310eb599b8e6de9e0b864807f23b3b245865e39a28f0cbab695d1f2c157e", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.6.0 or ~> 0.7.0", [hex: :ex_libsrtp, repo: "hexpm", optional: true]}, {:heap, "~> 2.0.2", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "1b3fd808114e06332b6a4e000238998a9188d1ef625c414ca3239aee70f0775d"}, "membrane_rtp_vp8_plugin": {:hex, :membrane_rtp_vp8_plugin, "0.9.1", "9e8a74d764730a23382ba862a238963c9639b4c6963238caeb6fe2449a66add8", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_vp8_format, "~> 0.4.0", [hex: :membrane_vp8_format, repo: "hexpm", optional: false]}], "hexpm", "704856eb2734bb6ea5cc47242c241de45debb5724a81cffb344bacda9867fe98"}, "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.0", "cb93d28356b436b0597736c3e4153738d82d2a14ff547f831df7e9051e54fc06", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "aba28dc8311f70ced95d984509be930fac55857d2d18bffcf768815e627be3f0"}, + "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"}, "membrane_vp9_format": {:hex, :membrane_vp9_format, "0.4.0", "d50a1711b846974bcd32721180e8b714adeaabfd6ef09400248ad8232c2b6d79", [:mix], [], "hexpm", "3817f30280ea2450054300cdda601b4f5c5ca62bae36e183b45386cc6b9883d5"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, diff --git a/test/membrane_webrtc/integration_test.exs b/test/membrane_webrtc/integration_test.exs index fe62950..3ce28e5 100644 --- a/test/membrane_webrtc/integration_test.exs +++ b/test/membrane_webrtc/integration_test.exs @@ -192,7 +192,11 @@ defmodule Membrane.WebRTC.IntegrationTest do test "send and receive a file", %{tmp_dir: tmp_dir} do signaling = SignalingChannel.new() send_pipeline = Testing.Pipeline.start_link_supervised!() - prepare_input(send_pipeline, webrtc: %WebRTC.Sink{signaling: signaling}) + + prepare_input(send_pipeline, + webrtc: %WebRTC.Sink{signaling: signaling, video_codec: [:vp8, :h264]} + ) + receive_pipeline = Testing.Pipeline.start_link_supervised!() prepare_output(receive_pipeline, tmp_dir, webrtc: %WebRTC.Source{signaling: signaling})