Skip to content

Commit

Permalink
Modify 5s linking timeout constraint (#810)
Browse files Browse the repository at this point in the history
* Modify 5s linking timeout constraint

* Make dialyzer happy

* Maybe fix bug occuring in FishJam

* Fix linking timeout mechanism

* Remove leftover

* Implement suggestions from CR & delete unnecessary state field

* Remove leftover

* Satisfy lint

* with -> case
  • Loading branch information
FelonEkonom authored Jun 3, 2024
1 parent e2c282f commit 14d8018
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 55 deletions.
8 changes: 6 additions & 2 deletions lib/membrane/bin/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ defmodule Membrane.Bin.PadData do
link_id: private_field,
endpoint: private_field,
linked?: private_field,
response_received?: private_field
response_received?: private_field,
linking_timeout_id: private_field,
linked_in_spec?: private_field
}

@enforce_keys [
Expand All @@ -40,7 +42,9 @@ defmodule Membrane.Bin.PadData do
:endpoint,
:linked?,
:response_received?,
:spec_ref
:spec_ref,
:linking_timeout_id,
:linked_in_spec?
]

defstruct @enforce_keys
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:linking_timeout, pad_ref), state) do
PadController.handle_linking_timeout(pad_ref, state)
defp do_handle_info(Message.new(:linking_timeout, [pad_ref, linking_timeout_id]), state) do
:ok = PadController.handle_linking_timeout(pad_ref, linking_timeout_id, state)
{:noreply, state}
end

Expand Down
69 changes: 38 additions & 31 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ defmodule Membrane.Core.Bin.PadController do
alias Membrane.{Core, LinkError, Pad}
alias Membrane.Core.Bin.{ActionHandler, CallbackContext, State}
alias Membrane.Core.{CallbackHandler, Child, Message}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.StreamFormatController
alias Membrane.Core.Parent.{ChildLifeController, Link, SpecificationParser}

require Membrane.Core.Child.PadModel
require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Message
require Membrane.Logger
require Membrane.Pad
Expand Down Expand Up @@ -50,8 +49,7 @@ defmodule Membrane.Core.Bin.PadController do
state =
case PadModel.get_data(state, pad_ref) do
{:error, :unknown_pad} ->
init_pad_data(pad_ref, pad_info, state)
|> Map.update!(:pad_refs, &[pad_ref | &1])
init_pad_data(pad_ref, state)

# This case is for pads that were instantiated before the external link request,
# that is in the internal link request (see `handle_internal_link_request/4`).
Expand All @@ -69,17 +67,19 @@ defmodule Membrane.Core.Bin.PadController do
state
end

state = PadModel.update_data!(state, pad_ref, &%{&1 | link_id: link_id, options: pad_options})
state = maybe_handle_pad_added(pad_ref, state)
linking_timeout_id = make_ref()

unless PadModel.get_data!(state, pad_ref, :endpoint) do
# If there's no endpoint associated to the pad, no internal link to the pad
# has been requested in the bin yet
_ref = Process.send_after(self(), Message.new(:linking_timeout, pad_ref), 5000)
:ok
end
state =
PadModel.update_data!(
state,
pad_ref,
&%{&1 | link_id: link_id, linking_timeout_id: linking_timeout_id, options: pad_options}
)

state
message = Message.new(:linking_timeout, [pad_ref, linking_timeout_id])
_ref = Process.send_after(self(), message, 5000)

maybe_handle_pad_added(pad_ref, state)
end

@spec remove_pad(Pad.ref(), State.t()) :: State.t()
Expand All @@ -102,16 +102,16 @@ defmodule Membrane.Core.Bin.PadController do
end
end

@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, state) do
case PadModel.get_data(state, pad_ref) do
{:ok, %{endpoint: nil} = pad_data} ->
raise Membrane.LinkError,
"Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{inspect(pad_data, pretty: true)}"

_other ->
:ok
@spec handle_linking_timeout(Pad.ref(), reference(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, linking_timeout_id, state) do
with {:ok, pad_data} <- PadModel.get_data(state, pad_ref),
%{linking_timeout_id: ^linking_timeout_id, linked_in_spec?: false} <- pad_data do
raise Membrane.LinkError, """
Bin pad #{inspect(pad_ref)} wasn't linked internally within timeout. Pad data: #{PadModel.get_data(state, pad_ref) |> inspect(pretty: true)}
"""
end

:ok
end

@doc """
Expand Down Expand Up @@ -139,7 +139,7 @@ defmodule Membrane.Core.Bin.PadController do

# Static pads can be linked internally before the external link request
pad_info.availability == :always ->
init_pad_data(pad_ref, pad_info, state)
init_pad_data(pad_ref, state)

true ->
raise LinkError, "Dynamic pads must be firstly linked externally, then internally"
Expand Down Expand Up @@ -284,7 +284,6 @@ defmodule Membrane.Core.Bin.PadController do
with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do
{pad_data, state} =
maybe_handle_pad_removed(pad_ref, state)
|> Map.update!(:pad_refs, &List.delete(&1, pad_ref))
|> PadModel.pop_data!(pad_ref)

if pad_data.endpoint do
Expand Down Expand Up @@ -316,8 +315,8 @@ defmodule Membrane.Core.Bin.PadController do
end

@spec maybe_handle_pad_added(Pad.ref(), Core.Bin.State.t()) :: Core.Bin.State.t()
defp maybe_handle_pad_added(ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref)
defp maybe_handle_pad_added(pad_ref, state) do
%{options: pad_opts, availability: availability} = PadModel.get_data!(state, pad_ref)

if Pad.availability_mode(availability) == :dynamic do
context = &CallbackContext.from_state(&1, pad_options: pad_opts)
Expand All @@ -326,7 +325,7 @@ defmodule Membrane.Core.Bin.PadController do
:handle_pad_added,
ActionHandler,
%{context: context},
[ref],
[pad_ref],
state
)
else
Expand All @@ -351,9 +350,15 @@ defmodule Membrane.Core.Bin.PadController do
end
end

defp init_pad_data(pad_ref, pad_info, state) do
@spec init_pad_data(Pad.ref(), State.t()) :: State.t()
def init_pad_data(pad_ref, state) do
if PadModel.assert_instance(state, pad_ref) == :ok do
raise "Cannot init pad data for pad #{inspect(pad_ref)}, because it already exists"
end

pad_data =
pad_info
state.pads_info
|> Map.get(Pad.name_by_ref(pad_ref))
|> Map.delete(:accepted_formats_str)
|> Map.merge(%{
ref: pad_ref,
Expand All @@ -362,10 +367,12 @@ defmodule Membrane.Core.Bin.PadController do
linked?: false,
response_received?: false,
spec_ref: nil,
options: nil
options: nil,
linking_timeout_id: nil,
linked_in_spec?: false
})
|> then(&struct!(Membrane.Bin.PadData, &1))

put_in(state, [:pads_data, pad_ref], pad_data)
put_in(state.pads_data[pad_ref], pad_data)
end
end
4 changes: 1 addition & 3 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Core.Bin.State do
use Bunch
use Bunch.Access

alias Membrane.{Child, Clock, Pad, Sync}
alias Membrane.{Child, Clock, Sync}
alias Membrane.Core.Child.PadModel
alias Membrane.Core.Parent.ChildLifeController
alias Membrane.Core.Parent.{ChildrenModel, CrashGroup, Link}
Expand All @@ -20,7 +20,6 @@ defmodule Membrane.Core.Bin.State do
children: ChildrenModel.children(),
subprocess_supervisor: pid(),
name: Membrane.Bin.name() | nil,
pad_refs: [Pad.ref()],
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
Expand Down Expand Up @@ -62,7 +61,6 @@ defmodule Membrane.Core.Bin.State do
parent_pid: nil,
playback: :stopped,
internal_state: nil,
pad_refs: [],
pads_info: nil,
children: %{},
links: %{},
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/child/pad_spec_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ defmodule Membrane.Core.Child.PadSpecHandler do
| pads_info:
get_pads(state)
|> Map.new(),
pads_data: %{},
pad_refs: []
pads_data: %{}
}
end

Expand Down
8 changes: 2 additions & 6 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ defmodule Membrane.Core.Element.PadController do
state = generate_eos_if_needed(pad_ref, state)
state = maybe_handle_pad_removed(pad_ref, state)

{pad_data, state} =
Map.update!(state, :pad_refs, &List.delete(&1, pad_ref))
|> PadModel.pop_data!(pad_ref)
{pad_data, state} = PadModel.pop_data!(state, pad_ref)

with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <-
pad_data do
Expand Down Expand Up @@ -321,9 +319,7 @@ defmodule Membrane.Core.Element.PadController do
|> then(&struct!(Membrane.Element.PadData, &1))

state =
state
|> put_in([:pads_data, endpoint.pad_ref], pad_data)
|> Map.update!(:pad_refs, &[endpoint.pad_ref | &1])
put_in(state, [:pads_data, endpoint.pad_ref], pad_data)

:ok =
AtomicDemand.set_sender_status(
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.State do
type: Element.type(),
name: Element.name(),
internal_state: Element.state() | nil,
pad_refs: [Pad.ref()] | nil,
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
Expand Down Expand Up @@ -65,7 +64,6 @@ defmodule Membrane.Core.Element.State do
playback: :stopped,
type: nil,
internal_state: nil,
pad_refs: [],
pads_info: %{},
synchronization: nil,
delayed_demands: MapSet.new(),
Expand Down
35 changes: 30 additions & 5 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias __MODULE__.{CrashGroupUtils, LinkUtils, StartupUtils}
alias Membrane.{Child, ChildrenSpec}
alias Membrane.Core.{Bin, CallbackHandler, Component, Parent, Pipeline}
alias Membrane.Core.Bin.PadController

alias Membrane.Core.Parent.{
ChildEntryParser,
Expand All @@ -17,6 +18,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
alias Membrane.Pad
alias Membrane.ParentError

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -154,7 +156,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do

state =
put_in(state, [:pending_specs, spec_ref], %{
status: :initializing,
status: :created,
children_names: MapSet.new(all_children_names),
links_ids: Enum.map(links, & &1.id),
dependent_specs: dependent_specs,
Expand Down Expand Up @@ -309,14 +311,37 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end
end

defp do_proceed_spec_startup(spec_ref, %{status: :created} = spec_data, state) do
state =
with %Bin.State{} <- state do
bin_pads_linked_in_spec =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
|> Enum.flat_map(&[&1.from, &1.to])
|> Enum.flat_map(fn
%{child: {Membrane.Bin, :itself}, pad_ref: pad_ref} -> [pad_ref]
_other -> []
end)

bin_pads_linked_in_spec
|> Enum.reduce(state, fn pad_ref, state ->
case PadModel.assert_instance(state, pad_ref) do
:ok -> state
{:error, :unknown_pad} -> PadController.init_pad_data(pad_ref, state)
end
|> PadModel.set_data!(pad_ref, :linked_in_spec?, true)
end)
end

do_proceed_spec_startup(spec_ref, %{spec_data | status: :initializing}, state)
end

defp do_proceed_spec_startup(spec_ref, %{status: :initializing} = spec_data, state) do
Membrane.Logger.debug(
"Proceeding spec #{inspect(spec_ref)} startup: initializing, dependent specs: #{inspect(MapSet.to_list(spec_data.dependent_specs))}"
)

%{children: children} = state

if Enum.all?(spec_data.children_names, &Map.fetch!(children, &1).initialized?) and
if Enum.all?(spec_data.children_names, &Map.fetch!(state.children, &1).initialized?) and
Enum.empty?(spec_data.dependent_specs) do
Membrane.Logger.debug("Spec #{inspect(spec_ref)} status changed to initialized")

Expand Down Expand Up @@ -353,7 +378,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
end

defp do_proceed_spec_startup(spec_ref, %{status: :linking_internally} = spec_data, state) do
if Enum.empty?(spec_data.awaiting_responses) do
if MapSet.size(spec_data.awaiting_responses) == 0 do
state =
spec_data.links_ids
|> Enum.map(&Map.fetch!(state.links, &1))
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/core/element/pad_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ defmodule Membrane.Core.Element.PadControllerTest do
state
)

assert Map.drop(new_state, [:pads_data, :pad_refs]) ==
Map.drop(state, [:pads_data, :pad_refs])
assert Map.delete(new_state, :pads_data) ==
Map.delete(state, :pads_data)

assert PadModel.assert_instance(new_state, :input) == :ok
end
Expand Down

0 comments on commit 14d8018

Please sign in to comment.