Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into pull-diamonds
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 27, 2024
2 parents f02d57e + 8505648 commit 2327b24
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 1.2.0
* Add `:max_instances` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)
* Implement `Membrane.ForwardingFilter`. [#904](https://github.com/membraneframework/membrane_core/pull/904)
* Implement `Membrane.Connector`. [#904](https://github.com/membraneframework/membrane_core/pull/904)

## 1.1.2
* Add new callback `handle_child_terminated/3` along with new assertions. [#894](https://github.com/membraneframework/membrane_core/pull/894)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.ForwardingFilter do
defmodule Membrane.Connector do
@moduledoc """
Membrane Filter with input and output dynamic pads, that forwards incoming data to the opposite
side than the one from which it came.
Expand Down
71 changes: 71 additions & 0 deletions lib/membrane/core/child/pads_specs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ defmodule Membrane.Core.Child.PadsSpecs do
pads = Module.get_attribute(env.module, :membrane_pads, []) |> Enum.reverse()
:ok = validate_pads!(pads, env)

if Module.get_attribute(env.module, :__membrane_flow_control_hints__) do
:ok = generate_flow_control_hints(pads, env)
end

alias Membrane.Pad

quote do
Expand All @@ -150,6 +154,73 @@ defmodule Membrane.Core.Child.PadsSpecs do
end
end

@spec generate_flow_control_hints(
pads :: [{Pad.name(), Pad.description()}],
env :: Macro.Env.t()
) :: :ok
defp generate_flow_control_hints(pads, env) do
used_module =
env.module
|> Module.get_attribute(:__membrane_element_type__)
|> case do
:source -> Membrane.Source
:filter -> Membrane.Filter
:sink -> Membrane.Sink
:endpoint -> Membrane.Endpoint
end

{auto_pads, push_pads} =
pads
|> Enum.filter(fn {_pad, info} -> info[:flow_control] in [:auto, :push] end)
|> Enum.split_with(fn {_pad, info} -> info.flow_control == :auto end)

if auto_pads != [] and push_pads != [] do
IO.warn("""
#{inspect(env.module)} defines pads with `flow_control: :auto` and pads with `flow_control: :push` \
at the same time. Please, consider if this what you want to do - flow control of these pads won't be \
integrated. Setting `flow_control` to `:auto` in the places where it has been set to `:push` might be \
a good idea.
Pads with `flow_control: :auto`: #{auto_pads |> Keyword.keys() |> Enum.join(", ")}.
Pads with `flow_control: :push`: #{push_pads |> Keyword.keys() |> Enum.join(", ")}.
If you want get rid of this warning, pass `flow_control_hints?: false` option to \
`use #{inspect(used_module)}`.
""")
end

{auto_input_pads, auto_output_pads} =
auto_pads
|> Enum.split_with(fn {_pad, info} -> info.direction == :input end)

[auto_input_pads_possible_number, auto_output_pads_possible_number] =
[auto_input_pads, auto_output_pads]
|> Enum.map(fn pads ->
pads
|> Enum.map(fn
{_pad, %{availability: :always}} -> 1
{_pad, %{max_instances: number}} when is_number(number) -> number
{_pad, %{max_instances: :infinity}} -> 2
end)
|> Enum.sum()
end)

if auto_input_pads_possible_number >= 2 and auto_output_pads_possible_number >= 2 do
Membrane.Logger.warning("""
#{inspect(env.module)} might have multiple input pads with `flow_control: :auto` and multiple output \
pads with `flow_control: :auto` at the same time. Notice, that lack of demand on any of output pads with \
`flow_control: :auto` will cause stoping demand on every input pad with `flow_control: :auto`.
Input pads with `flow_control: :auto`: #{auto_input_pads |> Keyword.keys() |> inspect()}.
Output pads with `flow_control: :auto`: #{auto_output_pads |> Keyword.keys() |> inspect()}.
If you want get rid of this warning, pass `flow_control_hints?: false` option to \
`use #{inspect(used_module)}` or add upperbound on the number of possible instances of pads with \
`availability: :on_request` using `max_instances: non_neg_number` option.
""")
end

:ok
end

@spec parse_pad_specs!(
specs :: Pad.spec(),
direction :: Pad.direction(),
Expand Down
9 changes: 9 additions & 0 deletions lib/membrane/element/base.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ defmodule Membrane.Element.Base do
Options:
- `:bring_pad?` - if true (default) requires and aliases `Membrane.Pad`
- `:flow_control_hints?` - if true (default) generates compile-time warnings \
if the number, direction, and type of flow control of pads are likely to cause unintended \
behaviours.
"""
defmacro __using__(options) do
bring_pad =
Expand All @@ -247,6 +250,12 @@ defmodule Membrane.Element.Base do
end
end

Module.put_attribute(
__CALLER__.module,
:__membrane_flow_control_hints__,
Keyword.get(options, :flow_control_hints?, true)
)

quote location: :keep do
@behaviour unquote(__MODULE__)

Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ defmodule Membrane.Endpoint do
Options:
- `:bring_pad?` - if true (default) requires and aliases `Membrane.Pad`
- `:flow_control_hints?` - if true (default) generates compile-time warnings \
if the number, direction, and type of flow control of pads are likely to cause unintended \
behaviours.
"""
defmacro __using__(options) do
Module.put_attribute(__CALLER__.module, :__membrane_element_type__, :endpoint)
Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ defmodule Membrane.Filter do
Options:
- `:bring_pad?` - if true (default) requires and aliases `Membrane.Pad`
- `:flow_control_hints?` - if true (default) generates compile-time warnings \
if the number, direction, and type of flow control of pads are likely to cause unintended \
behaviours.
"""

defmacro __using__(options) do
Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ defmodule Membrane.Sink do
Options:
- `:bring_pad?` - if true (default) requires and aliases `Membrane.Pad`
- `:flow_control_hints?` - if true (default) generates compile-time warnings \
if the number, direction, and type of flow control of pads are likely to cause unintended \
behaviours.
"""
defmacro __using__(options) do
Module.put_attribute(__CALLER__.module, :__membrane_element_type__, :sink)
Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ defmodule Membrane.Source do
Options:
- `:bring_pad?` - if true (default) requires and aliases `Membrane.Pad`
- `:flow_control_hints?` - if true (default) generates compile-time warnings \
if the number, direction, and type of flow control of pads are likely to cause unintended \
behaviours.
"""
alias Membrane.Core.DocsHelper

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
defmodule Membrane.Integration.ForwardingFilterTest do
defmodule Membrane.Integration.ConnectorTest do
use ExUnit.Case, async: true

import Membrane.ChildrenSpec
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.ForwardingFilter
alias Membrane.Connector
alias Membrane.Testing

require Membrane.Pad, as: Pad
Expand All @@ -23,12 +23,12 @@ defmodule Membrane.Integration.ForwardingFilterTest do
do: {[{action, {:output, item}}], state}
end

test "Membrane.ForwardingFilter buffers data until output pad is linked" do
test "Membrane.Connector buffers data until output pad is linked" do
pipeline =
Testing.Pipeline.start_link_supervised!(
spec:
child(:source, Source)
|> child(:filter, %ForwardingFilter{
|> child(:connector, %Connector{
notify_on_event?: true,
notify_on_stream_format?: true
})
Expand All @@ -41,11 +41,11 @@ defmodule Membrane.Integration.ForwardingFilterTest do
Testing.Pipeline.notify_child(pipeline, :source, {type, item})

if type in [:stream_format, :event] do
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
assert_pipeline_notified(pipeline, :connector, {^type, Pad.ref(:input, _id), ^item})
end
end)

spec = get_child(:filter) |> child(:sink, Testing.Sink)
spec = get_child(:connector) |> child(:sink, Testing.Sink)
Testing.Pipeline.execute_actions(pipeline, spec: spec)

data
Expand All @@ -62,7 +62,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do
Testing.Pipeline.notify_child(pipeline, :source, {type, item})

if type in [:stream_format, :event] do
assert_pipeline_notified(pipeline, :filter, {^type, Pad.ref(:input, _id), ^item})
assert_pipeline_notified(pipeline, :connector, {^type, Pad.ref(:input, _id), ^item})
end

case type do
Expand All @@ -75,7 +75,7 @@ defmodule Membrane.Integration.ForwardingFilterTest do
Testing.Pipeline.terminate(pipeline)
end

test "Membrane.ForwardingFilter pauses input demand if output pad is not linked" do
test "Membrane.Connector pauses input demand if output pad is not linked" do
atomics_ref = :atomics.new(1, [])
:atomics.put(atomics_ref, 1, 0)

Expand All @@ -90,15 +90,15 @@ defmodule Membrane.Integration.ForwardingFilterTest do
spec =
child(:source, %Testing.Source{output: {nil, generator}})
|> via_in(:input, auto_demand_size: auto_demand_size)
|> child(:forwarding_filter, ForwardingFilter)
|> child(:connector, Connector)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

Process.sleep(500)

assert :atomics.get(atomics_ref, 1) == auto_demand_size

spec = get_child(:forwarding_filter) |> child(:sink, Testing.Sink)
spec = get_child(:connector) |> child(:sink, Testing.Sink)
Testing.Pipeline.execute_actions(pipeline, spec: spec)

Process.sleep(500)
Expand Down
2 changes: 1 addition & 1 deletion test/support/child_removal_test/filter_to_be_removed.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Membrane.Support.ChildRemovalTest.FilterToBeRemoved do
@moduledoc false
use Membrane.Filter
use Membrane.Filter, flow_control_hints?: false

def_input_pad :input, accepted_format: _any, flow_control: :auto, availability: :on_request
def_output_pad :output, accepted_format: _any, flow_control: :auto, availability: :on_request
Expand Down
2 changes: 1 addition & 1 deletion test/support/dynamic_filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.Support.Element.DynamicFilter do
"""

use Bunch
use Membrane.Filter
use Membrane.Filter, flow_control_hints?: false

def_input_pad :input, accepted_format: _any, availability: :on_request, flow_control: :auto
def_output_pad :output, accepted_format: _any, availability: :on_request, flow_control: :auto
Expand Down

0 comments on commit 2327b24

Please sign in to comment.