diff --git a/README.md b/README.md index 7195d14e..b9c1446b 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_rtp_plugin` to your list of dep ```elixir def deps do [ - {:membrane_rtp_plugin, "~> 0.29.1"}, + {:membrane_rtp_plugin, "~> 0.30.0"}, {:ex_libsrtp, ">= 0.0.0"} # required only if SRTP/SRTCP support is needed ] end diff --git a/lib/membrane/rtp/demuxer.ex b/lib/membrane/rtp/demuxer.ex index 4a299cba..54e47d96 100644 --- a/lib/membrane/rtp/demuxer.ex +++ b/lib/membrane/rtp/demuxer.ex @@ -1,6 +1,7 @@ defmodule Membrane.RTP.Demuxer do @moduledoc """ - Element capable of receiving a raw RTP stream and demuxing it into individual parsed streams based on packet ssrcs. + Element capable of receiving a raw RTP stream and demuxing it into individual parsed streams based on packet ssrcs. + Output pads can be linked either before or after a corresponding stream has been recognized. In the first case the demuxer will start sending buffers on the pad once a stream with payload type or SSRC matching the identification provided via the pad's options is recognized. In the second case, whenever a new stream is recognized and no waiting pad has matching identification, a diff --git a/lib/membrane/rtp/muxer.ex b/lib/membrane/rtp/muxer.ex new file mode 100644 index 00000000..6b30548d --- /dev/null +++ b/lib/membrane/rtp/muxer.ex @@ -0,0 +1,191 @@ +defmodule Membrane.RTP.Muxer do + @moduledoc """ + Element that combines multiple streams into a single RTP stream. + + Each new input stream is assigned a unique SSRC that the packets + transporting this stream will have. When a new pad is conneted, it's required to pass it options sufficient to resolve what + `payload_type` and `clock_rate` should be assumed. Timestamps are calculated based on assumed `clock_rate`. + """ + use Membrane.Filter + + require Membrane.Pad + + alias Membrane.{Pad, RemoteStream, RTP} + + @max_ssrc Bitwise.bsl(1, 32) - 1 + @max_sequence_number Bitwise.bsl(1, 16) - 1 + @max_timestamp Bitwise.bsl(1, 32) - 1 + + def_input_pad :input, + accepted_format: RTP, + availability: :on_request, + options: [ + ssrc: [ + spec: RTP.ssrc() | :random, + default: :random, + description: """ + SSRC that this stream will be assigned. If not present, a random free value will be assigned. + """ + ], + payload_type: [ + spec: RTP.payload_type() | nil, + default: nil, + description: """ + Payload type of the stream. If not provided, determined from `:encoding`. + """ + ], + encoding: [ + spec: RTP.encoding_name() | nil, + default: nil, + description: """ + Encoding name of the stream. Used for determining payload_type, it it wasn't provided. + """ + ], + clock_rate: [ + spec: non_neg_integer() | nil, + default: nil, + description: """ + Clock rate to use. If not provided, determined from `:payload_type`. + """ + ] + ] + + def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP} + + defmodule State do + @moduledoc false + defmodule StreamState do + @moduledoc false + alias Membrane.RTP + + @type t :: %__MODULE__{ + ssrc: RTP.ssrc(), + sequence_number: ExRTP.Packet.uint16(), + initial_timestamp: ExRTP.Packet.uint32(), + clock_rate: RTP.clock_rate(), + payload_type: RTP.payload_type(), + end_of_stream: boolean() + } + + @enforce_keys [:ssrc, :sequence_number, :initial_timestamp, :clock_rate, :payload_type] + + defstruct @enforce_keys ++ [end_of_stream: false] + end + + @type t :: %__MODULE__{ + stream_states: %{Pad.ref() => StreamState.t()} + } + + @enforce_keys [] + defstruct @enforce_keys ++ [stream_states: %{}] + end + + @impl true + def handle_init(_ctx, _opts) do + {[], %State{}} + end + + @impl true + def handle_pad_added(Pad.ref(:input, _ref) = pad_ref, ctx, state) do + pad_options = ctx.pads[pad_ref].options + + ssrc = get_stream_ssrc(pad_options, state) + + %{payload_type: payload_type, clock_rate: clock_rate} = + RTP.PayloadFormat.resolve( + encoding_name: pad_options.encoding, + payload_type: pad_options.payload_type, + clock_rate: pad_options.clock_rate + ) + + if payload_type == nil, do: raise("Could not resolve payload type") + if clock_rate == nil, do: raise("Could not resolve clock rate") + + new_stream_state = %State.StreamState{ + ssrc: ssrc, + sequence_number: Enum.random(0..@max_sequence_number), + initial_timestamp: Enum.random(0..@max_timestamp), + clock_rate: clock_rate, + payload_type: payload_type + } + + state = put_in(state.stream_states[pad_ref], new_stream_state) + + {[], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[stream_format: {:output, %RemoteStream{type: :packetized, content_format: RTP}}], state} + end + + @impl true + def handle_stream_format(_pad, _stream_format, _ctx, state) do + {[], state} + end + + @impl true + def handle_buffer(Pad.ref(:input, _ref) = pad_ref, buffer, _ctx, state) do + {rtp_metadata, metadata} = Map.pop(buffer.metadata, :rtp, %{}) + stream_state = state.stream_states[pad_ref] + + rtp_offset = + buffer.pts + |> Membrane.Time.as_seconds() + |> Numbers.mult(stream_state.clock_rate) + |> Ratio.trunc() + + timestamp = rem(stream_state.initial_timestamp + rtp_offset, @max_timestamp + 1) + sequence_number = rem(stream_state.sequence_number + 1, @max_sequence_number + 1) + + state = put_in(state.stream_states[pad_ref].sequence_number, sequence_number) + + packet = + ExRTP.Packet.new(buffer.payload, + payload_type: stream_state.payload_type, + sequence_number: sequence_number, + timestamp: timestamp, + ssrc: stream_state.ssrc, + csrc: Map.get(rtp_metadata, :csrcs, []), + marker: Map.get(rtp_metadata, :marker, false) + ) + + raw_packet = ExRTP.Packet.encode(packet) + + buffer = %Membrane.Buffer{ + buffer + | payload: raw_packet, + metadata: Map.put(metadata, :rtp, %{packet | payload: <<>>}) + } + + {[buffer: {:output, buffer}], state} + end + + @impl true + def handle_end_of_stream(Pad.ref(:input, _ref) = pad_ref, _ctx, state) do + state = put_in(state.stream_states[pad_ref].end_of_stream, true) + + if Enum.all?(Enum.map(state.stream_states, fn {_pad_ref, %{end_of_stream: eos}} -> eos end)) do + {[end_of_stream: :output], state} + else + {[], state} + end + end + + defp get_stream_ssrc(pad_options, state) do + assigned_ssrcs = Enum.map(state.stream_states, fn {_pad_ref, %{ssrc: ssrc}} -> ssrc end) + + case pad_options.ssrc do + :random -> + Stream.repeatedly(fn -> Enum.random(0..@max_ssrc) end) + |> Enum.find(&(&1 not in assigned_ssrcs)) + + provided_ssrc -> + if provided_ssrc in assigned_ssrcs do + raise("SSRC #{provided_ssrc} already assigned to a different stream") + end + + provided_ssrc + end + end +end diff --git a/mix.exs b/mix.exs index d9fcdd32..190a3bc2 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTP.Plugin.MixProject do use Mix.Project - @version "0.29.1" + @version "0.30.0" @github_url "https://github.com/membraneframework/membrane_rtp_plugin" def project do @@ -54,6 +54,8 @@ defmodule Membrane.RTP.Plugin.MixProject do {:membrane_rtp_mpegaudio_plugin, "~> 0.14.1", only: :test}, {:membrane_h264_ffmpeg_plugin, "~> 0.31.0", only: :test}, {:membrane_h26x_plugin, "~> 0.10.2", only: :test}, + {:membrane_aac_plugin, "~> 0.19.0", only: :test}, + {:membrane_mp4_plugin, "~> 0.35.0", only: :test}, {:membrane_pcap_plugin, github: "membraneframework/membrane_pcap_plugin", tag: "v0.9.0", only: :test}, {:membrane_hackney_plugin, "~> 0.11.0", only: :test}, diff --git a/mix.lock b/mix.lock index 7e6c187d..f9837f48 100644 --- a/mix.lock +++ b/mix.lock @@ -23,19 +23,26 @@ "hpax": {:hex, :hpax, "1.0.1", "c857057f89e8bd71d97d9042e009df2a42705d6d690d54eca84c8b29af0787b0", [:mix], [], "hexpm", "4e2d5a4f76ae1e3048f35ae7adb1641c36265510a2d4638157fbcb53dda38445"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, + "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.19.0", "58a15efaaa4f2cc91b968464cfd269244e035efdd983aac2e3ddeb176fcf0585", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "eb7e786e650608ee205f4eebff4c1df3677e545acf09802458f77f64f9942fe9"}, + "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"}, "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, "membrane_core": {:hex, :membrane_core, "1.1.2", "3ca206893e1d3739a24d5092d21c06fcb4db326733a1798f9788fc53abb74829", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a989fd7e0516a7e66f5fb63950b1027315b7f8c8d82d8d685e178b0fb780901b"}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, "membrane_funnel_plugin": {:hex, :membrane_funnel_plugin, "0.9.1", "9e108f4ef9d905ebff2da3ba5e58a5b756b58812f4fa68bd576add68fda310a0", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "39fdef1bf29eac949f65a37ea941f997c22ed042c55af044d27a781b63e82f6b"}, "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.31.8", "d347449f4ef28ed67ed1d12573a1d60cc665885737dcbf5ffb246bf95e9648e1", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "723b5da92cb39d3d6395a4292842b9fbd60f6d8e796b45320226c6c13a431478"}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.11.0", "54b368333a23394e7cac2f4d6b701bf8c5ee6614670a31f4ebe009b5e691a5c1", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "2b28fd1be3c889d5824d7d985598386c7673828c88f49a91221df3626af8a998"}, + "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, + "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.35.2", "cbedb5272ef1c8f7d9cd3c44f820a90306469b1dc84b8db30ff55bb6195b7cb2", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.17.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_timestamp_queue, "~> 0.2.1", [hex: :membrane_timestamp_queue, repo: "hexpm", optional: false]}], "hexpm", "8afd4e7779a742dd56c23f1f23053933d1b0b34d397ad368a2f56f995edb2fe0"}, "membrane_mpegaudio_format": {:hex, :membrane_mpegaudio_format, "0.3.0", "d4fee77fad9f953171c52acd6d53b6646cfc7fbb827c63caa7c6a1efeb86450a", [:mix], [], "hexpm", "dec903efd0086133402b44515d04301790832b4f39995747b0e712c8f966d50d"}, + "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, "membrane_pcap_plugin": {:git, "https://github.com/membraneframework/membrane_pcap_plugin.git", "5614261a417a365dc9b0dd72490bf9a31781a094", [tag: "v0.9.0"]}, "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, @@ -44,6 +51,7 @@ "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.20.2", "ab84db505d3102a9cdc300f137c78245ef3982a7ec545838f9544b6b0a2ca1ba", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.10.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "27f38c49544d1acf6f7c3f8770a7893f90813a31e8a26461e112a3d3142aff46"}, "membrane_rtp_mpegaudio_plugin": {:hex, :membrane_rtp_mpegaudio_plugin, "0.14.2", "3353287d6c6376e666c23337fd8fe61a2e14b8931ce87002c79cc281c7ef0c7a", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_mpegaudio_format, "~> 0.3.0", [hex: :membrane_mpegaudio_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.10.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "fe1a4e9e8b018d9e5d7b7e86e72657e1da8323ba04c79363e5511fb474c54873"}, "membrane_telemetry_metrics": {:hex, :membrane_telemetry_metrics, "0.1.1", "57917e72012f9ebe124eab54f29ca74c9d9eb3ae2207f55c95618ee51280eb4f", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.1 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "69392966e6bd51937244758c2b3d835c5ff47d8953d25431a9d37059737afc11"}, + "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, diff --git a/test/fixtures/rtp/h264/bun.h264 b/test/fixtures/rtp/h264/bun.h264 new file mode 100644 index 00000000..c0d36ac2 Binary files /dev/null and b/test/fixtures/rtp/h264/bun.h264 differ diff --git a/test/membrane/rtp/demuxer_muxer_integration_test.exs b/test/membrane/rtp/demuxer_muxer_integration_test.exs new file mode 100644 index 00000000..1399309b --- /dev/null +++ b/test/membrane/rtp/demuxer_muxer_integration_test.exs @@ -0,0 +1,127 @@ +defmodule Membrane.RTP.DemuxerMuxerTest do + @moduledoc false + use ExUnit.Case + import Membrane.Testing.Assertions + alias Membrane.RTP + alias Membrane.Testing + + @rtp_input %{ + pcap_path: "test/fixtures/rtp/session/demo_rtp.pcap", + packets: 862 + } + + defmodule ReferencePipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, opts) do + spec = + child(:source, %Membrane.Pcap.Source{path: opts.input_path}) + |> child(:sink, Membrane.Testing.Sink) + + {[spec: spec], %{}} + end + end + + defmodule SubjectPipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, opts) do + spec = [ + child(:source, %Membrane.Pcap.Source{path: opts.input_path}) + |> child(:rtp_demuxer, Membrane.RTP.Demuxer), + child(:rtp_muxer, Membrane.RTP.Muxer) + |> child(:sink, Membrane.Testing.Sink) + ] + + {[spec: spec], %{}} + end + + @impl true + def handle_child_notification( + {:new_rtp_stream, %{ssrc: ssrc, payload_type: pt}}, + :rtp_demuxer, + _ctx, + state + ) do + %{encoding_name: encoding_name, clock_rate: clock_rate} = + Membrane.RTP.PayloadFormat.get_payload_type_mapping(pt) + + spec = + get_child(:rtp_demuxer) + |> via_out(:output, options: [stream_id: {:ssrc, ssrc}]) + |> child({:jitter_buffer, ssrc}, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) + |> via_in(:input, options: [encoding: encoding_name]) + |> get_child(:rtp_muxer) + + {[spec: spec], state} + end + end + + test "Demuxed and muxed stream is the same as unchanged one" do + reference_pipeline = + Testing.Pipeline.start_supervised!( + module: ReferencePipeline, + custom_args: %{input_path: @rtp_input.pcap_path} + ) + + subject_pipeline = + Testing.Pipeline.start_supervised!( + module: SubjectPipeline, + custom_args: %{input_path: @rtp_input.pcap_path} + ) + + assert_start_of_stream(reference_pipeline, :sink) + assert_start_of_stream(subject_pipeline, :sink) + + reference_normalized_packets = get_normalized_packets(reference_pipeline, @rtp_input.packets) + subject_normalized_packets = get_normalized_packets(subject_pipeline, @rtp_input.packets) + + assert reference_normalized_packets == subject_normalized_packets + + assert_end_of_stream(reference_pipeline, :sink) + assert_end_of_stream(subject_pipeline, :sink) + Testing.Pipeline.terminate(reference_pipeline) + Testing.Pipeline.terminate(subject_pipeline) + end + + @spec get_normalized_packets(pid(), non_neg_integer()) :: + %{RTP.encoding_name() => [ExRTP.Packet.t()]} + defp get_normalized_packets(pipeline, buffers_amount) do + Enum.map(1..buffers_amount, fn _i -> + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{ + payload: payload + }) + + {:ok, packet} = ExRTP.Packet.decode(payload) + + packet + end) + |> Enum.group_by(& &1.payload_type) + |> Map.new(fn {payload_type, payload_type_buffers} -> + %{encoding_name: encoding_name} = RTP.PayloadFormat.get_payload_type_mapping(payload_type) + + sorted_packets = + payload_type_buffers + |> Enum.sort(&(&1.sequence_number < &2.sequence_number)) + + %{ssrc: ssrc, sequence_number: first_sequence_number, timestamp: first_timestamp} = + List.first(sorted_packets) + + normalized_packets = + sorted_packets + |> Enum.map(fn packet -> + %{ + packet + | ssrc: packet.ssrc - ssrc, + sequence_number: packet.sequence_number - first_sequence_number, + # round to ignore insignificant differences in timestamps + timestamp: round((packet.timestamp - first_timestamp) / 10) + } + end) + + {encoding_name, normalized_packets} + end) + end +end diff --git a/test/membrane/rtp/muxer_demuxer_integration_test.exs b/test/membrane/rtp/muxer_demuxer_integration_test.exs new file mode 100644 index 00000000..c5ac12cc --- /dev/null +++ b/test/membrane/rtp/muxer_demuxer_integration_test.exs @@ -0,0 +1,54 @@ +defmodule Membrane.RTP.MuxerDemuxerTest do + @moduledoc false + use ExUnit.Case + import Membrane.Testing.Assertions + alias Membrane.RTP + alias Membrane.Testing + + @input_path "test/fixtures/rtp/h264/bun.h264" + + defmodule MuxerDemuxerPipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, opts) do + %{clock_rate: clock_rate} = RTP.PayloadFormat.resolve(encoding_name: :H264) + + spec = [ + child(:source, %Membrane.File.Source{location: opts.input_path}) + |> child(:h264_parser, %Membrane.H264.Parser{ + output_alignment: :nalu, + generate_best_effort_timestamps: %{framerate: {30, 1}} + }) + |> child(:rtp_h264_payloader, Membrane.RTP.H264.Payloader) + |> via_in(:input, options: [encoding: :H264]) + |> child(:rtp_muxer, Membrane.RTP.Muxer) + |> child(:rtp_demuxer, Membrane.RTP.Demuxer) + |> via_out(:output, options: [stream_id: {:encoding_name, :H264}]) + |> child(:jitter_buffer, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate}) + |> child(:rtp_h264_depayloader, Membrane.RTP.H264.Depayloader) + |> child(:sink, %Membrane.File.Sink{location: opts.output_path}) + ] + + {[spec: spec], %{}} + end + end + + @tag :tmp_dir + test "Muxed and demuxed stream is the same as unchanged one", %{tmp_dir: tmp_dir} do + output_path = Path.join(tmp_dir, "output.h264") + + pipeline = + Testing.Pipeline.start_supervised!( + module: MuxerDemuxerPipeline, + custom_args: %{input_path: @input_path, output_path: output_path} + ) + + assert_start_of_stream(pipeline, :sink) + assert_end_of_stream(pipeline, :sink) + + assert File.read!(@input_path) == File.read!(output_path) + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/rtp/muxer_test.exs b/test/membrane/rtp/muxer_test.exs new file mode 100644 index 00000000..a183ab0f --- /dev/null +++ b/test/membrane/rtp/muxer_test.exs @@ -0,0 +1,70 @@ +defmodule Membrane.RTP.MuxerTest do + @moduledoc false + use ExUnit.Case + import Membrane.Testing.Assertions + alias Membrane.Testing + + @rtp_output %{ + video: %{payload_type: 96, packets: 1054}, + audio: %{payload_type: 127, packets: 431} + } + + defmodule Pipeline do + use Membrane.Pipeline + + @impl true + def handle_init(_ctx, _opts) do + spec = [ + child(:hackney_source, %Membrane.Hackney.Source{ + location: + "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun10s.mp4", + hackney_opts: [follow_redirect: true] + }) + |> child(:mp4_demuxer, Membrane.MP4.Demuxer.ISOM) + |> via_out(:output, options: [kind: :video]) + |> child(:h264_parser, %Membrane.H264.Parser{ + output_stream_structure: :annexb, + output_alignment: :nalu + }) + |> child(:h264_payloader, Membrane.RTP.H264.Payloader) + |> via_in(:input, options: [encoding: :H264]) + |> child(:rtp_muxer, Membrane.RTP.Muxer) + |> child(:sink, Membrane.Testing.Sink), + get_child(:mp4_demuxer) + |> via_out(:output, options: [kind: :audio]) + |> child(:aac_parser, %Membrane.AAC.Parser{out_encapsulation: :none}) + |> child(:aac_payloader, %Membrane.RTP.AAC.Payloader{mode: :hbr, frames_per_packet: 1}) + |> via_in(:input, options: [encoding: :AAC]) + |> get_child(:rtp_muxer) + ] + + {[spec: spec], %{}} + end + end + + test "Muxer muxes correct amount of packets" do + pipeline = Testing.Pipeline.start_supervised!(module: Pipeline) + + %{audio: %{payload_type: audio_payload_type}, video: %{payload_type: video_payload_type}} = + @rtp_output + + assert_start_of_stream(pipeline, :sink) + + 1..@rtp_output.video.packets + |> Enum.each(fn _i -> + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{ + metadata: %{rtp: %ExRTP.Packet{payload_type: ^video_payload_type}} + }) + end) + + 1..@rtp_output.audio.packets + |> Enum.each(fn _i -> + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{ + metadata: %{rtp: %ExRTP.Packet{payload_type: ^audio_payload_type}} + }) + end) + + assert_end_of_stream(pipeline, :sink) + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/support/rtcp_fixtures.ex b/test/support/rtcp_fixtures.ex index 8ca79b8f..3c372c5d 100644 --- a/test/support/rtcp_fixtures.ex +++ b/test/support/rtcp_fixtures.ex @@ -51,7 +51,7 @@ defmodule Membrane.RTCP.Fixtures do <<0x81, 0xCE, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x41, 0x6F, 0xB1, 0x0D>> end - @spec pli_contents() :: binary() + @spec pli_contents() :: map() def pli_contents() do %{ origin_ssrc: 1,