From fb105648953c3d60f5b2c0cac35105d8f28aba85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Wed, 3 Apr 2024 15:41:26 +0200 Subject: [PATCH] Refactor demand mechanism (#783) * Comment out handling_action? flag * Add new delayed demands loop tests * supplying_demand? -> delay_demands? * Delete unncessary flags, rename some modules * Introduce ManualFlowController * Introduce AutoFlowController * Format * InputQueue -> ManualFlowController.InputQueue * Bump default input queue size to 100 * Fix tests * Remove unnecessary comments * Remove leftovers * Clean up aliases * format * Implement comments from CR * Fix test performance * Fix performance test * Implement suggestion from CR --- .circleci/config.yml | 5 +- lib/membrane/children_spec.ex | 2 +- lib/membrane/core/bin.ex | 2 - lib/membrane/core/bin/state.ex | 2 - lib/membrane/core/callback_handler.ex | 21 ---- lib/membrane/core/child/pad_model.ex | 3 +- lib/membrane/core/element.ex | 8 +- lib/membrane/core/element/action_handler.ex | 64 +++++------ ..._flow_utils.ex => auto_flow_controller.ex} | 2 +- .../core/element/buffer_controller.ex | 38 ++++--- .../core/element/demand_controller.ex | 72 ++++++++++-- .../core/element/effective_flow_controller.ex | 6 +- lib/membrane/core/element/event_controller.ex | 10 +- ...d_handler.ex => manual_flow_controller.ex} | 104 ++++++------------ .../input_queue.ex | 11 +- lib/membrane/core/element/pad_controller.ex | 11 +- lib/membrane/core/element/state.ex | 6 +- .../core/element/stream_format_controller.ex | 14 ++- lib/membrane/core/pipeline/state.ex | 2 - lib/membrane/element/pad_data.ex | 2 +- .../core/element/action_handler_test.exs | 7 +- .../core/element/event_controller_test.exs | 8 +- .../core/element/input_queue_test.exs | 11 +- .../element/lifecycle_controller_test.exs | 8 +- .../core/element/pad_controller_test.exs | 3 +- .../element/stream_format_controller_test.exs | 8 +- test/membrane/core/pipeline_test.exs | 2 - .../integration/auto_demands_test.exs | 4 +- .../integration/delayed_demands_loop_test.exs | 96 ++++++++++++++++ 29 files changed, 310 insertions(+), 222 deletions(-) rename lib/membrane/core/element/{demand_controller/auto_flow_utils.ex => auto_flow_controller.ex} (99%) rename lib/membrane/core/element/{demand_handler.ex => manual_flow_controller.ex} (72%) rename lib/membrane/core/element/{ => manual_flow_controller}/input_queue.ex (97%) diff --git a/.circleci/config.yml b/.circleci/config.yml index fedce8b42..9b890aa0f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -152,13 +152,14 @@ jobs: steps: - attach_workspace: at: . + - run: cd app; git add -A; git reset --hard; cd .. - run: cp -r benchmark/ ~/benchmark_backup/ - run: cp mix.exs ~/benchmark_backup/ - run: docker pull membraneframeworklabs/docker_membrane - - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/feature_branch_results + - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/feature_branch_results - run: git checkout -f master - run: cp ~/benchmark_backup/mix.exs ~/app - - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/master_results + - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/master_results - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix run benchmark/compare.exs /root/results/feature_branch_results /root/results/master_results - run: command: rm ~/results/* diff --git a/lib/membrane/children_spec.ex b/lib/membrane/children_spec.ex index cd810096d..7507da384 100644 --- a/lib/membrane/children_spec.ex +++ b/lib/membrane/children_spec.ex @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive as many buffers, as demanded, all excess buffers will be queued internally. Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0` - for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future). + for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future). - `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode. See `t:Membrane.Pad.flow_control/0` for more info. - `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1, diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 9c3ce1950..37b09b22a 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -70,8 +70,6 @@ defmodule Membrane.Core.Bin do if node do result = :rpc.call(node, GenServer, :start, [__MODULE__, options]) - - # TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid) result else diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 9ac285763..e9605a1cf 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - handling_action?: boolean(), stalker: Membrane.Core.Stalker.t() } @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do initialized?: false, terminating?: false, setup_incomplete?: false, - handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 91729aef0..c42a7400c 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do use Bunch alias Membrane.CallbackError - alias Membrane.Core.Component require Membrane.Logger @@ -189,16 +188,6 @@ defmodule Membrane.Core.CallbackHandler do reraise e, __STACKTRACE__ end - was_handling_action? = state.handling_action? - state = %{state | handling_action?: true} - - # Updating :supplying_demand? flag value here is a temporal fix. - # Setting it to `true` while handling actions causes postponing calls - # of handle_redemand/2 and supply_demand/2 until a moment, when all - # actions returned from the callback are handled - was_supplying_demand? = Map.get(state, :supplying_demand?, false) - state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state - state = Enum.reduce(actions, state, fn action, state -> try do @@ -213,16 +202,6 @@ defmodule Membrane.Core.CallbackHandler do end end) - state = - if was_handling_action?, - do: state, - else: %{state | handling_action?: false} - - state = - if Component.is_element?(state) and not was_supplying_demand?, - do: %{state | supplying_demand?: false}, - else: state - handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index 9fd1c4ace..cfd7a97a9 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do alias Membrane.Core.Child alias Membrane.Core.Element.EffectiveFlowController + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.{Pad, UnknownPadError} @type bin_pad_data :: %Membrane.Bin.PadData{ @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do pid: pid, other_ref: Pad.ref(), sticky_messages: [Membrane.Event.t()], - input_queue: Membrane.Core.Element.InputQueue.t() | nil, + input_queue: InputQueue.t() | nil, options: %{optional(atom) => any}, auto_demand_size: pos_integer() | nil, sticky_events: [Membrane.Event.t()], diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index d76497770..bf9ab243e 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -24,10 +24,10 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, - DemandHandler, EffectiveFlowController, EventController, LifecycleController, + ManualFlowController, PadController, State, StreamFormatController @@ -85,8 +85,6 @@ defmodule Membrane.Core.Element do # rpc if necessary if node do result = :rpc.call(node, GenServer, :start, [__MODULE__, options]) - - # TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid) result else @@ -211,13 +209,13 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandHandler.resume_delayed_demands_loop(state) + state = ManualFlowController.resume_delayed_demands_loop(state) {:noreply, state} end defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do pad_ref = Message.for_pad(msg) - state = BufferController.handle_buffer(pad_ref, buffers, state) + state = BufferController.handle_incoming_buffers(pad_ref, buffers, state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 3650b58ad..d6027870d 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -22,13 +22,13 @@ defmodule Membrane.Core.Element.ActionHandler do } alias Membrane.Core.Element.{ + AutoFlowController, DemandController, - DemandHandler, + ManualFlowController, State, StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.{Events, TimerController} alias Membrane.Element.Action @@ -50,34 +50,29 @@ defmodule Membrane.Core.Element.ActionHandler do # Fixed order of handling demand of manual and auto pads would lead to # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - manual_demands_first? = Enum.random([1, 2]) == 1 - state = - if manual_demands_first?, - do: maybe_handle_delayed_demands(state), - else: state - - state = maybe_handle_pads_to_snapshot(state) - - state = - if manual_demands_first?, - do: state, - else: maybe_handle_delayed_demands(state) - - state + if Enum.random([true, false]) do + state + |> handle_pads_to_snapshot() + |> maybe_handle_delayed_demands() + else + state + |> maybe_handle_delayed_demands() + |> handle_pads_to_snapshot() + end end defp maybe_handle_delayed_demands(state) do - with %{supplying_demand?: false} <- state do - DemandHandler.handle_delayed_demands(state) + with %{delay_demands?: false} <- state do + ManualFlowController.handle_delayed_demands(state) end end - defp maybe_handle_pads_to_snapshot(state) do - with %{handling_action?: false} <- state do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - end + defp handle_pads_to_snapshot(state) do + state.pads_to_snapshot + |> Enum.shuffle() + |> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) end @impl CallbackHandler @@ -178,13 +173,13 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.pause_demands(in_ref, state) + AutoFlowController.pause_demands(in_ref, state) end @impl CallbackHandler def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.resume_demands(in_ref, state) + AutoFlowController.resume_demands(in_ref, state) end @impl CallbackHandler @@ -239,7 +234,7 @@ defmodule Membrane.Core.Element.ActionHandler do %State{type: type} = state ) when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do - supply_demand(pad_ref, size, state) + delay_supplying_demand(pad_ref, size, state) end @impl CallbackHandler @@ -396,26 +391,27 @@ defmodule Membrane.Core.Element.ActionHandler do end end - @spec supply_demand( + @spec delay_supplying_demand( Pad.ref(), Action.demand_size(), State.t() ) :: State.t() - defp supply_demand(pad_ref, 0, state) do + defp delay_supplying_demand(pad_ref, 0, state) do Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}") state end - defp supply_demand(pad_ref, size, _state) + defp delay_supplying_demand(pad_ref, size, _state) when is_integer(size) and size < 0 do raise ElementError, "Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}" end - defp supply_demand(pad_ref, size, state) do + defp delay_supplying_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.supply_demand(pad_ref, size, state) + state = ManualFlowController.update_demand(pad_ref, size, state) + ManualFlowController.delay_supplying_demand(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref @@ -435,7 +431,7 @@ defmodule Membrane.Core.Element.ActionHandler do when type in [:source, :filter, :endpoint] do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.handle_redemand(pad_ref, state) + ManualFlowController.delay_redemand(pad_ref, state) else %{direction: :input} -> raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}" @@ -471,10 +467,10 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/auto_flow_controller.ex similarity index 99% rename from lib/membrane/core/element/demand_controller/auto_flow_utils.ex rename to lib/membrane/core/element/auto_flow_controller.ex index 62d0b2077..36c6fc3e1 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do +defmodule Membrane.Core.Element.AutoFlowController do @moduledoc false alias Membrane.Buffer diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 986a204cf..d275502d3 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -11,15 +11,16 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, + AutoFlowController, CallbackContext, - DemandHandler, EventController, - InputQueue, + ManualFlowController, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -27,11 +28,11 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's - callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2` + callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ - @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - def handle_buffer(pad_ref, buffers, state) do + @spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() + def handle_incoming_buffers(pad_ref, buffers, state) do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do %{ @@ -49,20 +50,25 @@ defmodule Membrane.Core.Element.BufferController do EventController.handle_start_of_stream(pad_ref, state) end - do_handle_buffer(pad_ref, data, buffers, state) + do_handle_incoming_buffers(pad_ref, data, buffers, state) else pad: {:error, :unknown_pad} -> # We've got a buffer from already unlinked pad state playback: _playback -> - PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state) + PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state) end end - @spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + @spec do_handle_incoming_buffers( + Pad.ref(), + PadModel.pad_data(), + [Buffer.t()] | Buffer.t(), + State.t() + ) :: State.t() - defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) @@ -70,27 +76,27 @@ defmodule Membrane.Core.Element.BufferController do :atomics.put(stalker_metrics.demand, 1, demand - buf_size) if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do - AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state) else state = exec_buffer_callback(pad_ref, buffers, state) - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(pad_ref, state) end end - defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do %{input_queue: old_input_queue} = data input_queue = InputQueue.store(old_input_queue, buffers) state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) - if old_input_queue |> InputQueue.empty?() do - DemandHandler.supply_demand(pad_ref, state) + if InputQueue.empty?(old_input_queue) do + ManualFlowController.supply_demand(pad_ref, state) else state end end - defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do exec_buffer_callback(pad_ref, buffers, state) end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index daa5fb784..985e56c85 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,13 +5,17 @@ defmodule Membrane.Core.Element.DemandController do use Bunch - alias __MODULE__.AutoFlowUtils - alias Membrane.Buffer + alias Membrane.Element.PadData + + alias Membrane.Core.CallbackHandler + alias Membrane.Core.Element.CallbackContext alias Membrane.Core.Element.{ + ActionHandler, AtomicDemand, - DemandHandler, + AutoFlowController, + ManualFlowController, PlaybackQueue, State } @@ -25,8 +29,10 @@ defmodule Membrane.Core.Element.DemandController do def snapshot_atomic_demand(pad_ref, state) do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do - if pad_data.direction == :input, - do: raise("cannot snapshot atomic counter in input pad") + if pad_data.direction == :input do + raise Membrane.ElementError, + "Cannot snapshot atomic counter in input pad #{inspect(pad_ref)}" + end do_snapshot_atomic_demand(pad_data, state) else @@ -50,10 +56,13 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - if AtomicDemand.get(pad_data.atomic_demand) > 0 do + atomic_value = AtomicDemand.get(pad_data.atomic_demand) + state = PadModel.set_data!(state, pad_data.ref, :demand, atomic_value) + + if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else state end @@ -76,7 +85,7 @@ defmodule Membrane.Core.Element.DemandController do } ) - DemandHandler.handle_redemand(pad_data.ref, state) + ManualFlowController.handle_redemand(pad_data.ref, state) else _other -> state end @@ -109,4 +118,51 @@ defmodule Membrane.Core.Element.DemandController do atomic_demand: atomic_demand }) end + + @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() + def exec_handle_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + true <- exec_handle_demand?(pad_data) do + do_exec_handle_demand(pad_data, state) + else + _other -> state + end + end + + @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() + defp do_exec_handle_demand(pad_data, state) do + context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + + CallbackHandler.exec_and_handle_callback( + :handle_demand, + ActionHandler, + %{ + split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), + context: context + }, + [pad_data.ref, pad_data.demand, pad_data.demand_unit], + state + ) + end + + defp exec_handle_demand?(%{end_of_stream?: true}) do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as :end_of_stream action has already been returned + """) + + false + end + + defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as demand is not greater than 0, + demand: #{inspect(demand)} + """) + + false + end + + defp exec_handle_demand?(_pad_data) do + true + end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 6d0849439..639914e6e 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -18,9 +18,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. - alias Membrane.Core.Element.DemandController - alias Membrane.Core.Element.DemandController.AutoFlowUtils - alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.Element.{AtomicDemand, AutoFlowController, DemandController, State} require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message, as: Message @@ -141,6 +139,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state end) end - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f6b59a34b..f97169308 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -11,14 +11,14 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, + AutoFlowController, CallbackContext, - DemandHandler, - InputQueue, + ManualFlowController, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils + alias Membrane.Core.Element.ManualFlowController.InputQueue require Membrane.Core.Child.PadModel require Membrane.Core.Message @@ -55,7 +55,7 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> - AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + AutoFlowController.store_event_in_queue(pad_ref, event, state) true -> exec_handle_event(pad_ref, event, state) @@ -109,7 +109,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/manual_flow_controller.ex similarity index 72% rename from lib/membrane/core/element/demand_handler.ex rename to lib/membrane/core/element/manual_flow_controller.ex index d7d9ec8e6..be20a37d9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -1,21 +1,18 @@ -defmodule Membrane.Core.Element.DemandHandler do +defmodule Membrane.Core.Element.ManualFlowController do @moduledoc false # Module handling demands requested on output pads. - alias Membrane.Core.CallbackHandler - alias Membrane.Core.Element.{ - ActionHandler, BufferController, - CallbackContext, + DemandController, EventController, - InputQueue, State, StreamFormatController } - alias Membrane.Element.PadData + alias __MODULE__.InputQueue + alias Membrane.Pad require Membrane.Core.Child.PadModel, as: PadModel @@ -24,6 +21,11 @@ defmodule Membrane.Core.Element.DemandHandler do @handle_demand_loop_limit 20 + @spec delay_redemand(Pad.ref(), State.t()) :: State.t() + def delay_redemand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) + end + @doc """ Called when redemand action was returned. * If element is currently supplying demand, it means that after finishing `supply_demand` it will call @@ -32,8 +34,8 @@ defmodule Membrane.Core.Element.DemandHandler do output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied. """ @spec handle_redemand(Pad.ref(), State.t()) :: State.t() - def handle_redemand(pad_ref, %State{supplying_demand?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) + def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do + delay_redemand(pad_ref, state) end def handle_redemand(pad_ref, %State{} = state) do @@ -42,9 +44,9 @@ defmodule Membrane.Core.Element.DemandHandler do end defp do_handle_redemand(pad_ref, state) do - state = %{state | supplying_demand?: true} - state = exec_handle_demand(pad_ref, state) - %{state | supplying_demand?: false} + state = %{state | delay_demands?: true} + state = DemandController.exec_handle_demand(pad_ref, state) + %{state | delay_demands?: false} end @doc """ @@ -65,17 +67,12 @@ defmodule Membrane.Core.Element.DemandHandler do """ @spec supply_demand( Pad.ref(), - size :: non_neg_integer | (non_neg_integer() -> non_neg_integer()), State.t() ) :: State.t() - def supply_demand(pad_ref, size, state) do - state = update_demand(pad_ref, size, state) - supply_demand(pad_ref, state) - end @spec supply_demand(Pad.ref(), State.t()) :: State.t() - def supply_demand(pad_ref, %State{supplying_demand?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + def supply_demand(pad_ref, %State{delay_demands?: true} = state) do + delay_supplying_demand(pad_ref, state) end def supply_demand(pad_ref, state) do @@ -85,7 +82,7 @@ defmodule Membrane.Core.Element.DemandHandler do defp do_supply_demand(pad_ref, state) do # marking is state that actual demand supply has been started (note changing back to false when finished) - state = %State{state | supplying_demand?: true} + state = %State{state | delay_demands?: true} pad_data = state |> PadModel.get_data!(pad_ref) @@ -94,14 +91,24 @@ defmodule Membrane.Core.Element.DemandHandler do state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) state = handle_input_queue_output(pad_ref, popped_data, state) - %State{state | supplying_demand?: false} + %State{state | delay_demands?: false} + end + + @spec delay_supplying_demand(Pad.ref(), State.t()) :: State.t() + def delay_supplying_demand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) end - defp update_demand(pad_ref, size, state) when is_integer(size) do + @spec update_demand( + Pad.ref(), + non_neg_integer() | (non_neg_integer() -> non_neg_integer()), + State.t() + ) :: State.t() + def update_demand(pad_ref, size, state) when is_integer(size) do PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end - defp update_demand(pad_ref, size_fun, state) when is_function(size_fun) do + def update_demand(pad_ref, size_fun, state) when is_function(size_fun) do manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) new_manual_demand_size = size_fun.(manual_demand_size) @@ -127,8 +134,8 @@ defmodule Membrane.Core.Element.DemandHandler do # potentially for a long time. cond do - state.supplying_demand? -> - raise "Cannot handle delayed demands while already supplying demand" + state.delay_demands? -> + raise "Cannot handle delayed demands when delay_demands? flag is set to true" state.handle_demand_loop_counter >= @handle_demand_loop_limit -> state = @@ -198,51 +205,4 @@ defmodule Membrane.Core.Element.DemandHandler do BufferController.exec_buffer_callback(pad_ref, buffers, state) end - - @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() - defp exec_handle_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), - true <- exec_handle_demand?(pad_data) do - do_exec_handle_demand(pad_data, state) - else - _other -> state - end - end - - @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() - defp do_exec_handle_demand(pad_data, state) do - context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) - - CallbackHandler.exec_and_handle_callback( - :handle_demand, - ActionHandler, - %{ - split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), - context: context - }, - [pad_data.ref, pad_data.demand, pad_data.demand_unit], - state - ) - end - - defp exec_handle_demand?(%{end_of_stream?: true}) do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as :end_of_stream action has already been returned - """) - - false - end - - defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as demand is not greater than 0, - demand: #{inspect(demand)} - """) - - false - end - - defp exec_handle_demand?(_pad_data) do - true - end end diff --git a/lib/membrane/core/element/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex similarity index 97% rename from lib/membrane/core/element/input_queue.ex rename to lib/membrane/core/element/manual_flow_controller/input_queue.ex index a3b3a2e15..8eb1d4ed4 100644 --- a/lib/membrane/core/element/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.InputQueue do +defmodule Membrane.Core.Element.ManualFlowController.InputQueue do @moduledoc false # Queue that is attached to the `:input` pad when working in a `:manual` flow control mode. @@ -54,12 +54,12 @@ defmodule Membrane.Core.Element.InputQueue do defstruct @enforce_keys ++ [size: 0, demand: 0] - @default_target_size_factor 40 + @default_target_size_factor 100 @spec default_min_demand_factor() :: number() def default_min_demand_factor, do: 0.25 - @spec init(%{ + @spec new(%{ inbound_demand_unit: Buffer.Metric.unit(), outbound_demand_unit: Buffer.Metric.unit(), atomic_demand: AtomicDemand.t(), @@ -67,7 +67,7 @@ defmodule Membrane.Core.Element.InputQueue do log_tag: String.t(), target_size: pos_integer() | nil }) :: t() - def init(config) do + def new(config) do %{ inbound_demand_unit: inbound_demand_unit, outbound_demand_unit: outbound_demand_unit, @@ -105,7 +105,8 @@ defmodule Membrane.Core.Element.InputQueue do |> maybe_increase_atomic_demand() end - @spec store(t(), atom(), queue_item() | [queue_item()]) :: t() + @spec store(t(), :buffer | :buffers | :event | :stream_format, queue_item() | [queue_item()]) :: + t() def store(input_queue, type \\ :buffers, v) def store(input_queue, :buffers, v) when is_list(v) do diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 61530f83f..633d26d9c 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -9,15 +9,16 @@ defmodule Membrane.Core.Element.PadController do alias Membrane.Core.Element.{ ActionHandler, AtomicDemand, + AutoFlowController, CallbackContext, EffectiveFlowController, EventController, - InputQueue, State, StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Parent.Link.Endpoint alias Membrane.LinkError @@ -241,7 +242,7 @@ defmodule Membrane.Core.Element.PadController do |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -335,7 +336,7 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) %{direction: :input, flow_control: :auto} -> - AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(endpoint.pad_ref, state) |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) _pad_data -> @@ -373,7 +374,7 @@ defmodule Membrane.Core.Element.PadController do } = pad_data input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: other_pad_info[:demand_unit] || this_demand_unit, outbound_demand_unit: this_demand_unit, atomic_demand: atomic_demand, diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 102b98a40..d73c29335 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element.State do pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, - supplying_demand?: boolean(), + delay_demands?: boolean(), delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}), handle_demand_loop_counter: non_neg_integer(), synchronization: %{ @@ -42,7 +42,6 @@ defmodule Membrane.Core.Element.State do terminating?: boolean(), setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), - handling_action?: boolean(), popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), @@ -74,8 +73,7 @@ defmodule Membrane.Core.Element.State do initialized?: false, terminating?: false, setup_incomplete?: false, - supplying_demand?: false, - handling_action?: false, + delay_demands?: false, popping_auto_flow_queue?: false, stalker: nil, resource_guard: nil, diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index b22637cc7..3075e9c15 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -8,8 +8,16 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.{Pad, StreamFormat} alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} - alias Membrane.Core.Element.DemandController.AutoFlowUtils + + alias Membrane.Core.Element.{ + ActionHandler, + AutoFlowController, + CallbackContext, + PlaybackQueue, + State + } + + alias Membrane.Core.Element.ManualFlowController.InputQueue require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -41,7 +49,7 @@ defmodule Membrane.Core.Element.StreamFormatController do # stream format goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + AutoFlowController.store_stream_format_in_queue(pad_ref, stream_format, state) true -> exec_handle_stream_format(pad_ref, stream_format, state) diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 6f644947b..37f24ea2d 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -32,7 +32,6 @@ defmodule Membrane.Core.Pipeline.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), subprocess_supervisor: pid(), awaiting_setup_completition?: boolean() @@ -56,7 +55,6 @@ defmodule Membrane.Core.Pipeline.State do initialized?: false, terminating?: false, setup_incomplete?: false, - handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index 3be09d473..98022cef1 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. atomic_demand: private_field, - # Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Field used in AutoFlowController and InputQueue, to caluclate, how much AtomicDemand should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 06b1913ae..48456f781 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -25,7 +25,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do playback: :stopped, synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ input: @@ -60,7 +59,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do setup :demand_test_filter test "delaying demand", %{state: state} do - state = %{state | playback: :playing, supplying_demand?: true} + state = %{state | playback: :playing, delay_demands?: true} state = @module.handle_action({:demand, {:input, 10}}, :handle_info, %{}, state) assert state.pads_data.input.manual_demand_size == 10 assert MapSet.new([{:input, :supply}]) == state.delayed_demands @@ -110,7 +109,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), playback: :stopped, - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ @@ -489,7 +487,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do test "when pad works in auto or manual flow control mode", %{state: state} do state = - %{state | supplying_demand?: true, playback: :playing} + %{state | delay_demands?: true, playback: :playing} |> PadModel.set_data!(:output, :flow_control, :manual) new_state = @@ -512,7 +510,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do name: :elem_name, synchronization: %{clock: nil, parent_clock: nil}, type: :source, - handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 27928d271..ef32c93e7 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.EventControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, EventController, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, EventController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Events alias Membrane.Core.SubprocessSupervisor alias Membrane.Event @@ -32,7 +33,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :some_pad, @@ -50,8 +51,7 @@ defmodule Membrane.Core.Element.EventControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, - handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), handle_demand_loop_counter: 0, diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index efa95c490..0bef78f60 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -2,7 +2,8 @@ defmodule Membrane.Core.Element.InputQueueTest do use ExUnit.Case, async: true alias Membrane.Buffer - alias Membrane.Core.Element.{AtomicDemand, InputQueue} + alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Message alias Membrane.Core.SubprocessSupervisor alias Membrane.Testing.Event @@ -24,7 +25,7 @@ defmodule Membrane.Core.Element.InputQueueTest do end test "return InputQueue struct and send demand message", context do - assert InputQueue.init(%{ + assert InputQueue.new(%{ inbound_demand_unit: context.inbound_demand_unit, outbound_demand_unit: context.outbound_demand_unit, pad_ref: context.pad_ref, @@ -186,7 +187,7 @@ defmodule Membrane.Core.Element.InputQueueTest do describe ".take/2 should" do setup do input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :output_pad_ref, @@ -301,7 +302,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :bytes, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, @@ -338,7 +339,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :bytes, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 3a9be602b..c7397f6fe 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, InputQueue, LifecycleController, State} + alias Membrane.Core.Element.{AtomicDemand, LifecycleController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.{ Message, @@ -32,7 +33,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, @@ -49,8 +50,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), pads_data: %{ diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 82b6a5926..4537a21bf 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,8 +18,7 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, - handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), parent_pid: self(), diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index d94e93030..766bb01d2 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -3,7 +3,8 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do alias Membrane.Buffer alias Membrane.Core.Message - alias Membrane.Core.Element.{AtomicDemand, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.SubprocessSupervisor alias Membrane.StreamFormat.Mock, as: MockStreamFormat alias Membrane.Support.DemandsTest.Filter @@ -25,7 +26,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, @@ -41,8 +42,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), handle_demand_loop_counter: 0, diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index d59fc2b2c..5ef8f2f7d 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,7 +79,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end @@ -93,7 +92,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end end diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 2a25604a2..44f5c7e93 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -339,7 +339,7 @@ defmodule Membrane.Integration.AutoDemandsTest do end test "when there is no demand on the output pad", %{pipeline: pipeline} do - manual_flow_queue_size = 40 + manual_flow_queue_size = 100 assert_pipeline_notified(pipeline, :filter, :playing) @@ -374,7 +374,7 @@ defmodule Membrane.Integration.AutoDemandsTest do test "when an element returns :pause_auto_demand and :resume_auto_demand action", %{ pipeline: pipeline } do - manual_flow_queue_size = 40 + manual_flow_queue_size = 100 auto_flow_demand_size = 400 assert_pipeline_notified(pipeline, :filter, :playing) diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index 7688cc744..da614c980 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -80,4 +80,100 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do Testing.Pipeline.terminate(pipeline) end + + defmodule VariousFlowFilter do + use Membrane.Filter + + def_input_pad :manual_input, + accepted_format: _any, + flow_control: :manual, + demand_unit: :buffers + + def_input_pad :auto_input, accepted_format: _any, flow_control: :auto + + def_output_pad :manual_output, accepted_format: _any, flow_control: :manual + def_output_pad :auto_output, accepted_format: _any, flow_control: :auto + + defmodule StreamFormat do + defstruct [] + end + + @impl true + def handle_playing(_ctx, _state) do + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:stream_format, {&1, %StreamFormat{}}}) + + {actions, %{}} + end + + @impl true + def handle_demand(:manual_output, size, :buffers, _ctx, state) do + {[demand: {:manual_input, size}], state} + end + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + # Aim of this Process.sleep is to make VariousFlowFilter working slower than Testing.Sinks + Process.sleep(1) + + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:buffer, {&1, buffer}}) + + {actions, state} + end + + @impl true + def handle_end_of_stream(_pad, _ctx, state) do + {[], state} + end + end + + test "manual pad doesn't starve auto pad" do + buffers_per_source = 10_000 + input_demand_size = 100 + + manual_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :manual, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + auto_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :auto, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: [ + child(:manual_source, %Testing.Source{output: manual_source_buffers}) + |> via_in(:manual_input, target_queue_size: input_demand_size) + |> child(:filter, VariousFlowFilter) + |> via_out(:manual_output) + |> child(:manual_sink, Testing.Sink), + child(:auto_source, %Testing.Source{output: auto_source_buffers}) + |> via_in(:auto_input, auto_demand_size: input_demand_size) + |> get_child(:filter) + |> via_out(:auto_output) + |> child(:auto_sink, Testing.Sink) + ] + ) + + stats = %{manual: 0, auto: 0} + + Enum.reduce(1..10_000, stats, fn _i, stats -> + assert_sink_buffer(pipeline, :auto_sink, buffer) + stats = Map.update!(stats, buffer.metadata, &(&1 + 1)) + + difference_upperbound = + max(stats.auto, stats.manual) + |> div(2) + |> max(5 * input_demand_size) + + assert abs(stats.auto - stats.manual) <= difference_upperbound + + stats + end) + + Testing.Pipeline.terminate(pipeline) + end end