diff --git a/CHANGELOG.md b/CHANGELOG.md index dfa42737a..420c9e0f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586) * Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577) * Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594) + * Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595) ## 0.11.0 * Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219) diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index b1ddb7389..794c5ea64 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -681,7 +681,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do Terminating. """) - {:error, {:membrane_child_crash, child_name}} + {:error, {:membrane_child_crash, child_name, reason}} end end diff --git a/lib/membrane/testing/assertions.ex b/lib/membrane/testing/assertions.ex index 30f5f14f3..496f2bf0f 100644 --- a/lib/membrane/testing/assertions.ex +++ b/lib/membrane/testing/assertions.ex @@ -405,6 +405,40 @@ defmodule Membrane.Testing.Assertions do end end + @doc """ + Asserts that `Membrane.Testing.Sink` with name `sink_name` entered the playing + playback. + """ + defmacro assert_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do + do_sink_playing(&assert_receive_from_pipeline/3, pipeline, sink_name, timeout) + end + + @doc """ + Asserts that `Membrane.Testing.Sink` with name `sink_name` didn't enter the playing + playback. + """ + defmacro refute_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do + do_sink_playing(&refute_receive_from_pipeline/3, pipeline, sink_name, timeout) + end + + defp do_sink_playing(assertion, pipeline, sink_name, timeout) do + quote do + element_name_value = unquote(sink_name) + + unquote( + assertion.( + pipeline, + {:handle_child_notification, + {:playing, + quote do + ^element_name_value + end}}, + timeout + ) + ) + end + end + @doc """ Asserts that `Membrane.Testing.Pipeline` received or is going to receive start_of_stream notification from the element with a name `element_name` within the `timeout` period diff --git a/lib/membrane/testing/sink.ex b/lib/membrane/testing/sink.ex index 9b5923b24..5b036fdda 100644 --- a/lib/membrane/testing/sink.ex +++ b/lib/membrane/testing/sink.ex @@ -52,13 +52,16 @@ defmodule Membrane.Testing.Sink do end @impl true - def handle_playing(_context, %{autodemand: true} = state), - do: {[demand: :input], state} + def handle_playing(_ctx, state) do + actions = + notify(:playing) ++ + if(state.autodemand, do: [demand: :input], else: []) - def handle_playing(_context, state), do: {[], state} + {actions, state} + end @impl true - def handle_event(:input, event, _context, state) do + def handle_event(:input, event, _ctx, state) do {notify({:event, event}), state} end diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 91cd114f0..18bdd7b71 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -96,6 +96,8 @@ defmodule Membrane.Integration.AutoDemandsTest do ] ) + assert_sink_playing(pipeline, :right_sink) + Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000}) Enum.each(1..1000, fn payload -> @@ -191,7 +193,7 @@ defmodule Membrane.Integration.AutoDemandsTest do @impl true def handle_playing(_ctx, state) do - {[stream_format: {:output, %StreamFormat{}}], state} + {[stream_format: {:output, %StreamFormat{}}, notify_parent: :playing], state} end end @@ -204,6 +206,8 @@ defmodule Membrane.Integration.AutoDemandsTest do |> child(:sink, Sink) ) + assert_pipeline_notified(pipeline, :source, :playing) + buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1}) Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) @@ -232,9 +236,14 @@ defmodule Membrane.Integration.AutoDemandsTest do Process.monitor(pipeline) + assert_pipeline_notified(pipeline, :source, :playing) + buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1}) Pipeline.message_child(pipeline, :source, buffer: {:output, buffers}) - assert_receive({:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink}}) + + assert_receive( + {:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}} + ) end defp reduce_link(link, enum, fun) do diff --git a/test/membrane/integration/child_pad_removed_test.exs b/test/membrane/integration/child_pad_removed_test.exs index 6ef6d87f2..a558e6a47 100644 --- a/test/membrane/integration/child_pad_removed_test.exs +++ b/test/membrane/integration/child_pad_removed_test.exs @@ -40,9 +40,12 @@ defmodule Membrane.Integration.ChildPadRemovedTest do def_options test_process: [spec: pid()] @impl true - def handle_init(ctx, opts) do - send(opts.test_process, {:init, ctx.name}) - {[], Map.from_struct(opts)} + def handle_init(_ctx, opts), do: {[], Map.from_struct(opts)} + + @impl true + def handle_playing(ctx, state) do + send(state.test_process, {:playing, ctx.name}) + {[], state} end end @@ -161,7 +164,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do ] do pipeline = start_pipeline!(DynamicBin, StaticSink) - assert_receive {:init, :sink} + assert_receive {:playing, :sink} sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) monitor_ref = Process.monitor(sink_pid) @@ -183,7 +186,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do ] do pipeline = start_link_pipeline!(DynamicBin, StaticSink, remove_children: :sink) - assert_receive {:init, :sink} + assert_receive {:playing, :sink} sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink) monitor_ref = Process.monitor(sink_pid) diff --git a/test/membrane/integration/defer_setup_test.exs b/test/membrane/integration/defer_setup_test.exs index 75bb82cc6..f058cb013 100644 --- a/test/membrane/integration/defer_setup_test.exs +++ b/test/membrane/integration/defer_setup_test.exs @@ -132,7 +132,9 @@ defmodule Membrane.Integration.DeferSetupTest do monitor_ref = Process.monitor(pipeline) complete_child_setup(pipeline, :bin_2) - assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline, {:membrane_child_crash, :bin_2}} + + assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline, + {:membrane_child_crash, :bin_2, _bin_exit_reason}} end defp complete_child_setup(pipeline, child) do diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index 8b42d4279..ea9880c56 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -20,6 +20,8 @@ defmodule Membrane.Integration.DemandsTest do end defp test_pipeline(pid) do + assert_sink_playing(pid, :sink) + demand = 500 Pipeline.message_child(pid, :sink, {:make_demand, demand}) @@ -172,6 +174,11 @@ defmodule Membrane.Integration.DemandsTest do @impl true def handle_init(_ctx, _opts), do: {[], %{counter: 0}} + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :playing], state} + end + @impl true def handle_buffer(:input, _buffer, _ctx, state) do {[], Map.update!(state, :counter, &(&1 + 1))} @@ -189,44 +196,6 @@ defmodule Membrane.Integration.DemandsTest do end end - test "actions :pause_auto_demand and :resume_auto_demand" do - pipeline = - Testing.Pipeline.start_link_supervised!( - spec: - child(RedemandingSource) - |> via_in(:input, auto_demand_size: 10) - |> child(:sink, PausingSink) - ) - - # time for pipeline to start playing - Process.sleep(500) - - for _i <- 1..10 do - # during sleep below source should send around 100 buffers - Process.sleep(100 * RedemandingSource.sleep_time()) - - Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand}) - - assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no}) - # sink should receive around 100 buffers, but the boundary is set to 70, in case of eg. - # slowdown of the source when running all tests in the project asynchronously - assert buff_no > 70 - - # during sleep below source should send up to about auto_demand_size = 10 buffers - Process.sleep(100 * RedemandingSource.sleep_time()) - - Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand}) - - assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no}) - # sink should probably receive between 5 and 15 buffers, but the boundary is set to 25, - # to handle the case when eg. there is a delay in receiving the notification from the - # pipeline by the :sink - assert buff_no < 25 - end - - Testing.Pipeline.terminate(pipeline) - end - defmodule Funnel do use Membrane.Filter @@ -293,4 +262,51 @@ defmodule Membrane.Integration.DemandsTest do Testing.Pipeline.terminate(pipeline) end + + defmodule Sync do + use Bunch + use ExUnit.Case, async: false + + alias Membrane.Integration.DemandsTest.{PausingSink, RedemandingSource} + + test "actions :pause_auto_demand and :resume_auto_demand" do + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: + child(RedemandingSource) + |> via_in(:input, auto_demand_size: 10) + |> child(:sink, PausingSink) + ) + + assert_sink_playing(pipeline, :sink) + + # time for pipeline to start playing + Process.sleep(1000) + + for i <- 1..10 do + # during sleep below source should send around 100 buffers + Process.sleep(100 * RedemandingSource.sleep_time()) + + Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand}) + + assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no}) + # sink should receive around 100 buffers, but the boundary is set to 65, in case of eg. + # slowdown of the source when running all tests in the project asynchronously + if i != 1, do: assert(buff_no > 65) + + # during sleep below source should send up to about auto_demand_size = 10 buffers + Process.sleep(100 * RedemandingSource.sleep_time()) + + Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand}) + + assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no}) + # sink should probably receive between 5 and 15 buffers, but the boundary is set to 25, + # to handle the case when eg. there is a delay in receiving the notification from the + # pipeline by the :sink + assert buff_no < 25 + end + + Testing.Pipeline.terminate(pipeline) + end + end end diff --git a/test/membrane/integration/links_validation_test.exs b/test/membrane/integration/links_validation_test.exs index ff301afdc..9f69e2f38 100644 --- a/test/membrane/integration/links_validation_test.exs +++ b/test/membrane/integration/links_validation_test.exs @@ -126,7 +126,8 @@ defmodule Membrane.LinksValidationTest do Pipeline.execute_actions(pipeline, spec: spec) - assert_receive({:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}}) + assert_receive {:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}}, + 1000 end end end diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 3953c5dc5..1f240dfff 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -222,6 +222,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + assert_sink_playing(pipeline, :sink) + buffers = 1..4000 |> Enum.map(fn i -> %Membrane.Buffer{payload: <>} end) diff --git a/test/membrane/pipeline_supervisor_test.exs b/test/membrane/pipeline_supervisor_test.exs index cdaf6605e..688506925 100644 --- a/test/membrane/pipeline_supervisor_test.exs +++ b/test/membrane/pipeline_supervisor_test.exs @@ -23,7 +23,7 @@ defmodule Membrane.PipelineSupervisorTest do assert_receive {:DOWN, ^supervisor_monitor_ref, _process, _pid, ^exit_reason} end - test "Pipeline supervisor exits with {:membrane_child_crash, child_name} when pipeline's child crashes" do + test "Pipeline supervisor exits with {:membrane_child_crash, child_name, child_exit_reason} when pipeline's child crashes" do defmodule MyElement do use Membrane.Endpoint @@ -44,7 +44,7 @@ defmodule Membrane.PipelineSupervisorTest do element_exit_reason = :custom_exit_reason Process.exit(element, element_exit_reason) - pipeline_exit_reason = {:membrane_child_crash, :element} + pipeline_exit_reason = {:membrane_child_crash, :element, element_exit_reason} assert_receive {:DOWN, ^element_monitor_ref, _process, _pid, ^element_exit_reason} assert_receive {:DOWN, ^pipeline_monitor_ref, _process, _pid, ^pipeline_exit_reason} diff --git a/test/membrane/remote_controlled/pipeline_test.exs b/test/membrane/remote_controlled/pipeline_test.exs index 5a57bbc3d..8e79aeb33 100644 --- a/test/membrane/remote_controlled/pipeline_test.exs +++ b/test/membrane/remote_controlled/pipeline_test.exs @@ -12,6 +12,8 @@ defmodule Membrane.RCPipelineTest do use Membrane.Filter alias Membrane.Buffer + require Membrane.Logger + def_output_pad :output, flow_control: :manual, accepted_format: _any, availability: :always def_input_pad :input, @@ -73,10 +75,11 @@ defmodule Membrane.RCPipelineTest do # TEST assert_receive %RCMessage.Notification{ - from: ^pipeline, - element: :b, - data: %Membrane.Buffer{payload: "test"} - } + from: ^pipeline, + element: :b, + data: %Membrane.Buffer{payload: "test"} + }, + 2000 assert_receive %RCMessage.StartOfStream{from: ^pipeline, element: :b, pad: :input} @@ -91,7 +94,7 @@ defmodule Membrane.RCPipelineTest do RCPipeline.exec_actions(pipeline, spec: @pipeline_spec) # TEST - assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input} + assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input}, 2000 assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :c, pad: :input} diff --git a/test/membrane/sync_test.exs b/test/membrane/sync_test.exs index d750046fa..e57168bdf 100644 --- a/test/membrane/sync_test.exs +++ b/test/membrane/sync_test.exs @@ -1,6 +1,6 @@ defmodule Membrane.SyncTest do use Bunch - use ExUnit.Case, async: true + use ExUnit.Case, async: false @module Membrane.Sync diff --git a/test/membrane/utility_supervisor_test.exs b/test/membrane/utility_supervisor_test.exs index 0aba5ce10..ed182b094 100644 --- a/test/membrane/utility_supervisor_test.exs +++ b/test/membrane/utility_supervisor_test.exs @@ -7,6 +7,8 @@ defmodule Membrane.UtilitySupervisorTest do alias Membrane.Testing test "Utility supervisor terminates utility when element exits" do + Process.register(self(), :utility_supervisor_test_process) + defmodule TestFilter do use Membrane.Filter @@ -16,7 +18,7 @@ defmodule Membrane.UtilitySupervisorTest do ctx.utility_supervisor, {Task, fn -> - Process.register(self(), :utility_supervisor_test_task) + send(:utility_supervisor_test_process, {:task_pid, self()}) Process.sleep(:infinity) end} ) @@ -28,8 +30,11 @@ defmodule Membrane.UtilitySupervisorTest do pipeline = Testing.Pipeline.start_supervised!(spec: [child(:filter, TestFilter)]) assert_pipeline_notified(pipeline, :filter, :setup) - monitor = Process.monitor(:utility_supervisor_test_task) + assert_receive {:task_pid, task_pid} + + monitor_ref = Process.monitor(task_pid) + Testing.Pipeline.terminate(pipeline) - assert_receive {:DOWN, ^monitor, :process, _pid, :shutdown} + assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown} end end