Skip to content

Commit

Permalink
Ensure handle_demand busy wait infinite loop doesn't occur & deprecat…
Browse files Browse the repository at this point in the history
…e message_child (#779)

* Membrane.Testing.Pipeline.message_child/3 -> notify_child/3

* Write test for delayed demands loop, send at most 1 resume delayed demands loop message at the time

* Set default fields values in Element state

* Add line to changelog

* Rename test

* Rename function

* Refactor test

* Refactor due to CR

* Comment due to CR

* Refactor comment
  • Loading branch information
FelonEkonom authored Mar 20, 2024
1 parent 30c0bef commit 0143b79
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Deprecate `Membrane.Testing.Pipeline.message_child/3`. Introduce `Membrane.Testing.Pipeline.notify_child/3` instead. [#779](https://github.com/membraneframework/membrane_core/pull/779)

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
Expand Down
22 changes: 3 additions & 19 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,33 +137,17 @@ defmodule Membrane.Core.Element do
module: options.module,
type: options.module.membrane_element_type(),
name: options.name,
internal_state: nil,
parent_pid: options.parent,
supplying_demand?: false,
delayed_demands: MapSet.new(),
handle_demand_loop_counter: 0,
synchronization: %{
parent_clock: options.parent_clock,
timers: %{},
clock: nil,
stream_sync: options.sync,
latency: 0
},
initialized?: false,
playback: :stopped,
playback_queue: [],
resource_guard: resource_guard,
subprocess_supervisor: options.subprocess_supervisor,
terminating?: false,
setup_incomplete?: false,
effective_flow_control: :push,
handling_action?: false,
popping_auto_flow_queue?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: []
stalker: options.stalker
}
|> PadSpecHandler.init_pads()

Expand Down Expand Up @@ -226,8 +210,8 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
state = DemandHandler.handle_delayed_demands(state)
defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
15 changes: 13 additions & 2 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ defmodule Membrane.Core.Element.DemandHandler do
PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size)
end

@spec resume_delayed_demands_loop(State.t()) :: State.t()
def resume_delayed_demands_loop(%State{} = state) do
%{state | resume_delayed_demands_loop_in_mailbox?: false}
|> handle_delayed_demands()
end

@spec handle_delayed_demands(State.t()) :: State.t()
def handle_delayed_demands(%State{} = state) do
# Taking random element of `:delayed_demands` is done to keep data flow
Expand All @@ -125,10 +131,15 @@ defmodule Membrane.Core.Element.DemandHandler do
raise "Cannot handle delayed demands while already supplying demand"

state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
Message.self(:resume_handle_demand_loop)
state =
with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do
Message.self(:resume_delayed_demands_loop)
%{state | resume_delayed_demands_loop_in_mailbox?: true}
end

%{state | handle_demand_loop_counter: 0}

Enum.empty?(state.delayed_demands) ->
MapSet.size(state.delayed_demands) == 0 ->
%{state | handle_demand_loop_counter: 0}

true ->
Expand Down
61 changes: 30 additions & 31 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ defmodule Membrane.Core.Element.State do
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t(),
awaiting_auto_input_pads: MapSet.t()
awaiting_auto_input_pads: MapSet.t(),
resume_delayed_demands_loop_in_mailbox?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand All @@ -59,34 +60,32 @@ defmodule Membrane.Core.Element.State do
# as the last item in the list, because sometimes it is so big, that everything after it
# might be truncated during the inspection.

defstruct [
:module,
:name,
:parent_pid,
:playback,
:type,
:internal_state,
:pad_refs,
:pads_info,
:synchronization,
:delayed_demands,
:effective_flow_control,
:initialized?,
:terminating?,
:setup_incomplete?,
:supplying_demand?,
:handling_action?,
:popping_auto_flow_queue?,
:stalker,
:resource_guard,
:subprocess_supervisor,
:handle_demand_loop_counter,
:demand_size,
:pads_to_snapshot,
:playback_queue,
:pads_data,
:satisfied_auto_output_pads,
:awaiting_auto_input_pads,
:auto_input_pads
]
defstruct module: nil,
name: nil,
parent_pid: nil,
playback: :stopped,
type: nil,
internal_state: nil,
pad_refs: [],
pads_info: %{},
synchronization: nil,
delayed_demands: MapSet.new(),
effective_flow_control: :push,
initialized?: false,
terminating?: false,
setup_incomplete?: false,
supplying_demand?: false,
handling_action?: false,
popping_auto_flow_queue?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
handle_demand_loop_counter: 0,
pads_to_snapshot: MapSet.new(),
playback_queue: [],
pads_data: %{},
satisfied_auto_output_pads: MapSet.new(),
awaiting_auto_input_pads: MapSet.new(),
auto_input_pads: [],
resume_delayed_demands_loop_in_mailbox?: false
end
23 changes: 20 additions & 3 deletions lib/membrane/testing/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Testing.Pipeline do
## Messaging children
You can send messages to children using their names specified in the children
list. Please check `message_child/3` for more details.
list. Please check `notify_child/3` for more details.
## Example usage
Expand Down Expand Up @@ -202,6 +202,23 @@ defmodule Membrane.Testing.Pipeline do
defdelegate terminate(pipeline, opts \\ []), to: Pipeline

@doc """
Sends notification to a child by Element name.
## Example
Knowing that `pipeline` has child named `sink`, notification can be sent as follows:
notify_child(pipeline, :sink, {:notification, "to handle"})
"""
@spec notify_child(pid(), Element.name(), any()) :: :ok
def notify_child(pipeline, child, notification) do
send(pipeline, {:for_element, child, notification})
:ok
end

@doc """
Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead.
Sends message to a child by Element name.
## Example
Expand All @@ -210,10 +227,10 @@ defmodule Membrane.Testing.Pipeline do
message_child(pipeline, :sink, {:message, "to handle"})
"""
@deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead"
@spec message_child(pid(), Element.name(), any()) :: :ok
def message_child(pipeline, child, message) do
send(pipeline, {:for_element, child, message})
:ok
notify_child(pipeline, child, message)
end

@doc """
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/integration/actions_handling_order_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ defmodule Membrane.Integration.ActionsHandlingOrderTest do
# time for pipeline to play
Process.sleep(100)

Testing.Pipeline.message_child(pipeline, :sink, :start_timer)
Testing.Pipeline.notify_child(pipeline, :sink, :start_timer)

assert_pipeline_notified(pipeline, :sink, :second_tick)

Expand Down
18 changes: 9 additions & 9 deletions test/membrane/integration/auto_demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_sink_playing(pipeline, :right_sink)

Pipeline.message_child(pipeline, :right_sink, {:make_demand, 1000})
Pipeline.notify_child(pipeline, :right_sink, {:make_demand, 1000})

Enum.each(1..1000, fn payload ->
assert_sink_buffer(pipeline, :right_sink, buffer)
Expand Down Expand Up @@ -246,15 +246,15 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..10, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

Enum.each(1..100_010, fn i ->
assert_sink_buffer(pipeline, :sink, buffer)
assert buffer.payload == i

if i <= 100_000 do
buffer = %Membrane.Buffer{payload: i + 10}
Pipeline.message_child(pipeline, :source, buffer: {:output, buffer})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffer})
end
end)

Expand All @@ -276,7 +276,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert_pipeline_notified(pipeline, :source, :playing)

buffers = Enum.map(1..100_000, &%Membrane.Buffer{payload: &1})
Pipeline.message_child(pipeline, :source, buffer: {:output, buffers})
Pipeline.notify_child(pipeline, :source, buffer: {:output, buffers})

assert_receive(
{:DOWN, _ref, :process, ^pipeline, {:membrane_child_crash, :sink, _sink_reason}}
Expand Down Expand Up @@ -347,7 +347,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down Expand Up @@ -379,7 +379,7 @@ defmodule Membrane.Integration.AutoDemandsTest do

assert_pipeline_notified(pipeline, :filter, :playing)

Pipeline.message_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, pause_auto_demand: Pad.ref(:input, 0))

# time for :filter to pause demand on Pad.ref(:input, 0)
Process.sleep(500)
Expand All @@ -388,7 +388,7 @@ defmodule Membrane.Integration.AutoDemandsTest do
assert length(buffers) == manual_flow_queue_size

demand = 10_000
Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

# fliter paused auto demand on Pad.ref(:input, 0), so it should receive
# at most auto_flow_demand_size buffers from there and rest of the buffers
Expand All @@ -410,12 +410,12 @@ defmodule Membrane.Integration.AutoDemandsTest do
# rest of them came from {:source, 1}
assert demand - auto_flow_demand_size <= counter_1

Pipeline.message_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))
Pipeline.notify_child(pipeline, :filter, resume_auto_demand: Pad.ref(:input, 0))

# time for :filter to resume demand on Pad.ref(:input, 0)
Process.sleep(500)

Pipeline.message_child(pipeline, :sink, {:make_demand, demand})
Pipeline.notify_child(pipeline, :sink, {:make_demand, demand})

buffers = receive_processed_buffers(pipeline, 2 * demand)
buffers_number = length(buffers)
Expand Down
83 changes: 83 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
defmodule Membrane.Test.DelayedDemandsLoopTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.Debug
alias Membrane.Testing

defmodule Source do
use Membrane.Source

defmodule StreamFormat do
defstruct []
end

@sleep_time 5

def_output_pad :output,
accepted_format: _any,
availability: :on_request,
flow_control: :manual

@impl true
def handle_demand(_pad, _size, :buffers, %{pads: pads}, state) do
Process.sleep(@sleep_time)

stream_format_actions =
Enum.flat_map(pads, fn
{pad_ref, %{start_of_stream?: false}} -> [stream_format: {pad_ref, %StreamFormat{}}]
_pad_entry -> []
end)

buffer = %Buffer{payload: "a"}

buffer_and_redemand_actions =
Map.keys(pads)
|> Enum.flat_map(&[buffer: {&1, buffer}, redemand: &1])

{stream_format_actions ++ buffer_and_redemand_actions, state}
end

@impl true
def handle_parent_notification(:request, _ctx, state) do
{[notify_parent: :response], state}
end
end

describe "delayed demands loop pauses from time to time, when source has" do
test "1 pad", do: do_test(1)
test "2 pads", do: do_test(2)
test "10 pads", do: do_test(10)
end

defp do_test(sinks_number) do
# auto_demand_size is smaller than delayed_demands_loop_counter_limit, to ensure that
# after a snapshot, the counter is not reset
auto_demand_size = 15
requests_number = 20

spec =
[child(:source, Source)] ++
for i <- 1..sinks_number do
get_child(:source)
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child({:sink, i}, Debug.Sink)
end

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

for i <- 1..sinks_number do
assert_start_of_stream(pipeline, {:sink, ^i})
end

for _i <- 1..requests_number do
Testing.Pipeline.notify_child(pipeline, :source, :request)
assert_pipeline_notified(pipeline, :source, :response)
end

Testing.Pipeline.terminate(pipeline)
end
end
4 changes: 2 additions & 2 deletions test/membrane/integration/demands_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ defmodule Membrane.Integration.DemandsTest do
assert_sink_playing(pid, :sink)

demand = 500
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

0..(demand - 1)
|> assert_buffers_received(pid)

pattern = %Buffer{payload: <<demand::16>> <> <<255>>}
refute_sink_buffer(pid, :sink, ^pattern, 0)
Pipeline.message_child(pid, :sink, {:make_demand, demand})
Pipeline.notify_child(pid, :sink, {:make_demand, demand})

demand..(2 * demand - 1)
|> assert_buffers_received(pid)
Expand Down
Loading

0 comments on commit 0143b79

Please sign in to comment.