Skip to content

Commit

Permalink
Fix flaky tests on Elixir 1.15 (#595)
Browse files Browse the repository at this point in the history
* Tag flaky tests

* Fix race condition in supervisor test

* Fix flaky tests on Elixir 1.15, add more info to the supervisor exit
reason, add new notification to Testing.Sink
  • Loading branch information
FelonEkonom authored Sep 11, 2023
1 parent 1658b76 commit c260827
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586)
* Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577)
* Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594)
* Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
Terminating.
""")

{:error, {:membrane_child_crash, child_name}}
{:error, {:membrane_child_crash, child_name, reason}}
end
end

Expand Down
34 changes: 34 additions & 0 deletions lib/membrane/testing/assertions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,40 @@ defmodule Membrane.Testing.Assertions do
end
end

@doc """
Asserts that `Membrane.Testing.Sink` with name `sink_name` entered the playing
playback.
"""
defmacro assert_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do
do_sink_playing(&assert_receive_from_pipeline/3, pipeline, sink_name, timeout)
end

@doc """
Asserts that `Membrane.Testing.Sink` with name `sink_name` didn't enter the playing
playback.
"""
defmacro refute_sink_playing(pipeline, sink_name, timeout \\ @default_timeout) do
do_sink_playing(&refute_receive_from_pipeline/3, pipeline, sink_name, timeout)
end

defp do_sink_playing(assertion, pipeline, sink_name, timeout) do
quote do
element_name_value = unquote(sink_name)

unquote(
assertion.(
pipeline,
{:handle_child_notification,
{:playing,
quote do
^element_name_value
end}},
timeout
)
)
end
end

@doc """
Asserts that `Membrane.Testing.Pipeline` received or is going to receive start_of_stream
notification from the element with a name `element_name` within the `timeout` period
Expand Down
11 changes: 7 additions & 4 deletions lib/membrane/testing/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ defmodule Membrane.Testing.Sink do
end

@impl true
def handle_playing(_context, %{autodemand: true} = state),
do: {[demand: :input], state}
def handle_playing(_ctx, state) do
actions =
notify(:playing) ++
if(state.autodemand, do: [demand: :input], else: [])

def handle_playing(_context, state), do: {[], state}
{actions, state}
end

@impl true
def handle_event(:input, event, _context, state) do
def handle_event(:input, event, _ctx, state) do
{notify({:event, event}), state}
end

Expand Down
13 changes: 11 additions & 2 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
]
)

assert_sink_playing(pipeline, :right_sink)

Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000})

Enum.each(1..1000, fn payload ->
Expand Down Expand Up @@ -191,7 +193,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %StreamFormat{}}], state}
{[stream_format: {:output, %StreamFormat{}}, notify_parent: :playing], state}
end
end

Expand All @@ -204,6 +206,8 @@ defmodule Membrane.Integration.AutoDemandsTest do
|> child(:sink, Sink)
)

assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})

Expand Down Expand Up @@ -232,9 +236,14 @@ defmodule Membrane.Integration.AutoDemandsTest do

Process.monitor(pipeline)

assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
assert_receive({:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink}})

assert_receive(
{:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}}
)
end

defp reduce_link(link, enum, fun) do
Expand Down
13 changes: 8 additions & 5 deletions test/membrane/integration/child_pad_removed_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
def_options test_process: [spec: pid()]

@impl true
def handle_init(ctx, opts) do
send(opts.test_process, {:init, ctx.name})
{[], Map.from_struct(opts)}
def handle_init(_ctx, opts), do: {[], Map.from_struct(opts)}

@impl true
def handle_playing(ctx, state) do
send(state.test_process, {:playing, ctx.name})
{[], state}
end
end

Expand Down Expand Up @@ -161,7 +164,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
] do
pipeline = start_pipeline!(DynamicBin, StaticSink)

assert_receive {:init, :sink}
assert_receive {:playing, :sink}
sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink)
monitor_ref = Process.monitor(sink_pid)

Expand All @@ -183,7 +186,7 @@ defmodule Membrane.Integration.ChildPadRemovedTest do
] do
pipeline = start_link_pipeline!(DynamicBin, StaticSink, remove_children: :sink)

assert_receive {:init, :sink}
assert_receive {:playing, :sink}
sink_pid = Testing.Pipeline.get_child_pid!(pipeline, :sink)
monitor_ref = Process.monitor(sink_pid)

Expand Down
4 changes: 3 additions & 1 deletion test/membrane/integration/defer_setup_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ defmodule Membrane.Integration.DeferSetupTest do

monitor_ref = Process.monitor(pipeline)
complete_child_setup(pipeline, :bin_2)
assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline, {:membrane_child_crash, :bin_2}}

assert_receive {:DOWN, ^monitor_ref, :process, ^pipeline,
{:membrane_child_crash, :bin_2, _bin_exit_reason}}
end

defp complete_child_setup(pipeline, child) do
Expand Down
92 changes: 54 additions & 38 deletions test/membrane/integration/demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule Membrane.Integration.DemandsTest do
end

defp test_pipeline(pid) do
assert_sink_playing(pid, :sink)

demand = 500
Pipeline.message_child(pid, :sink, {:make_demand, demand})

Expand Down Expand Up @@ -172,6 +174,11 @@ defmodule Membrane.Integration.DemandsTest do
@impl true
def handle_init(_ctx, _opts), do: {[], %{counter: 0}}

@impl true
def handle_playing(_ctx, state) do
{[notify_parent: :playing], state}
end

@impl true
def handle_buffer(:input, _buffer, _ctx, state) do
{[], Map.update!(state, :counter, &(&1 + 1))}
Expand All @@ -189,44 +196,6 @@ defmodule Membrane.Integration.DemandsTest do
end
end

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

# time for pipeline to start playing
Process.sleep(500)

for _i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive around 100 buffers, but the boundary is set to 70, in case of eg.
# slowdown of the source when running all tests in the project asynchronously
assert buff_no > 70

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end

defmodule Funnel do
use Membrane.Filter

Expand Down Expand Up @@ -293,4 +262,51 @@ defmodule Membrane.Integration.DemandsTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule Sync do
use Bunch
use ExUnit.Case, async: false

alias Membrane.Integration.DemandsTest.{PausingSink, RedemandingSource}

test "actions :pause_auto_demand and :resume_auto_demand" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(RedemandingSource)
|> via_in(:input, auto_demand_size: 10)
|> child(:sink, PausingSink)
)

assert_sink_playing(pipeline, :sink)

# time for pipeline to start playing
Process.sleep(1000)

for i <- 1..10 do
# during sleep below source should send around 100 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :pause_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should receive around 100 buffers, but the boundary is set to 65, in case of eg.
# slowdown of the source when running all tests in the project asynchronously
if i != 1, do: assert(buff_no > 65)

# during sleep below source should send up to about auto_demand_size = 10 buffers
Process.sleep(100 * RedemandingSource.sleep_time())

Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, :resume_auto_demand})

assert_pipeline_notified(pipeline, :sink, {:buff_no, buff_no})
# sink should probably receive between 5 and 15 buffers, but the boundary is set to 25,
# to handle the case when eg. there is a delay in receiving the notification from the
# pipeline by the :sink
assert buff_no < 25
end

Testing.Pipeline.terminate(pipeline)
end
end
end
3 changes: 2 additions & 1 deletion test/membrane/integration/links_validation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ defmodule Membrane.LinksValidationTest do

Pipeline.execute_actions(pipeline, spec: spec)

assert_receive({:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}})
assert_receive {:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}},
1000
end
end
end
2 changes: 2 additions & 0 deletions test/membrane/integration/toilet_forwarding_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

assert_sink_playing(pipeline, :sink)

buffers =
1..4000
|> Enum.map(fn i -> %Membrane.Buffer{payload: <<i::64>>} end)
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/pipeline_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Membrane.PipelineSupervisorTest do
assert_receive {:DOWN, ^supervisor_monitor_ref, _process, _pid, ^exit_reason}
end

test "Pipeline supervisor exits with {:membrane_child_crash, child_name} when pipeline's child crashes" do
test "Pipeline supervisor exits with {:membrane_child_crash, child_name, child_exit_reason} when pipeline's child crashes" do
defmodule MyElement do
use Membrane.Endpoint

Expand All @@ -44,7 +44,7 @@ defmodule Membrane.PipelineSupervisorTest do
element_exit_reason = :custom_exit_reason
Process.exit(element, element_exit_reason)

pipeline_exit_reason = {:membrane_child_crash, :element}
pipeline_exit_reason = {:membrane_child_crash, :element, element_exit_reason}

assert_receive {:DOWN, ^element_monitor_ref, _process, _pid, ^element_exit_reason}
assert_receive {:DOWN, ^pipeline_monitor_ref, _process, _pid, ^pipeline_exit_reason}
Expand Down
13 changes: 8 additions & 5 deletions test/membrane/remote_controlled/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defmodule Membrane.RCPipelineTest do
use Membrane.Filter
alias Membrane.Buffer

require Membrane.Logger

def_output_pad :output, flow_control: :manual, accepted_format: _any, availability: :always

def_input_pad :input,
Expand Down Expand Up @@ -73,10 +75,11 @@ defmodule Membrane.RCPipelineTest do

# TEST
assert_receive %RCMessage.Notification{
from: ^pipeline,
element: :b,
data: %Membrane.Buffer{payload: "test"}
}
from: ^pipeline,
element: :b,
data: %Membrane.Buffer{payload: "test"}
},
2000

assert_receive %RCMessage.StartOfStream{from: ^pipeline, element: :b, pad: :input}

Expand All @@ -91,7 +94,7 @@ defmodule Membrane.RCPipelineTest do
RCPipeline.exec_actions(pipeline, spec: @pipeline_spec)

# TEST
assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input}
assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :b, pad: :input}, 2000

assert_receive %RCMessage.EndOfStream{from: ^pipeline, element: :c, pad: :input}

Expand Down
2 changes: 1 addition & 1 deletion test/membrane/sync_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Membrane.SyncTest do
use Bunch
use ExUnit.Case, async: true
use ExUnit.Case, async: false

@module Membrane.Sync

Expand Down
11 changes: 8 additions & 3 deletions test/membrane/utility_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Membrane.UtilitySupervisorTest do
alias Membrane.Testing

test "Utility supervisor terminates utility when element exits" do
Process.register(self(), :utility_supervisor_test_process)

defmodule TestFilter do
use Membrane.Filter

Expand All @@ -16,7 +18,7 @@ defmodule Membrane.UtilitySupervisorTest do
ctx.utility_supervisor,
{Task,
fn ->
Process.register(self(), :utility_supervisor_test_task)
send(:utility_supervisor_test_process, {:task_pid, self()})
Process.sleep(:infinity)
end}
)
Expand All @@ -28,8 +30,11 @@ defmodule Membrane.UtilitySupervisorTest do
pipeline = Testing.Pipeline.start_supervised!(spec: [child(:filter, TestFilter)])

assert_pipeline_notified(pipeline, :filter, :setup)
monitor = Process.monitor(:utility_supervisor_test_task)
assert_receive {:task_pid, task_pid}

monitor_ref = Process.monitor(task_pid)

Testing.Pipeline.terminate(pipeline)
assert_receive {:DOWN, ^monitor, :process, _pid, :shutdown}
assert_receive {:DOWN, ^monitor_ref, :process, _pid, :shutdown}
end
end

0 comments on commit c260827

Please sign in to comment.