Skip to content

Commit

Permalink
Refactor demand mechanism (#783)
Browse files Browse the repository at this point in the history
* Comment out handling_action? flag

* Add new delayed demands loop tests

* supplying_demand? -> delay_demands?

* Delete unncessary flags, rename some modules

* Introduce ManualFlowController

* Introduce AutoFlowController

* Format

* InputQueue -> ManualFlowController.InputQueue

* Bump default input queue size to 100

* Fix tests

* Remove unnecessary comments

* Remove leftovers

* Clean up aliases

* format

* Implement comments from CR

* Fix test performance

* Fix performance test

* Implement suggestion from CR
  • Loading branch information
FelonEkonom authored Apr 3, 2024
1 parent ef91c4d commit fb10564
Show file tree
Hide file tree
Showing 29 changed files with 310 additions and 222 deletions.
5 changes: 3 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,14 @@ jobs:
steps:
- attach_workspace:
at: .
- run: cd app; git add -A; git reset --hard; cd ..
- run: cp -r benchmark/ ~/benchmark_backup/
- run: cp mix.exs ~/benchmark_backup/
- run: docker pull membraneframeworklabs/docker_membrane
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/feature_branch_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/feature_branch_results
- run: git checkout -f master
- run: cp ~/benchmark_backup/mix.exs ~/app
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/master_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/master_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix run benchmark/compare.exs /root/results/feature_branch_results /root/results/master_results
- run:
command: rm ~/results/*
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/children_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do
Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive
as many buffers, as demanded, all excess buffers will be queued internally.
Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0`
for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future).
- `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode.
See `t:Membrane.Pad.flow_control/0` for more info.
- `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1,
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ defmodule Membrane.Core.Bin do

if node do
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])

# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
result
else
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do
initialized?: false,
terminating?: false,
setup_incomplete?: false,
handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
Expand Down
21 changes: 0 additions & 21 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -189,16 +188,6 @@ defmodule Membrane.Core.CallbackHandler do
reraise e, __STACKTRACE__
end

was_handling_action? = state.handling_action?
state = %{state | handling_action?: true}

# Updating :supplying_demand? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
try do
Expand All @@ -213,16 +202,6 @@ defmodule Membrane.Core.CallbackHandler do
end
end)

state =
if was_handling_action?,
do: state,
else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_supplying_demand?,
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
end
end
3 changes: 2 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do

alias Membrane.Core.Child
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.{Pad, UnknownPadError}

@type bin_pad_data :: %Membrane.Bin.PadData{
Expand Down Expand Up @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do
pid: pid,
other_ref: Pad.ref(),
sticky_messages: [Membrane.Event.t()],
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
input_queue: InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
sticky_events: [Membrane.Event.t()],
Expand Down
8 changes: 3 additions & 5 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DemandHandler,
EffectiveFlowController,
EventController,
LifecycleController,
ManualFlowController,
PadController,
State,
StreamFormatController
Expand Down Expand Up @@ -85,8 +85,6 @@ defmodule Membrane.Core.Element do
# rpc if necessary
if node do
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])

# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
result
else
Expand Down Expand Up @@ -211,13 +209,13 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
state = ManualFlowController.resume_delayed_demands_loop(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
state = BufferController.handle_buffer(pad_ref, buffers, state)
state = BufferController.handle_incoming_buffers(pad_ref, buffers, state)
{:noreply, state}
end

Expand Down
64 changes: 30 additions & 34 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ defmodule Membrane.Core.Element.ActionHandler do
}

alias Membrane.Core.Element.{
AutoFlowController,
DemandController,
DemandHandler,
ManualFlowController,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand All @@ -50,34 +50,29 @@ defmodule Membrane.Core.Element.ActionHandler do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
if Enum.random([true, false]) do
state
|> handle_pads_to_snapshot()
|> maybe_handle_delayed_demands()
else
state
|> maybe_handle_delayed_demands()
|> handle_pads_to_snapshot()
end
end

defp maybe_handle_delayed_demands(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
with %{delay_demands?: false} <- state do
ManualFlowController.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end
defp handle_pads_to_snapshot(state) do
state.pads_to_snapshot
|> Enum.shuffle()
|> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end

@impl CallbackHandler
Expand Down Expand Up @@ -178,13 +173,13 @@ defmodule Membrane.Core.Element.ActionHandler do
@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
AutoFlowController.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
AutoFlowController.resume_demands(in_ref, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -239,7 +234,7 @@ defmodule Membrane.Core.Element.ActionHandler do
%State{type: type} = state
)
when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do
supply_demand(pad_ref, size, state)
delay_supplying_demand(pad_ref, size, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -396,26 +391,27 @@ defmodule Membrane.Core.Element.ActionHandler do
end
end

@spec supply_demand(
@spec delay_supplying_demand(
Pad.ref(),
Action.demand_size(),
State.t()
) :: State.t()
defp supply_demand(pad_ref, 0, state) do
defp delay_supplying_demand(pad_ref, 0, state) do
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
state
end

defp supply_demand(pad_ref, size, _state)
defp delay_supplying_demand(pad_ref, size, _state)
when is_integer(size) and size < 0 do
raise ElementError,
"Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}"
end

defp supply_demand(pad_ref, size, state) do
defp delay_supplying_demand(pad_ref, size, state) do
with %{direction: :input, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.supply_demand(pad_ref, size, state)
state = ManualFlowController.update_demand(pad_ref, size, state)
ManualFlowController.delay_supplying_demand(pad_ref, state)
else
%{direction: :output} ->
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
Expand All @@ -435,7 +431,7 @@ defmodule Membrane.Core.Element.ActionHandler do
when type in [:source, :filter, :endpoint] do
with %{direction: :output, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.handle_redemand(pad_ref, state)
ManualFlowController.delay_redemand(pad_ref, state)
else
%{direction: :input} ->
raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}"
Expand Down Expand Up @@ -471,10 +467,10 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
defmodule Membrane.Core.Element.AutoFlowController do
@moduledoc false

alias Membrane.Buffer
Expand Down
38 changes: 22 additions & 16 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@ defmodule Membrane.Core.Element.BufferController do

alias Membrane.Core.Element.{
ActionHandler,
AutoFlowController,
CallbackContext,
DemandHandler,
EventController,
InputQueue,
ManualFlowController,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Element.ManualFlowController.InputQueue

alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_buffer(pad_ref, buffers, state) do
@spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_incoming_buffers(pad_ref, buffers, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{
Expand All @@ -49,48 +50,53 @@ defmodule Membrane.Core.Element.BufferController do
EventController.handle_start_of_stream(pad_ref, state)
end

do_handle_buffer(pad_ref, data, buffers, state)
do_handle_incoming_buffers(pad_ref, data, buffers, state)
else
pad: {:error, :unknown_pad} ->
# We've got a buffer from already unlinked pad
state

playback: _playback ->
PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state)
PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state)
end
end

@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
@spec do_handle_incoming_buffers(
Pad.ref(),
PadModel.pad_data(),
[Buffer.t()] | Buffer.t(),
State.t()
) ::
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
AutoFlowController.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
%{input_queue: old_input_queue} = data

input_queue = InputQueue.store(old_input_queue, buffers)
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

if old_input_queue |> InputQueue.empty?() do
DemandHandler.supply_demand(pad_ref, state)
if InputQueue.empty?(old_input_queue) do
ManualFlowController.supply_demand(pad_ref, state)
else
state
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
Loading

0 comments on commit fb10564

Please sign in to comment.