diff --git a/.circleci/config.yml b/.circleci/config.yml index 09fadb36..c077fb41 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,6 +6,25 @@ workflows: version: 2 build: jobs: - - elixir/build_test - - elixir/test - - elixir/lint + - elixir/build_test: + filters: &filters + tags: + only: /v.*/ + - elixir/test: + filters: + <<: *filters + - elixir/lint: + filters: + <<: *filters + - elixir/hex_publish: + requires: + - elixir/build_test + - elixir/test + - elixir/lint + context: + - Deployment + filters: + branches: + ignore: /.*/ + tags: + only: /v.*/ diff --git a/.formatter.exs b/.formatter.exs index 7d0a6367..5fc01778 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,6 +1,6 @@ [ inputs: [ - "{lib,test,config}/**/*.{ex,exs}", + "{lib,test,config,examples}/**/*.{ex,exs}", "c_src/**/*.spec.exs", ".formatter.exs", "*.exs" diff --git a/README.md b/README.md index 528e593a..48b5bdb6 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin) This package provides RTMP server which receives an RTMP stream from a client and an element for streaming to an RTMP server. -It is part of [Membrane Multimedia Framework](https://membraneframework.org). +It is a part of [Membrane Multimedia Framework](https://membraneframework.org). ## Installation @@ -14,14 +14,13 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de ```elixir def deps do [ - {:membrane_rtmp_plugin, "~> 0.9.1"} + {:membrane_rtmp_plugin, "~> 0.10.0"} ] end ``` ## SourceBin - -Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio. +Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio. ## Client After establishing connection with server it waits to receive video and audio streams. Once both streams are received they are streamed to the server. @@ -29,7 +28,7 @@ Currently only the following codecs are supported: - H264 for video - AAC for audio -## Tcp Server +## TCP Server It's a simple implementation of tcp server. It opens a tcp port and listens for incoming connections. For each new connection, a user-provided function is executed. ### Prerequisites @@ -62,7 +61,7 @@ RTMP server that will receive this stream can be launched with ffmpeg by running ```bash export RTMP_URL=rtmp://localhost:1935 -ffmpeg -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv +ffmpeg -y -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv ``` It will receive stream and once streaming is completed dump it to .flv file. If you are using the command above, please remember to run it **before** the streaming script. diff --git a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.c b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.c index fd964367..5f51a203 100644 --- a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.c +++ b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.c @@ -51,7 +51,7 @@ UNIFEX_TERM init_video_stream(UnifexEnv *env, State *state, int width, int height, UnifexPayload *avc_config) { AVStream *video_stream; if (state->video_stream_index != -1) { - return init_video_stream_result_error_caps_resent(env); + return init_video_stream_result_error_stream_format_resent(env); } video_stream = avformat_new_stream(state->output_ctx, NULL); @@ -89,7 +89,7 @@ UNIFEX_TERM init_audio_stream(UnifexEnv *env, State *state, int channels, int sample_rate, UnifexPayload *aac_config) { AVStream *audio_stream; if (state->audio_stream_index != -1) { - return init_audio_stream_result_error_caps_resent(env); + return init_audio_stream_result_error_stream_format_resent(env); } audio_stream = avformat_new_stream(state->output_ctx, NULL); @@ -127,14 +127,15 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state, int is_key_frame) { if (state->video_stream_index == -1) { return write_video_frame_result_error( - env, "Video stream is not initialized. Caps has not been received"); + env, + "Video stream is not initialized. Stream format has not been received"); } AVRational video_stream_time_base = state->output_ctx->streams[state->video_stream_index]->time_base; AVPacket *packet = av_packet_alloc(); - uint8_t* data = (uint8_t *)av_malloc(frame->size); + uint8_t *data = (uint8_t *)av_malloc(frame->size); memcpy(data, frame->data, frame->size); av_packet_from_data(packet, data, frame->size); @@ -162,9 +163,11 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state, packet->duration = dts_scaled - state->current_video_dts; state->current_video_dts = dts_scaled; - if (av_write_frame(state->output_ctx, packet)) { + int result = av_write_frame(state->output_ctx, packet); + + if (result) { write_frame_result = - write_video_frame_result_error(env, "Failed writing video frame"); + write_video_frame_result_error(env, av_err2str(result)); goto end; } write_frame_result = write_video_frame_result_ok(env, state); @@ -178,15 +181,15 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state, UnifexPayload *frame, int64_t pts) { if (state->audio_stream_index == -1) { return write_audio_frame_result_error( - env, - "Audio stream has not been initialized. Caps has not been received"); + env, "Audio stream has not been initialized. Stream format has not " + "been received"); } AVRational audio_stream_time_base = state->output_ctx->streams[state->audio_stream_index]->time_base; AVPacket *packet = av_packet_alloc(); - uint8_t* data = (uint8_t *)av_malloc(frame->size); + uint8_t *data = (uint8_t *)av_malloc(frame->size); memcpy(data, frame->data, frame->size); av_packet_from_data(packet, data, frame->size); @@ -209,9 +212,11 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state, packet->duration = pts_scaled - state->current_audio_pts; state->current_audio_pts = pts_scaled; - if (av_write_frame(state->output_ctx, packet)) { + int result = av_write_frame(state->output_ctx, packet); + + if (result) { write_frame_result = - write_audio_frame_result_error(env, "Failed writing audio frame"); + write_audio_frame_result_error(env, av_err2str(result)); goto end; } write_frame_result = write_audio_frame_result_ok(env, state); diff --git a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.h b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.h index 9f1ea763..06f3ca29 100644 --- a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.h +++ b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.h @@ -19,4 +19,4 @@ struct State { bool header_written; }; -#include "_generated/rtmp_sink.h" \ No newline at end of file +#include "_generated/rtmp_sink.h" diff --git a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.spec.exs b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.spec.exs index 8dc4ffd3..dab08131 100644 --- a/c_src/membrane_rtmp_plugin/sink/rtmp_sink.spec.exs +++ b/c_src/membrane_rtmp_plugin/sink/rtmp_sink.spec.exs @@ -14,13 +14,13 @@ spec try_connect(state) :: spec finalize_stream(state) :: :ok :: label spec init_video_stream(state, width :: int, height :: int, avc_config :: payload) :: - {:ok :: label, ready :: bool, state} | {:error :: label, :caps_resent :: label} + {:ok :: label, ready :: bool, state} | {:error :: label, :stream_format_resent :: label} spec write_video_frame(state, frame :: payload, dts :: int64, pts :: int64, is_key_frame :: bool) :: {:ok :: label, state} | {:error :: label, reason :: string} spec init_audio_stream(state, channels :: int, sample_rate :: int, aac_config :: payload) :: - {:ok :: label, ready :: bool, state} | {:error :: label, :caps_resent :: label} + {:ok :: label, ready :: bool, state} | {:error :: label, :stream_format_resent :: label} spec write_audio_frame(state, frame :: payload, pts :: int64) :: {:ok :: label, state} | {:error :: label, reason :: string} diff --git a/examples/sink.exs b/examples/sink.exs index 449f4bf7..d86488ee 100644 --- a/examples/sink.exs +++ b/examples/sink.exs @@ -1,85 +1,76 @@ # Before running this example, make sure that target RTMP server is live. # If you are streaming to eg. Youtube, you don't need to worry about it. # If you want to test it locally, you can run the FFmpeg server with: -# ffmpeg -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv +# ffmpeg -y -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv Mix.install([ - {:membrane_core, "~> 0.10.1"}, - {:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()}, :membrane_realtimer_plugin, :membrane_hackney_plugin, - :membrane_h264_ffmpeg_plugin, - :membrane_aac_plugin, - :membrane_mp4_plugin + {:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()} ]) defmodule Example do use Membrane.Pipeline + @samples_url "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/" + @video_url @samples_url <> "bun33s_480x270.h264" + @audio_url @samples_url <> "bun33s.aac" + @impl true - def handle_init(_opts) do - children = [ - video_source: %Membrane.Hackney.Source{ - location: "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_480x270.h264", + def handle_init(_ctx, destination: destination) do + structure = [ + child(:rtmp_sink, %Membrane.RTMP.Sink{rtmp_url: destination}), + child(:video_source, %Membrane.Hackney.Source{ + location: @video_url, hackney_opts: [follow_redirect: true] - }, - video_parser: %Membrane.H264.FFmpeg.Parser{ + }) + |> child(:video_parser, %Membrane.H264.FFmpeg.Parser{ framerate: {25, 1}, alignment: :au, attach_nalus?: true, skip_until_keyframe?: true - }, - audio_parser: %Membrane.AAC.Parser{ + }) + |> child(:video_realtimer, Membrane.Realtimer) + |> child(:video_payloader, Membrane.MP4.Payloader.H264) + |> via_in(:video) + |> get_child(:rtmp_sink), + child(:audio_source, %Membrane.Hackney.Source{ + location: @audio_url, + hackney_opts: [follow_redirect: true] + }) + |> child(:audio_parser, %Membrane.AAC.Parser{ in_encapsulation: :ADTS, out_encapsulation: :none - }, - audio_source: %Membrane.Hackney.Source{ - location: "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s.aac", - hackney_opts: [follow_redirect: true] - }, - video_realtimer: Membrane.Realtimer, - audio_realtimer: Membrane.Realtimer, - video_payloader: Membrane.MP4.Payloader.H264, - rtmp_sink: %Membrane.RTMP.Sink{rtmp_url: System.get_env("RTMP_URL", "rtmp://localhost:1935")} - ] - - links = [ - link(:video_source) - |> to(:video_parser) - |> to(:video_realtimer) - |> to(:video_payloader) - |> via_in(:video) - |> to(:rtmp_sink), - link(:audio_source) - |> to(:audio_parser) - |> to(:audio_realtimer) + }) + |> child(:audio_realtimer, Membrane.Realtimer) |> via_in(:audio) - |> to(:rtmp_sink) + |> get_child(:rtmp_sink) ] - {{:ok, spec: %ParentSpec{children: children, links: links}, playback: :playing}, %{finished_streams: []}} + {[spec: structure, playback: :playing], %{streams_to_end: 2}} end # The rest of the example module is only used for self-termination of the pipeline after processing finishes @impl true - def handle_element_end_of_stream({:rtmp_sink, pad}, _ctx, state) when length(state.finished_streams) == 1 do - Membrane.Pipeline.terminate(self()) - {:ok, Map.put(state, :finished_streams, &[pad | &1])} + def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, %{streams_to_end: 1} = state) do + {[terminate: :shutdown], %{state | streams_to_end: 0}} end @impl true - def handle_element_end_of_stream({:rtmp_sink, pad}, _ctx, state) do - {:ok, Map.put(state, :finished_streams, [pad])} + def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, state) do + {[], %{state | streams_to_end: 1}} end @impl true - def handle_element_end_of_stream(_element, _ctx, state) do - {:ok, state} + def handle_element_end_of_stream(_child, _pad, _ctx, state) do + {[], state} end end +destination = System.get_env("RTMP_URL", "rtmp://localhost:1935") + # Initialize the pipeline and start it -{:ok, pipeline} = Example.start_link() +{:ok, _supervisor, pipeline} = Example.start_link(destination: destination) monitor_ref = Process.monitor(pipeline) diff --git a/examples/source.exs b/examples/source.exs index e43f5d94..7bc472e9 100644 --- a/examples/source.exs +++ b/examples/source.exs @@ -3,12 +3,11 @@ # ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:5000 Mix.install([ - {:membrane_core, "~> 0.10"}, - {:membrane_rtmp_plugin, path: __DIR__ |> Path.join("../") |> Path.expand()}, - :membrane_file_plugin, + :membrane_aac_plugin, :membrane_mp4_plugin, :membrane_flv_plugin, - :membrane_aac_plugin + :membrane_file_plugin, + {:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()} ]) defmodule Pipeline do @@ -17,33 +16,29 @@ defmodule Pipeline do @output_file "received.flv" @impl true - def handle_init(socket: socket) do - spec = %ParentSpec{ - children: %{ - source: %Membrane.RTMP.SourceBin{ - socket: socket - }, - video_payloader: Membrane.MP4.Payloader.H264, - muxer: Membrane.FLV.Muxer, - sink: %Membrane.File.Sink{location: @output_file} - }, - links: [ - link(:source) |> via_out(:audio) |> via_in(Pad.ref(:audio, 0)) |> to(:muxer), - link(:source) - |> via_out(:video) - |> to(:video_payloader) - |> via_in(Pad.ref(:video, 0)) - |> to(:muxer), - link(:muxer) |> to(:sink) - ] - } - - {{:ok, spec: spec, playback: :playing}, %{}} + def handle_init(_ctx, socket: socket) do + structure = [ + child(:source, %Membrane.RTMP.SourceBin{ + socket: socket + }), + child(:video_payloader, Membrane.MP4.Payloader.H264), + child(:muxer, Membrane.FLV.Muxer), + child(:sink, %Membrane.File.Sink{location: @output_file}), + get_child(:source) |> via_out(:audio) |> via_in(Pad.ref(:audio, 0)) |> get_child(:muxer), + get_child(:source) + |> via_out(:video) + |> get_child(:video_payloader) + |> via_in(Pad.ref(:video, 0)) + |> get_child(:muxer), + get_child(:muxer) |> get_child(:sink) + ] + + {[spec: structure, playback: :playing], %{}} end # Once the source initializes, we grant it the control over the tcp socket @impl true - def handle_notification( + def handle_child_notification( {:socket_control_needed, _socket, _source} = notification, :source, _ctx, @@ -51,15 +46,15 @@ defmodule Pipeline do ) do send(self(), notification) - {:ok, state} + {[], state} end - def handle_notification(_notification, _child, _ctx, state) do - {:ok, state} + def handle_child_notification(_notification, _child, _ctx, state) do + {[], state} end @impl true - def handle_other({:socket_control_needed, socket, source} = notification, _ctx, state) do + def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do case Membrane.RTMP.SourceBin.pass_control(socket, source) do :ok -> :ok @@ -68,19 +63,18 @@ defmodule Pipeline do Process.send_after(self(), notification, 200) end - {:ok, state} + {[], state} end # The rest of the module is used for self-termination of the pipeline after processing finishes @impl true - def handle_element_end_of_stream({:sink, _pad}, _ctx, state) do - Membrane.Pipeline.terminate(self()) - {:ok, state} + def handle_element_end_of_stream(:sink, _pad, _ctx, state) do + {[terminate: :shutdown], state} end @impl true - def handle_element_end_of_stream({_child, _pad}, _ctx, state) do - {:ok, state} + def handle_element_end_of_stream(_child, _pad, _ctx, state) do + {[], state} end end @@ -101,7 +95,7 @@ defmodule Example do ], socket_handler: fn socket -> # On new connection a pipeline is started - {:ok, pipeline} = Pipeline.start_link(socket: socket) + {:ok, _supervisor, pipeline} = Pipeline.start_link(socket: socket) send(parent, {:pipeline_spawned, pipeline}) {:ok, pipeline} end diff --git a/lib/membrane_rtmp_plugin/rtmp/sink/sink.ex b/lib/membrane_rtmp_plugin/rtmp/sink/sink.ex index 5f8543a9..26c69e32 100644 --- a/lib/membrane_rtmp_plugin/rtmp/sink/sink.ex +++ b/lib/membrane_rtmp_plugin/rtmp/sink/sink.ex @@ -1,10 +1,11 @@ defmodule Membrane.RTMP.Sink do @moduledoc """ Membrane element being client-side of RTMP streams. - To work successfuly it requires to receive both audio and video streams in AAC and H264 format respectively. Currently it supports only: + To work successfuly it requires to receive both audio and video streams in AAC and H264 format respectively. + Currently it supports only: - RTMP proper - "plain" RTMP protocol - RTMPS - RTMP over TLS/SSL - other RTMP veriants - RTMPT, RTMPE, RTMFP are not supported. + other RTMP variants - RTMPT, RTMPE, RTMFP are not supported. Implementation based on FFmpeg. """ use Membrane.Sink @@ -16,36 +17,25 @@ defmodule Membrane.RTMP.Sink do @supported_protocols ["rtmp://", "rtmps://"] @connection_attempt_interval 500 - @default_state %{ - attempts: 0, - native: nil, - # Keys here are the pad names. - frame_buffer: %{audio: nil, video: nil}, - ready?: false, - # Activated when one of the source inputs gets closed. Interleaving is - # disabled, frame buffer is flushed and from that point buffers on the - # remaining pad are simply forwarded to the output. - forward_mode?: false - } def_input_pad :audio, availability: :always, - caps: AAC, + accepted_format: AAC, mode: :pull, demand_unit: :buffers def_input_pad :video, availability: :always, - caps: MP4.Payload, + accepted_format: MP4.Payload, mode: :pull, demand_unit: :buffers def_options rtmp_url: [ - type: :string, spec: String.t(), - description: - "Destination URL of the stream. It needs to start with rtmp:// or rtmps:// depending on the protocol variant. - This URL should be provided by your streaming service." + description: """ + Destination URL of the stream. It needs to start with rtmp:// or rtmps:// depending on the protocol variant. + This URL should be provided by your streaming service. + """ ], max_attempts: [ spec: pos_integer() | :infinity, @@ -57,7 +47,7 @@ defmodule Membrane.RTMP.Sink do ] @impl true - def handle_init(options) do + def handle_init(_ctx, options) do unless String.starts_with?(options.rtmp_url, @supported_protocols) do raise ArgumentError, "Invalid destination URL provided" end @@ -67,78 +57,101 @@ defmodule Membrane.RTMP.Sink do raise ArgumentError, "Invalid max_attempts option value: #{options.max_attempts}" end - {:ok, options |> Map.from_struct() |> Map.merge(@default_state)} + state = + options + |> Map.from_struct() + |> Map.merge(%{ + attempts: 0, + native: nil, + # Keys here are the pad names. + frame_buffer: %{audio: nil, video: nil}, + ready?: false, + # Activated when one of the source inputs gets closed. Interleaving is + # disabled, frame buffer is flushed and from that point buffers on the + # remaining pad are simply forwarded to the output. + forward_mode?: false + }) + + {[], state} end @impl true - def handle_prepared_to_playing(_ctx, state) do + def handle_setup(_ctx, state) do {:ok, native} = Native.create(state.rtmp_url) - send(self(), :try_connect) - {{:ok, playback_change: :suspend}, %{state | native: native}} + state + |> Map.put(:native, native) + |> try_connect() + |> then(&{[], &1}) end @impl true - def handle_playing_to_prepared(_ctx, state) do - state = Map.merge(state, @default_state) - Membrane.Logger.debug("Stream correctly closed") - {:ok, state} + def handle_playing(_ctx, state) do + {build_demand(state), state} end @impl true - def handle_caps( + def handle_stream_format( :video, - %MP4.Payload{content: %MP4.Payload.AVC1{avcc: avc_config}} = caps, + %MP4.Payload{content: %MP4.Payload.AVC1{avcc: avc_config}} = stream_format, _ctx, state ) do - case Native.init_video_stream(state.native, caps.width, caps.height, avc_config) do - {:ok, ready, native} -> + case Native.init_video_stream( + state.native, + stream_format.width, + stream_format.height, + avc_config + ) do + {:ok, ready?, native} -> Membrane.Logger.debug("Correctly initialized video stream.") - state = Map.merge(state, %{native: native, ready: ready}) - {:ok, state} + {[], %{state | native: native, ready?: ready?}} - {:error, :caps_resent} -> + {:error, :stream_format_resent} -> Membrane.Logger.error( - "Input caps redefined on pad :video. RTMP Sink does not support changing stream parameters" + "Input stream format redefined on pad :video. RTMP Sink does not support dynamic stream parameters" ) - {:ok, state} + {[], state} end end @impl true - def handle_caps(:audio, %Membrane.AAC{} = caps, _ctx, state) do - profile = AAC.profile_to_aot_id(caps.profile) - sr_index = AAC.sample_rate_to_sampling_frequency_id(caps.sample_rate) - channel_configuration = AAC.channels_to_channel_config_id(caps.channels) - frame_length_id = AAC.samples_per_frame_to_frame_length_id(caps.samples_per_frame) + def handle_stream_format(:audio, %Membrane.AAC{} = stream_format, _ctx, state) do + profile = AAC.profile_to_aot_id(stream_format.profile) + sr_index = AAC.sample_rate_to_sampling_frequency_id(stream_format.sample_rate) + channel_configuration = AAC.channels_to_channel_config_id(stream_format.channels) + frame_length_id = AAC.samples_per_frame_to_frame_length_id(stream_format.samples_per_frame) aac_config = <> - case Native.init_audio_stream(state.native, caps.channels, caps.sample_rate, aac_config) do - {:ok, ready, native} -> + case Native.init_audio_stream( + state.native, + stream_format.channels, + stream_format.sample_rate, + aac_config + ) do + {:ok, ready?, native} -> Membrane.Logger.debug("Correctly initialized audio stream.") - state = Map.merge(state, %{native: native, ready: ready}) - {:ok, state} + {[], %{state | native: native, ready?: ready?}} - {:error, :caps_resent} -> + {:error, :stream_format_resent} -> Membrane.Logger.error( - "Input caps redefined on pas :audio. RTMP Sink does not support changing stream paremeters" + "Input stream format redefined on pad :audio. RTMP Sink does not support dynamic stream parameters" ) - {:ok, state} + {[], state} end end @impl true - def handle_write(pad, buffer, _ctx, %{ready: false} = state) do - {:ok, fill_frame_buffer(state, pad, buffer)} + def handle_write(pad, buffer, _ctx, %{ready?: false} = state) do + {[], fill_frame_buffer(state, pad, buffer)} end def handle_write(pad, buffer, _ctx, %{forward_mode?: true} = state) do - {{:ok, demand: pad}, write_frame(state, pad, buffer)} + {[demand: pad], write_frame(state, pad, buffer)} end def handle_write(pad, buffer, _ctx, state) do @@ -151,7 +164,7 @@ defmodule Membrane.RTMP.Sink do def handle_end_of_stream(pad, _ctx, state) do if state.forward_mode? do Native.finalize_stream(state.native) - {:ok, state} + {[], state} else # The interleave logic does not work if either one of the inputs does not # produce buffers. From this point on we act as a "forward" filter. @@ -162,43 +175,42 @@ defmodule Membrane.RTMP.Sink do end state = flush_frame_buffer(state) - {{:ok, demand: other_pad}, %{state | forward_mode?: true}} + {[demand: other_pad], %{state | forward_mode?: true}} end end - @impl true - def handle_other(:try_connect, _ctx, %{attempts: attempts, max_attempts: max_attempts} = state) - when max_attempts != :infinity and attempts >= max_attempts do + defp try_connect(%{attempts: attempts, max_attempts: max_attempts} = state) + when max_attempts != :infinity and attempts >= max_attempts do raise "failed to connect to '#{state.rtmp_url}' #{attempts} times, aborting" end - def handle_other(:try_connect, _ctx, state) do + defp try_connect(state) do state = %{state | attempts: state.attempts + 1} case Native.try_connect(state.native) do :ok -> Membrane.Logger.debug("Correctly initialized connection with: #{state.rtmp_url}") - {{:ok, [{:playback_change, :resume} | build_demand(state)]}, state} + state {:error, error} when error in [:econnrefused, :etimedout] -> - Process.send_after(self(), :try_connect, @connection_attempt_interval) - Membrane.Logger.warn( "Connection to #{state.rtmp_url} refused, retrying in #{@connection_attempt_interval}ms" ) - {:ok, state} + Process.sleep(@connection_attempt_interval) + + try_connect(state) {:error, reason} -> - raise "failed to connect to '#{state.rtmp_url}': #{reason}" + raise "failed to connect to '#{state.rtmp_url}': #{inspect(reason)}" end end defp build_demand(%{frame_buffer: frame_buffer}) do frame_buffer |> Enum.filter(fn {_pad, buffer} -> buffer == nil end) - |> Enum.map(fn {pad, _} -> {:demand, pad} end) + |> Enum.map(fn {pad, _buffer} -> {:demand, pad} end) end defp fill_frame_buffer(state, pad, buffer) do @@ -209,15 +221,15 @@ defmodule Membrane.RTMP.Sink do end end - defp write_frame_interleaved(state = %{frame_buffer: %{audio: audio, video: video}}) + defp write_frame_interleaved(%{frame_buffer: %{audio: audio, video: video}} = state) when audio == nil or video == nil do # We still have to wait for the other frame. - {:ok, state} + {[], state} end defp write_frame_interleaved(%{frame_buffer: frame_buffer} = state) do {pad, buffer} = - Enum.min_by(frame_buffer, fn {_, buffer} -> + Enum.min_by(frame_buffer, fn {_pad, buffer} -> buffer |> Buffer.get_dts_or_pts() |> Ratio.ceil() @@ -228,7 +240,7 @@ defmodule Membrane.RTMP.Sink do |> write_frame(pad, buffer) |> put_in([:frame_buffer, pad], nil) - {{:ok, build_demand(state)}, state} + {build_demand(state), state} end defp flush_frame_buffer(%{frame_buffer: frame_buffer} = state) do diff --git a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex index 369aa2bf..b9784839 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex @@ -23,13 +23,13 @@ defmodule Membrane.RTMP.SourceBin do alias Membrane.{AAC, H264, RTMP} def_output_pad :video, - caps: H264, + accepted_format: H264, availability: :always, mode: :pull, demand_unit: :buffers def_output_pad :audio, - caps: AAC, + accepted_format: AAC, availability: :always, mode: :pull, demand_unit: :buffers @@ -50,57 +50,53 @@ defmodule Membrane.RTMP.SourceBin do ] @impl true - def handle_init(%__MODULE__{} = opts) do - spec = %ParentSpec{ - children: %{ - src: %RTMP.Source{socket: opts.socket, validator: opts.validator}, - demuxer: Membrane.FLV.Demuxer, - video_parser: %Membrane.H264.FFmpeg.Parser{ - alignment: :au, - attach_nalus?: true, - skip_until_keyframe?: true - }, - audio_parser: %Membrane.AAC.Parser{ - in_encapsulation: :none, - out_encapsulation: :none - } - }, - links: [ - link(:src) |> to(:demuxer), - # - link(:demuxer) - |> via_out(Pad.ref(:audio, 0)) - |> to(:audio_parser) - |> to_bin_output(:audio), - # - link(:demuxer) - |> via_out(Pad.ref(:video, 0)) - |> to(:video_parser) - |> to_bin_output(:video) - ] - } + def handle_init(_ctx, %__MODULE__{} = opts) do + structure = [ + child(:src, %RTMP.Source{socket: opts.socket, validator: opts.validator}) + |> child(:demuxer, Membrane.FLV.Demuxer), + # + child(:audio_parser, %Membrane.AAC.Parser{ + in_encapsulation: :none, + out_encapsulation: :none + }), + child(:video_parser, %Membrane.H264.FFmpeg.Parser{ + alignment: :au, + attach_nalus?: true, + skip_until_keyframe?: true + }), + # + get_child(:demuxer) + |> via_out(Pad.ref(:audio, 0)) + |> get_child(:audio_parser) + |> bin_output(:audio), + # + get_child(:demuxer) + |> via_out(Pad.ref(:video, 0)) + |> get_child(:video_parser) + |> bin_output(:video) + ] - {{:ok, spec: spec}, %{}} + {[spec: structure], %{}} end @impl true - def handle_notification( + def handle_child_notification( {:socket_control_needed, _socket, _pid} = notification, :src, _ctx, state ) do - {{:ok, [notify: notification]}, state} + {[notify_parent: notification], state} end - def handle_notification( + def handle_child_notification( {type, _stage, _reason} = notification, :src, _ctx, state ) when type in [:stream_validation_success, :stream_validation_error] do - {{:ok, [notify: notification]}, state} + {[notify_parent: notification], state} end @doc """ @@ -108,7 +104,7 @@ defmodule Membrane.RTMP.SourceBin do To succeed, the executing process must be in control of the socket, otherwise `{:error, :not_owner}` is returned. """ - @spec pass_control(:gen_tcp.socket(), pid) :: :ok | {:error, atom} + @spec pass_control(:gen_tcp.socket(), pid()) :: :ok | {:error, atom()} def pass_control(socket, source) do :gen_tcp.controlling_process(socket, source) end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex index af5510f6..85eede41 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex @@ -254,8 +254,8 @@ defmodule Membrane.RTMP.MessageHandler do defp validation_action(state, stage, result) do notification = case result do - {:ok, msg} -> {:notify, {:stream_validation_success, stage, msg}} - {:error, reason} -> {:notify, {:stream_validation_error, stage, reason}} + {:ok, msg} -> {:notify_parent, {:stream_validation_success, stage, msg}} + {:error, reason} -> {:notify_parent, {:stream_validation_error, stage, reason}} end Map.update!(state, :actions, &[notification | &1]) diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index ee84425a..4d14e4c7 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -19,13 +19,14 @@ defmodule Membrane.RTMP.Source do def_output_pad :output, availability: :always, - caps: Membrane.RemoteStream, + accepted_format: Membrane.RemoteStream, mode: :pull def_options socket: [ spec: :gen_tcp.socket(), description: """ - Socket on which the source will be receiving the RTMP stream. The socket must be already connected to the RTMP client and be in non-active mode (`active` set to `false`). + Socket on which the source will be receiving the RTMP stream. + The socket must be already connected to the RTMP client and be in non-active mode (`active` set to `false`). """ ], validator: [ @@ -43,7 +44,7 @@ defmodule Membrane.RTMP.Source do @type validation_stage_t :: :publish | :release_stream | :set_data_frame @typedoc """ - Notification sent when the validator approves given validation stage.. + Notification sent when the validator approves given validation stage. """ @type stream_validation_success_t() :: {:stream_validation_success, validation_stage_t(), result :: any()} @@ -62,24 +63,27 @@ defmodule Membrane.RTMP.Source do @type unexpected_socket_closed_t() :: :unexpected_socket_closed @impl true - def handle_init(%__MODULE__{} = opts) do - {{:ok, [notify: {:socket_control_needed, opts.socket, self()}]}, - Map.from_struct(opts) - |> Map.merge(%{ - actions: [], - header_sent?: false, - message_parser: MessageParser.init(Handshake.init_server()), - receiver_pid: nil, - socket_ready?: false, - # how many times the Source tries to get control of the socket - socket_retries: 3, - # epoch required for performing a handshake with the pipeline - epoch: 0 - })} + def handle_init(_ctx, %__MODULE__{} = opts) do + state = + opts + |> Map.from_struct() + |> Map.merge(%{ + actions: [], + header_sent?: false, + message_parser: MessageParser.init(Handshake.init_server()), + receiver_pid: nil, + socket_ready?: false, + # how many times the Source tries to get control of the socket + socket_retries: 3, + # epoch required for performing a handshake with the pipeline + epoch: 0 + }) + + {[notify_parent: {:socket_control_needed, state.socket, self()}], state} end @impl true - def handle_prepared_to_playing(_ctx, state) do + def handle_playing(_ctx, state) do target_pid = self() {:ok, receiver_process} = @@ -89,11 +93,12 @@ defmodule Membrane.RTMP.Source do send(self(), :start_receiving) - actions = [ - caps: {:output, %Membrane.RemoteStream{content_format: Membrane.FLV, type: :bytestream}} + stream_format = [ + stream_format: + {:output, %Membrane.RemoteStream{content_format: Membrane.FLV, type: :bytestream}} ] - {{:ok, actions}, %{state | receiver_pid: receiver_process}} + {stream_format, %{state | receiver_pid: receiver_process}} end defp receive_loop(socket, target) do @@ -117,72 +122,59 @@ defmodule Membrane.RTMP.Source do @impl true def handle_demand(_pad, _size, _unit, _ctx, state) when state.socket_ready? do :inet.setopts(state.socket, active: :once) - {:ok, state} + {[], state} end @impl true def handle_demand(_pad, _size, _unit, _ctx, state) do - {:ok, state} + {[], state} end @impl true - def handle_playing_to_prepared(_ctx, state) do + def handle_terminate_request(_ctx, state) do send(state.receiver_pid, :terminate) - {:ok, %{state | receiver_pid: nil}} + {[terminate: :normal], %{state | receiver_pid: nil}} end @impl true - def handle_other(:start_receiving, _ctx, %{socket_retries: 0} = state) do + def handle_info(:start_receiving, _ctx, %{socket_retries: 0} = state) do Membrane.Logger.warn("Failed to take control of the socket") - {:ok, state} + {[], state} end - def handle_other(:start_receiving, _ctx, %{socket_retries: retries} = state) do + def handle_info(:start_receiving, _ctx, %{socket_retries: retries} = state) do case :gen_tcp.controlling_process(state.socket, state.receiver_pid) do :ok -> :ok = :inet.setopts(state.socket, active: :once) - {:ok, %{state | socket_ready?: true}} + {[], %{state | socket_ready?: true}} {:error, :not_owner} -> Process.send_after(self(), :start_receiving, 200) - {:ok, %{state | socket_retries: retries - 1}} + {[], %{state | socket_retries: retries - 1}} end end @impl true - def handle_other({:tcp, socket, packet}, _ctx, state) do - state = %{state | socket: socket} - + def handle_info({:tcp, socket, packet}, _ctx, %{socket: socket} = state) do {messages, message_parser} = MessageHandler.parse_packet_messages(packet, state.message_parser) state = MessageHandler.handle_client_messages(messages, state) - {actions, state} = get_actions(state) - {{:ok, actions}, %{state | message_parser: message_parser}} + {state.actions, %{state | actions: [], message_parser: message_parser}} end @impl true - def handle_other({:tcp_closed, _socket}, ctx, state) do + def handle_info({:tcp_closed, _socket}, ctx, state) do if ctx.pads.output.start_of_stream? do - {{:ok, end_of_stream: :output}, state} + {[end_of_stream: :output], state} else - {{:ok, notify: :unexpected_socket_closed}, state} + {[notify_parent: :unexpected_socket_closed], state} end end @impl true - def handle_other(_message, _ctx, state) do - {:ok, state} - end - - defp get_actions(state) do - case state do - %{actions: [_action | _rest] = actions} -> - {actions, %{state | actions: []}} - - _state -> - {[], state} - end + def handle_info(_message, _ctx, state) do + {[], state} end end diff --git a/mix.exs b/mix.exs index 773e06c1..bea047c0 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTMP.Mixfile do use Mix.Project - @version "0.9.1" + @version "0.10.0" @github_url "https://github.com/membraneframework/membrane_rtmp_plugin" def project do @@ -38,17 +38,17 @@ defmodule Membrane.RTMP.Mixfile do defp deps do [ - {:membrane_core, "~> 0.10.0"}, - {:unifex, "~> 1.0"}, - {:membrane_h264_ffmpeg_plugin, "~> 0.22.0"}, - {:membrane_aac_plugin, "~> 0.12.1"}, - {:membrane_mp4_plugin, "~> 0.16.0"}, - {:membrane_flv_plugin, "~> 0.3.1"}, - {:membrane_file_plugin, "~> 0.12.0"}, + {:membrane_core, "~> 0.11.2"}, + {:unifex, "~> 1.1.0"}, + {:membrane_h264_ffmpeg_plugin, "~> 0.25.1"}, + {:membrane_aac_plugin, "~> 0.13.0"}, + {:membrane_mp4_plugin, "~> 0.18.0"}, + {:membrane_flv_plugin, "~> 0.4.0"}, + {:membrane_file_plugin, "~> 0.13.2"}, # testing - {:membrane_hackney_plugin, "~> 0.8.2", only: :test}, + {:membrane_hackney_plugin, "~> 0.9.0", only: :test}, {:ffmpex, "~> 0.10.0", only: :test}, - {:membrane_stream_plugin, "~> 0.1.0", only: :test}, + {:membrane_stream_plugin, "~> 0.2.0", only: :test}, # development {:ex_doc, "~> 0.28", only: :dev, runtime: false}, {:dialyxir, "~> 1.1", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index 83c15ae0..cd29ad56 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,8 @@ %{ "bimap": {:hex, :bimap, "1.3.0", "3ea4832e58dc83a9b5b407c6731e7bae87458aa618e6d11d8e12114a17afa4b3", [:mix], [], "hexpm", "bf5a2b078528465aa705f405a5c638becd63e41d280ada41e0f77e6d255a10b4"}, - "bunch": {:hex, :bunch, "1.3.2", "9a3647e8bf8859482206c554d907b13d60aa8e40a3b82057a34bf71c0e23a0ae", [:mix], [], "hexpm", "dd89f2df6e6284c06cf9c44be5aae622f2a5a0804098167a0cb5370542c8c22b"}, + "bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"}, "bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"}, - "bundlex": {:hex, :bundlex, "1.0.0", "358c26a6c027359c6935dcd0716b68736b7604599d376c4e1ac17c6618ef6771", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}], "hexpm", "09b4a0c597a31e6e7ce8e103b94f19539bb2279f5966a3d6d46dcd32a5b6fd44"}, + "bundlex": {:hex, :bundlex, "1.1.0", "16fff7cfd8a61394c4dc2b22fe6c478ba125f6e5798e3e81ef0bc07ff237bde4", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:secure_random, "~> 0.5", [hex: :secure_random, repo: "hexpm", optional: false]}], "hexpm", "9da48a102a924baf8b6161c7c1767b5d68eeba18dd9d37a241b8b2ddf87d8a8f"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"}, "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, @@ -10,7 +10,7 @@ "credo": {:hex, :credo, "1.6.7", "323f5734350fd23a456f2688b9430e7d517afb313fbd38671b8a4449798a7854", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "41e110bfb007f7eda7f897c10bf019ceab9a0b269ce79f015d54b0dcf4fc7dd3"}, "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"}, "earmark_parser": {:hex, :earmark_parser, "1.4.28", "0bf6546eb7cd6185ae086cbc5d20cd6dbb4b428aad14c02c49f7b554484b4586", [:mix], [], "hexpm", "501cef12286a3231dc80c81352a9453decf9586977f917a96e619293132743fb"}, - "elixir_make": {:hex, :elixir_make, "0.6.3", "bc07d53221216838d79e03a8019d0839786703129599e9619f4ab74c8c096eac", [:mix], [], "hexpm", "f5cbd651c5678bcaabdbb7857658ee106b12509cd976c2c2fca99688e1daf716"}, + "elixir_make": {:hex, :elixir_make, "0.7.2", "e83548b0500e654d1a595f1134af4862a2e92ec3282ec4c2a17641e9aa45ee73", [:mix], [{:castore, "~> 0.1", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "05fb44abf9582381c2eb1b73d485a55288c581071de0ee3ee1084ee69d6a8e5f"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.28.6", "2bbd7a143d3014fc26de9056793e97600ae8978af2ced82c2575f130b7c0d7d7", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bca1441614654710ba37a0e173079273d619f9160cbcc8cd04e6bd59f1ad0e29"}, "ffmpex": {:hex, :ffmpex, "0.10.0", "ce29281eac60bf109c05acb4342eecf813a3cd3f08c1bce350423caad86128af", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:rambo, "~> 0.3.0", [hex: :rambo, repo: "hexpm", optional: false]}], "hexpm", "de8d81f8c51cc258dcee9a3e0b1568b0659c97be004557d9af47795206cff53b"}, @@ -22,20 +22,20 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.7.0", "93ea75b57d5cbee5db59f0250baba65a8949d32ee4cfa38224e93ea306c30048", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "3233d2a5a1b264039a198abcef3b7f1c64f5bde9704ec01afbf1bab43a56d415"}, - "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.12.2", "f688b8001e010377f33d4fd9ca9983f77a31b193f742c116923b51ffc07f1c0c", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:crc, "~> 0.10.2", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "cde8f7c5afdca51b25fa656778f57631074a4195e27ee1940ebf336e3740bede"}, + "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.13.0", "ea3bd7105abca13897be6da5ef325e9dfa55e6b83b83cee7b71d5703f108344f", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:crc, "~> 0.10.2", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "91bc75536319b1a5c962bb5c211811898553c10a2210da905de035227e9f279f"}, "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.6.1", "89d130b5f8786d4285d395697b0f7763a2c82a02de1658cdeb4f8e37e6a6c85c", [:mix], [], "hexpm", "e916e3c8216f3bf999b069ffda94da48c9bdbe3181ce7155a458d1ccf1a97b3d"}, - "membrane_common_c": {:hex, :membrane_common_c, "0.13.0", "c314623f93209eb2fa092379954c686f6e50ac89baa48360f836d24f4d53f5ee", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "90181fbbe481ccd0a4a76daf0300f8ad1b5b0bf0ebd8b42c133904f8839663ca"}, - "membrane_core": {:hex, :membrane_core, "0.10.2", "d2d17039f6df746e4a3c47da32f51867fbafe528272cdd9b226d16b1032bc337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a4f290f919ada66c772807d64d5830be2962b7c13a2f2bc9ace416a1cd19ee1"}, - "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.12.0", "eb940e7a2f2abf30e048bd0b7c2bef9c17c18aa58875b9a833c0bc7e7b1fd709", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "281b9bf9467beead3f973adce55b9844bc4206bb3f3f60f0db8320a4af4fc5ca"}, - "membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.3.1", "3d831ca93b4c28775f532148ed1b1f6f8bf41b37f2fd137ab3940b61dd53112d", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}], "hexpm", "5918f4a1bef78c8a3bc7ecd899de5a7945eacb1e6da06cc7f9e83135b3b8347d"}, - "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.22.0", "978ece9ad7c7cc211f73f9fa9d286f9ab3ebb64a05e09671c7c118a0d25cc9bc", [:mix], [{:bunch, "~> 1.3.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.13.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.2.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:ratio, "~> 2.4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "aac73ba384a3e959c7957d1cac5dfb82f074fc16e5b64924238327decc3b2b2d"}, - "membrane_h264_format": {:hex, :membrane_h264_format, "0.3.0", "84426aac86c3f4d3e8110438c3514ad94aa528e7002650d40e3b3862e2af5e3e", [:mix], [], "hexpm", "8254e52cea3c5d7c078c960a32f1ba338eeae9e301515302fd293f1683fa8dd9"}, - "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.8.2", "6b83628cc2019aa0b143c09e77f2dd9199a05528599d93c289dcab2e947369fa", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "42906166b3692ba2270deb61721225ca7edadd1dbde6a44435664234a93597e2"}, + "membrane_common_c": {:hex, :membrane_common_c, "0.14.0", "35621d9736829bf675062dc0af66e931b0d82ff8361c2088576d63d4d002692e", [:mix], [{:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "262b06e93fe3f1be57a111fa19f5cba4f26000a442ac32a59f3678e83cbb001f"}, + "membrane_core": {:hex, :membrane_core, "0.11.2", "c8a257bea90c53e0fe99453630a07e4711e4d8ba25e647b3ba346b994aa4f7ab", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7e2566c9b6d1c22fbb832c22e5f9dbbf7c6cba1c72eeea53bd2f2b73efed58b3"}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.13.2", "220aa341b8d707b65c93cf8d42700a4c67340717ee4fbce3f8fc6503d182b0f8", [:mix], [{:membrane_core, "~> 0.11", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "d04159943f4f07fa45156487d9e7c19b9975678c1cda7957a6b0cca59442fae3"}, + "membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.4.0", "4e86e80e5bb22071f35b4a46fd229ea2edf3df751ac49767620ff23115a49fb8", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.2", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.4.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}], "hexpm", "95c7eabc1a4fdbfce23f82f66b2427064a4c2c2c1c3b03a41e70a8a7b218d73f"}, + "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.25.1", "262ed6aef931826bef5c98cd1593893fbd9d488b9fdeb5898cad46026bcf7f12", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.14.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.4.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.2.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:ratio, "~> 2.4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "f2ade09f84b97d5c9780dcbda74354f82f4d183582c52259619f274f23df388b"}, + "membrane_h264_format": {:hex, :membrane_h264_format, "0.4.0", "7103376864f0c0ef525ab31fc16684fd1aa8453f47fb060f1940125d8956dd5d", [:mix], [], "hexpm", "0a46ffa7a3f0da4da65134c7ef0feb64eb9eea3e55c2f0cf261969ab40f7bdd5"}, + "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.9.0", "9736e48c3213295f5060e755133465535578a51fe5f01e073696dbe53903478e", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "719971528cf23167b07a344eadc2b97ecdc648c2ea5937f87d98aa3c7e9fa3cf"}, "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.7.0", "0cc33f21dc571b43b4d2db66a056e2b7eecdc7ada71a9e0e923ab7a1554f15f2", [:mix], [], "hexpm", "7653a20e7b0c048ea05ffad6df88249abf4c1b63772c14dee843acffcad6b038"}, - "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.16.2", "e893c0c107b0ec9f00918e8c8927d0cebb15f6cb6f91d08bef5c9a7c333f701d", [:mix], [{:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.6.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.12.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "3c3726e551eb14f8368d93849b79a765d7a08c66d29c33750cbc3e1696ea7b90"}, + "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.18.0", "454ea89ec89f3606b0f9815ce20d5fb492cc31d48452711e95b522ec17bab408", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.6.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.11.2", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.13.2", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.4.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "2f8a840f4c12c07e054f5e21eb6de3a9643e442f7d94e625a2700cc4360d9552"}, "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.2.0", "cda8eb207cf65c93690a19001aba3edbb2ba5d22abc8068a1f6a785ba871e8cf", [:mix], [], "hexpm", "6b716fc24f60834323637c95aaaa0f99be23fcc6a84a21af70195ef50185b634"}, - "membrane_stream_plugin": {:hex, :membrane_stream_plugin, "0.1.0", "0a21e36c4523fae2c514d2661e49a0630f548dcdfe5d18bbf07a8c15e6d1b036", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "303658ac9e7915b77ed0a127df2813bdc3d57a5d287bbc0328506c21021138c4"}, + "membrane_stream_plugin": {:hex, :membrane_stream_plugin, "0.2.0", "002323fbb69efd1d82bc467023fdf08e9fca9136569e761c9195e85257d8ce22", [:mix], [{:membrane_core, "~> 0.11.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "68a62fa4fd9a4519ba621e197ffaa5838e293dfd64398e78dfdfdc927dae432a"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"}, @@ -50,5 +50,5 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, - "unifex": {:hex, :unifex, "1.0.1", "c8ebda892901e2fdb262a0ff95f4157fd5575709965b88a6415e43b527adcc0c", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "4b5217c06f5f8f82215d2edf077e40f7d4d0ee30c456b19f2eb99bfb484c558d"}, + "unifex": {:hex, :unifex, "1.1.0", "26b1bcb6c3b3454e1ea15f85b2e570aaa5b5c609566aa9f5c2e0a8b213379d6b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "d8f47e9e3240301f5b20eec5792d1d4341e1a3a268d94f7204703b48da4aaa06"}, } diff --git a/test/fixtures/audio.msr b/test/fixtures/audio.msr index 71ec0a87..c89f3998 100644 Binary files a/test/fixtures/audio.msr and b/test/fixtures/audio.msr differ diff --git a/test/fixtures/video.msr b/test/fixtures/video.msr index 8c049596..f57a95fe 100644 Binary files a/test/fixtures/video.msr and b/test/fixtures/video.msr differ diff --git a/test/membrane_rtmp_plugin/rtmp_sink_test.exs b/test/membrane_rtmp_plugin/rtmp_sink_test.exs index 4072453a..f72cb698 100644 --- a/test/membrane_rtmp_plugin/rtmp_sink_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_sink_test.exs @@ -1,5 +1,5 @@ defmodule Membrane.RTMP.SinkTest do - use ExUnit.Case + use ExUnit.Case, async: true import Membrane.Testing.Assertions require Logger @@ -22,11 +22,11 @@ defmodule Membrane.RTMP.SinkTest do output_file = Path.join(tmp_dir, "rtmp_sink_interleave_test.flv") rtmp_server = Task.async(fn -> start_rtmp_server(output_file) end) - {:ok, sink_pipeline_pid} = start_interleaving_sink_pipeline(@rtmp_server_url) + sink_pipeline_pid = start_interleaving_sink_pipeline(@rtmp_server_url) # There's an RC - there's no way to ensure RTMP server starts to listen before pipeline is started # so it may retry a few times before succeeding - assert_pipeline_playback_changed(sink_pipeline_pid, :prepared, :playing, 5000) + assert_pipeline_play(sink_pipeline_pid, 5000) assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000) assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000) @@ -34,7 +34,7 @@ defmodule Membrane.RTMP.SinkTest do assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000) assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000) - Membrane.Testing.Pipeline.terminate(sink_pipeline_pid, blocking?: true) + :ok = Pipeline.terminate(sink_pipeline_pid, blocking?: true) # RTMP server should terminate when the connection is closed assert :ok = Task.await(rtmp_server) @@ -47,11 +47,11 @@ defmodule Membrane.RTMP.SinkTest do } do rtmp_server = Task.async(fn -> start_rtmp_server(flv_output_file) end) - {:ok, sink_pipeline_pid} = start_sink_pipeline(@rtmp_server_url) + sink_pipeline_pid = start_sink_pipeline(@rtmp_server_url) # There's an RC - there's no way to ensure RTMP server starts to listen before pipeline is started # so it may retry a few times before succeeding - assert_pipeline_playback_changed(sink_pipeline_pid, :prepared, :playing, 5000) + assert_pipeline_play(sink_pipeline_pid, 5000) assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000) assert_start_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000) @@ -59,7 +59,7 @@ defmodule Membrane.RTMP.SinkTest do assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :video, 5_000) assert_end_of_stream(sink_pipeline_pid, :rtmp_sink, :audio, 5_000) - Membrane.Testing.Pipeline.terminate(sink_pipeline_pid, blocking?: true) + :ok = Pipeline.terminate(sink_pipeline_pid, blocking?: true) # RTMP server should terminate when the connection is closed assert :ok = Task.await(rtmp_server) @@ -68,75 +68,73 @@ defmodule Membrane.RTMP.SinkTest do end defp start_interleaving_sink_pipeline(rtmp_url) do - import Membrane.ParentSpec + import Membrane.ChildrenSpec options = [ - children: [ - rtmp_sink: %Membrane.RTMP.Sink{rtmp_url: rtmp_url, max_attempts: 5} - ], - links: [ - link(:video_source, %Membrane.File.Source{location: "test/fixtures/video.msr"}) - |> to(:video_deserializer, Membrane.Stream.Deserializer) - |> to(:video_parser, %Membrane.H264.FFmpeg.Parser{ + structure: [ + child(:rtmp_sink, %Membrane.RTMP.Sink{rtmp_url: rtmp_url, max_attempts: 5}), + # + child(:video_source, %Membrane.File.Source{location: "test/fixtures/video.msr"}) + |> child(:video_deserializer, Membrane.Stream.Deserializer) + |> child(:video_parser, %Membrane.H264.FFmpeg.Parser{ alignment: :au, attach_nalus?: true, skip_until_parameters?: true }) - |> to(:video_payloader, Membrane.MP4.Payloader.H264) + |> child(:video_payloader, Membrane.MP4.Payloader.H264) |> via_in(:video) - |> to(:rtmp_sink), - link(:audio_source, %Membrane.File.Source{location: "test/fixtures/audio.msr"}) - |> to(:audio_deserializer, Membrane.Stream.Deserializer) - |> to(:audio_parser, Membrane.AAC.Parser) + |> get_child(:rtmp_sink), + # + child(:audio_source, %Membrane.File.Source{location: "test/fixtures/audio.msr"}) + |> child(:audio_deserializer, Membrane.Stream.Deserializer) + |> child(:audio_parser, Membrane.AAC.Parser) |> via_in(:audio) - |> to(:rtmp_sink) + |> get_child(:rtmp_sink) ], test_process: self() ] - Pipeline.start_link(options) + Pipeline.start_link_supervised!(options) end defp start_sink_pipeline(rtmp_url) do - import Membrane.ParentSpec + import Membrane.ChildrenSpec options = [ - children: [ - video_source: %Membrane.Hackney.Source{ + structure: [ + child(:rtmp_sink, %Membrane.RTMP.Sink{rtmp_url: rtmp_url, max_attempts: 5}), + # + child(:video_source, %Membrane.Hackney.Source{ location: @input_video_url, hackney_opts: [follow_redirect: true] - }, - audio_source: %Membrane.Hackney.Source{ - location: @input_audio_url, - hackney_opts: [follow_redirect: true] - }, - video_parser: %Membrane.H264.FFmpeg.Parser{ + }) + |> child(:video_parser, %Membrane.H264.FFmpeg.Parser{ framerate: {30, 1}, alignment: :au, attach_nalus?: true, skip_until_parameters?: false - }, - audio_parser: %Membrane.AAC.Parser{ - out_encapsulation: :none - }, - video_payloader: Membrane.MP4.Payloader.H264, - rtmp_sink: %Membrane.RTMP.Sink{rtmp_url: rtmp_url, max_attempts: 5} - ], - links: [ - link(:video_source) - |> to(:video_parser) - |> to(:video_payloader) + }) + |> child(:video_payloader, Membrane.MP4.Payloader.H264) |> via_in(:video) - |> to(:rtmp_sink), - link(:audio_source) |> to(:audio_parser) |> via_in(:audio) |> to(:rtmp_sink) + |> get_child(:rtmp_sink), + # + child(:audio_source, %Membrane.Hackney.Source{ + location: @input_audio_url, + hackney_opts: [follow_redirect: true] + }) + |> child(:audio_parser, %Membrane.AAC.Parser{ + out_encapsulation: :none + }) + |> via_in(:audio) + |> get_child(:rtmp_sink) ], test_process: self() ] - Pipeline.start_link(options) + Pipeline.start_link_supervised!(options) end - @spec start_rtmp_server(Path.t()) :: {:ok, pid()} + @spec start_rtmp_server(Path.t()) :: :ok | :error def start_rtmp_server(out_file) do import FFmpex use FFmpex.Options @@ -158,8 +156,15 @@ defmodule Membrane.RTMP.SinkTest do {:ok, _stdout} -> :ok - error -> - Logger.error(inspect(error)) + {:error, {error, exit_code}} -> + Logger.error( + """ + FFmpeg exited with a non-zero exit code (#{exit_code}) and the error message: + #{error} + """ + |> String.trim() + ) + :error end end diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index ee9884ae..55226a85 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -26,7 +26,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do pipeline = await_pipeline_started() - assert_pipeline_playback_changed(pipeline, :prepared, :playing) + assert_pipeline_play(pipeline) assert_buffers(%{ pipeline: pipeline, @@ -59,7 +59,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do end) pipeline = await_pipeline_started() - assert_pipeline_playback_changed(pipeline, :prepared, :playing) + assert_pipeline_play(pipeline) assert_pipeline_notified( pipeline, @@ -84,7 +84,7 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do end) pipeline = await_pipeline_started() - assert_pipeline_playback_changed(pipeline, :prepared, :playing) + assert_pipeline_play(pipeline) assert_pipeline_notified( pipeline, @@ -111,11 +111,12 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do socket_handler: fn socket -> options = [ module: Membrane.RTMP.Source.TestPipeline, - custom_args: [socket: socket, test_process: test_process, verifier: verifier], + custom_args: %{socket: socket, test_process: test_process, verifier: verifier}, test_process: test_process ] - Testing.Pipeline.start_link(options) + {:ok, _supervisor_pid, pipeline_pid} = Testing.Pipeline.start_link(options) + {:ok, pipeline_pid} end, parent: test_process } @@ -152,8 +153,15 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do {:ok, ""} -> :ok - error -> - Logger.error(inspect(error)) + {:error, {error, exit_code}} -> + Logger.error( + """ + FFmpeg exited with a non-zero exit code (#{exit_code}) and the error message: + #{error} + """ + |> String.trim() + ) + :error end end diff --git a/test/membrane_rtmp_plugin/tcp_server_test.exs b/test/membrane_rtmp_plugin/tcp_server_test.exs index bbb01563..512663c3 100644 --- a/test/membrane_rtmp_plugin/tcp_server_test.exs +++ b/test/membrane_rtmp_plugin/tcp_server_test.exs @@ -1,5 +1,5 @@ defmodule Membrane.RTMP.Source.TcpServerTest do - use ExUnit.Case + use ExUnit.Case, async: true alias Membrane.RTMP.Source.TcpServer @@ -33,7 +33,7 @@ defmodule Membrane.RTMP.Source.TcpServerTest do end } - TcpServer.start_link(server_options) + {:ok, _pid} = TcpServer.start_link(server_options) Process.sleep(500) diff --git a/test/support/rtmp_source_test_pipeline.ex b/test/support/rtmp_source_test_pipeline.ex index d26d7bf8..eb4b1171 100644 --- a/test/support/rtmp_source_test_pipeline.ex +++ b/test/support/rtmp_source_test_pipeline.ex @@ -2,35 +2,29 @@ defmodule Membrane.RTMP.Source.TestPipeline do @moduledoc false use Membrane.Pipeline - import Membrane.ParentSpec - alias Membrane.RTMP.SourceBin alias Membrane.Testing @impl true - def handle_init(socket: socket, test_process: test_process, verifier: verifier) do - spec = %Membrane.ParentSpec{ - children: [ - src: %SourceBin{ - socket: socket, - validator: verifier - }, - audio_sink: Testing.Sink, - video_sink: Testing.Sink - ], - links: [ - link(:src) |> via_out(:audio) |> to(:audio_sink), - link(:src) |> via_out(:video) |> to(:video_sink) - ] - } + def handle_init(_ctx, %{socket: socket, test_process: test_process, verifier: verifier}) do + structure = [ + child(:src, %SourceBin{ + socket: socket, + validator: verifier + }), + child(:audio_sink, Testing.Sink), + child(:video_sink, Testing.Sink), + get_child(:src) |> via_out(:audio) |> get_child(:audio_sink), + get_child(:src) |> via_out(:video) |> get_child(:video_sink) + ] send(test_process, {:pipeline_started, self()}) - {{:ok, [spec: spec, playback: :playing]}, %{socket: socket}} + {[spec: structure, playback: :playing], %{socket: socket}} end @impl true - def handle_notification( + def handle_child_notification( {:socket_control_needed, _socket, _source} = notification, :src, _ctx, @@ -38,15 +32,15 @@ defmodule Membrane.RTMP.Source.TestPipeline do ) do send(self(), notification) - {:ok, state} + {[], state} end - def handle_notification(_notification, _child, _ctx, state) do - {:ok, state} + def handle_child_notification(_notification, _child, _ctx, state) do + {[], state} end @impl true - def handle_other({:socket_control_needed, socket, source} = notification, _ctx, state) do + def handle_info({:socket_control_needed, socket, source} = notification, _ctx, state) do case SourceBin.pass_control(socket, source) do :ok -> :ok @@ -55,6 +49,6 @@ defmodule Membrane.RTMP.Source.TestPipeline do Process.send_after(self(), notification, 200) end - {:ok, state} + {[], state} end end