From 7d691fa7ccc8cba1b9cf27ab6b92b3de3cb8e791 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 14 Sep 2023 16:57:24 +0200 Subject: [PATCH] Release v0.12.9 --- CHANGELOG.md | 46 +++++-- README.md | 2 +- benchmark/run/branched_filter.ex | 4 +- benchmark/run/linear_filter.ex | 4 +- guides/upgrading/v0.11.md | 2 +- guides/upgrading/v0.12.md | 65 ++++++++++ guides/upgrading/v1.0.0-rc0.md | 2 +- lib/membrane/bin.ex | 6 +- lib/membrane/bin/action.ex | 8 ++ lib/membrane/bin/callback_context.ex | 10 +- lib/membrane/clock.ex | 18 +-- lib/membrane/core/bin.ex | 3 +- lib/membrane/core/bin/action_handler.ex | 5 + lib/membrane/core/bin/callback_context.ex | 5 +- lib/membrane/core/bin/pad_controller.ex | 2 +- lib/membrane/core/child/pads_specs.ex | 100 ++++++++++----- lib/membrane/core/element/action_handler.ex | 8 +- lib/membrane/core/element/callback_context.ex | 3 +- lib/membrane/core/element/event_controller.ex | 117 +++++++++--------- lib/membrane/core/element/pad_controller.ex | 2 +- lib/membrane/core/options_specs.ex | 2 +- .../core/parent/child_life_controller.ex | 8 ++ .../core/parent/lifecycle_controller.ex | 5 +- lib/membrane/core/pipeline.ex | 13 +- lib/membrane/core/pipeline/action_handler.ex | 13 +- .../core/pipeline/callback_context.ex | 1 - lib/membrane/core/timer.ex | 13 +- lib/membrane/element/base.ex | 4 +- lib/membrane/element/callback_context.ex | 10 +- lib/membrane/element/with_input_pads.ex | 34 ++++- lib/membrane/endpoint.ex | 20 +++ lib/membrane/filter.ex | 35 +++++- lib/membrane/pad.ex | 2 +- lib/membrane/pipeline.ex | 10 +- lib/membrane/pipeline/action.ex | 17 ++- lib/membrane/pipeline/callback_context.ex | 6 +- lib/membrane/sink.ex | 20 +++ lib/membrane/testing/assertions.ex | 4 + lib/membrane/testing/pipeline.ex | 23 ++-- lib/membrane/time.ex | 47 ++++++- mix.exs | 5 +- mix.lock | 4 +- test/membrane/api_backcompability_test.exs | 52 ++++++++ test/membrane/clock_test.exs | 11 +- .../core/element/event_controller_test.exs | 6 + test/membrane/core/element_test.exs | 5 +- .../integration/auto_demands_test.exs | 10 +- ...effective_flow_control_resolution_test.exs | 15 +-- .../integration/end_of_stream_test.exs | 48 ------- test/membrane/integration/linking_test.exs | 3 +- .../integration/links_validation_test.exs | 14 ++- 51 files changed, 585 insertions(+), 287 deletions(-) create mode 100644 guides/upgrading/v0.12.md create mode 100644 test/membrane/api_backcompability_test.exs delete mode 100644 test/membrane/integration/end_of_stream_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 420c9e0f8..71d1d6d1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,27 +1,53 @@ # Changelog -## 1.0.0 +# 0.12.9 + * Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586) + * Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594) + * Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595) + +# 0.12.8 + * Fix race condition in links deletion. [#591](https://github.com/membraneframework/membrane_core/pull/591) + * Improve error message, when stream format does not match the accepted format pattern. [#591](https://github.com/membraneframework/membrane_core/pull/591) + * Fix bug in default pipeline functions specs. [#585](https://github.com/membraneframework/membrane_core/pull/585) + +# 0.12.7 + * Fix bug in generating processes names. [#584](https://github.com/membraneframework/membrane_core/pull/584) + * Fix bug in registering metrics. [#582](https://github.com/membraneframework/membrane_core/pull/582) + * Improve error message on lack of `handle_child_pad_removed/4` callback implementation. [#581](https://github.com/membraneframework/membrane_core/pull/581) + * Improve actions docs. [#580](https://github.com/membraneframework/membrane_core/pull/580) + +# 0.12.6 + * Implement functionalities needed for integration with `membrane_kino_dashboard`. [#571](https://github.com/membraneframework/membrane_core/pull/571) + +# 0.12.5 + * Fix compilation error occurring with Elixir 1.15. [#573](https://github.com/membraneframework/membrane_core/pull/573) + +# 0.12.4 + * Fix compilation error occurring with Elixir 1.15. [#570](https://github.com/membraneframework/membrane_core/pull/570) + +# 0.12.3 + * Fix bug in fields naming in callback contexts. [#569](https://github.com/membraneframework/membrane_core/pull/569) + * Update exit reasons of Membrane Components and their supervisors. [#567](https://github.com/membraneframework/membrane_core/pull/567) + +## 0.12.2 + * Fix bug in order of handling actions returned from callbacks. + +## 0.12.1 * Introduce `:remove_link` action in pipelines and bins. * Add children groups - a mechanism that allows refering to multiple children with a single identifier. - * Rename `remove_child` action into `remove_children` and allow for removing a children group with a single action. * Add an ability to spawn anonymous children. - * Replace `Membrane.Time.round_to_` with `Membrane.Time.as_/2` with second argument equal `:round`. Rename `Membrane.Time.round_to_timebase` to `Membrane.Time.divide_by_timebase/2`. [#494](https://github.com/membraneframework/membrane_core/pull/494) * Remove `:playback` action. Introduce `:setup` action. [#496](https://github.com/membraneframework/membrane_core/pull/496) * Add `Membrane.Testing.Pipeline.get_child_pid/2`. [#497](https://github.com/membraneframework/membrane_core/pull/497) * Make callback contexts to be maps. [#504](https://github.com/membraneframework/membrane_core/pull/504) * All Membrane Elements can be compatible till now on - pads working in `:pull` mode, handling different `demand_units`, can be now linked. * Output pads working in `:pull` mode should have their `demand_unit` specified. If case it's not available, it's assumed that the pad handles demands in both `:bytes` and `:buffers` units. - * Rename callbacks `handle_process/4` and `handle_write/4` to `handle_buffer/4` in [#506](https://github.com/membraneframework/membrane_core/pull/506) - * The flow control of the pad is now set with a single `:flow_control` option instead of `:mode` and `:demand_mode` options. * Remove _t suffix from types [#509](https://github.com/membraneframework/membrane_core/pull/509) * Implement automatic demands in Membrane Sinks and Endpoints. [#512](https://github.com/membraneframework/membrane_core/pull/512) * Add `handle_child_pad_removed/4` callback in Bins and Pipelines. [#513](https://github.com/membraneframework/membrane_core/pull/513) * Introduce support for crash groups in Bins. [#521](https://github.com/membraneframework/membrane_core/pull/521) - * Remove `assert_pipeline_play/2` from `Membrane.Testing.Assertions`. [#528](https://github.com/membraneframework/membrane_core/pull/528) * Make sure enumerable with all elements being `Membrane.Buffer.t()`, passed as `:output` parameter for `Membrane.Testing.Source` won't get rewrapped in `Membrane.Buffer.t()` struct. * Implement `Membrane.Debug.Filter` and `Membrane.Debug.Sink`. [#552](https://github.com/membraneframework/membrane_core/pull/552) * Add `:pause_auto_demand` and `:resume_auto_demand` actions. [#586](https://github.com/membraneframework/membrane_core/pull/586) - * Send `:end_of_stream`, even if it is not preceded by `:start_of_stream`. [#557](https://github.com/membraneframework/membrane_core/pull/577) * Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594) * Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595) @@ -35,9 +61,9 @@ * New `spec` action syntax - the structure of pipeline is now defined with the use of `Membrane.ChildrenSpec` * Rename `:caps` to `:stream_format`. * Use Elixir patterns as `:accepted_format` in pad definition. - * Delete `:ok` from tuples returned from callbacks. - * Remove `:type` from specs passed to `def_options/1` macro in bins and elements. - * Add `Membrane.Testing.MockResourceGuard`. + * Delete `:ok` from tuples returned from callbacks + * Remove `:type` from specs passed to `def_options/1` macro in bins and elements. + * Add `Membrane.Testing.MockResourceGuard` ## 0.10.0 * Remove all deprecated stuff [#399](https://github.com/membraneframework/membrane_core/pull/399) diff --git a/README.md b/README.md index 2a71cf546..f4295c86b 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ This package provides core of the [Membrane Multimedia Framework](https://membra Add the following line to your `deps` in `mix.exs`. Run `mix deps.get`. ```elixir -{:membrane_core, "~> 0.11.0"} +{:membrane_core, "~> 0.12.9"} ``` Or, if you'd like to try the latest release candidate, use this version: diff --git a/benchmark/run/branched_filter.ex b/benchmark/run/branched_filter.ex index d7669fe83..952f980b0 100644 --- a/benchmark/run/branched_filter.ex +++ b/benchmark/run/branched_filter.ex @@ -4,8 +4,8 @@ defmodule Benchmark.Run.BranchedFilter do alias Benchmark.Run.Reductions - def_input_pad :input, accepted_format: _any, availability: :on_request - def_output_pad :output, accepted_format: _any, availability: :on_request + def_input_pad :input, accepted_format: _any, availability: :on_request, flow_control: :auto + def_output_pad :output, accepted_format: _any, availability: :on_request, flow_control: :auto def_options number_of_reductions: [spec: integer()], generator: [spec: (integer() -> integer())], diff --git a/benchmark/run/linear_filter.ex b/benchmark/run/linear_filter.ex index 4d47f09be..f41e56700 100644 --- a/benchmark/run/linear_filter.ex +++ b/benchmark/run/linear_filter.ex @@ -4,8 +4,8 @@ defmodule Benchmark.Run.LinearFilter do alias Benchmark.Run.Reductions - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, flow_control: :auto def_options number_of_reductions: [spec: integer()], generator: [spec: (integer() -> integer())] diff --git a/guides/upgrading/v0.11.md b/guides/upgrading/v0.11.md index ed6334187..0c40ee201 100644 --- a/guides/upgrading/v0.11.md +++ b/guides/upgrading/v0.11.md @@ -1,6 +1,6 @@ # Upgrading to v0.11 -Improvements in v0.11 required some breaking changes, so here comes the guide that will help you adjust your code to the new API. See the [changelog](https://github.com/membraneframework/membrane_core/releases/tag/v0.11.0) for details. +Improvements in v0.11 required some breaking changes, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v0.11.0) for details. ## Deps upgrade diff --git a/guides/upgrading/v0.12.md b/guides/upgrading/v0.12.md new file mode 100644 index 000000000..d1bda611f --- /dev/null +++ b/guides/upgrading/v0.12.md @@ -0,0 +1,65 @@ +# Upgrading to v0.12 + +Between v0.11 and v0.12 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v0.12.1) for details. + +## Deps upgrade + +Upgrade `membrane_core` to `v0.12.1`. + +```elixir +defp deps do + [ + {:membrane_core, "~> 0.12.1"}, + ... + ] +end +``` + +## Implement `handle_child_pad_removed/4` callback in bins and pipelines, if it is needed + +Now, if bin removes its pad (e.g. by removing an element linked to the bin's inner pad), bin's parent has to have implemented proper `handle_child_pad_removed/4` callback, to handle it. If there is no such a callback, default behaviour is to raise an error. + +```elixir +@impl true +def handle_child_pad_removed(:rtp, Pad.ref(:rtp_input, _ssrc), _ctx, state) do + # ... +end +``` + +## Remove `:playback` action + +Now, membrane pipelines enter the playing playback by default and they don't have to return a `:playback` action to do it. + +```diff +- @impl true +- def handle_setup(_ctx, state) do +- {[playback: :playing], state} +- end +``` +Instead of it, there is a new action introduced in `membrane_core` v0.12, `setup: :incomplete | :complete`. If you want to defer a moment when a component enters the playing playback, you can return `{:setup, :incomplete}` action from `handle_setup` callback. If you do that, a component will enter the playing playback only when you return `{:setup, :complete}` action from another callback, e.g. `handle_info`. + +```diff +- @impl true +- def handle_setup(_ctx, state) do +- Process.send_after(self(), :play, 1000) +- {[], state} +- end +- +- @impl true +- def handle_info(:play, _ctx, state) do +- {[playback: :playing], state} +- end + ++ @impl true ++ def handle_setup(_ctx, state) do ++ Process.send_after(self(), :play, 1000) ++ {[setup: :incomplete], state} ++ end ++ ++ @impl true ++ def handle_info(:play, _ctx, state) do ++ {[setup: :complete], state} ++ end +``` + +`:setup` action is available not only in pipelines but in bins and elements as well. diff --git a/guides/upgrading/v1.0.0-rc0.md b/guides/upgrading/v1.0.0-rc0.md index c617e0f97..97b146c29 100644 --- a/guides/upgrading/v1.0.0-rc0.md +++ b/guides/upgrading/v1.0.0-rc0.md @@ -1,6 +1,6 @@ # Upgrading to v1.0.0-rc0 -Between v0.11 and v1.0.0-rc0 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [changelog](https://github.com/membraneframework/membrane_core/releases/tag/v1.0.0-rc0) for detailed description of the changes. +Between v0.11 and v1.0.0-rc0 some breaking changes have occurred, so here comes the guide that will help you adjust your code to the new API. See the [release notes](https://github.com/membraneframework/membrane_core/releases/tag/v1.0.0-rc0) for detailed description of the changes. ### Deps upgrade diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 3937eeb38..33ca3211d 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -51,7 +51,7 @@ defmodule Membrane.Bin do Callback that is called when new pad has been added to bin. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. By default, it does nothing. """ @callback handle_pad_added( @@ -64,7 +64,7 @@ defmodule Membrane.Bin do Callback that is called when some pad of the bin has been removed. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. By default, it does nothing. """ @callback handle_pad_removed( @@ -98,7 +98,7 @@ defmodule Membrane.Bin do Callback invoked when a child removes its pad. The callback won't be invoked, when you have initiated the pad removal, - eg. when you have returned `t:Membrane.Bin.Action.remove_link()` action + e.g. when you have returned `t:Membrane.Bin.Action.remove_link()` action which made one of your children's pads be removed. By default, it does nothing. """ diff --git a/lib/membrane/bin/action.ex b/lib/membrane/bin/action.ex index f813861d5..c3e848adf 100644 --- a/lib/membrane/bin/action.ex +++ b/lib/membrane/bin/action.ex @@ -53,6 +53,13 @@ defmodule Membrane.Bin.Action do | Membrane.Child.group() | [Membrane.Child.group()]} + @type remove_child :: + {:remove_child, + Child.name() + | [Child.name()] + | Membrane.Child.group() + | [Membrane.Child.group()]} + @typedoc """ Action that removes link, which relates to specified child and pad. @@ -134,6 +141,7 @@ defmodule Membrane.Bin.Action do | notify_parent | spec | remove_children + | remove_child | remove_link | start_timer | timer_interval diff --git a/lib/membrane/bin/callback_context.ex b/lib/membrane/bin/callback_context.ex index 9608bb3c5..c9d9e7fca 100644 --- a/lib/membrane/bin/callback_context.ex +++ b/lib/membrane/bin/callback_context.ex @@ -6,12 +6,9 @@ defmodule Membrane.Bin.CallbackContext do @typedoc """ Type describing context passed to the `Membrane.Bin` callbacks. - Field `:pad_options` is present only in `c:Membrane.Bin.handle_pad_added/3` + Field `:options` is present only in `c:Membrane.Bin.handle_pad_added/3` and `c:Membrane.Bin.handle_pad_removed/3`. - Field `:start_of_stream_received?` is present only in - `c:Membrane.Bin.handle_element_end_of_stream/4`. - Fields `:members` and `:crash_initiator` are present only in `c:Membrane.Pipeline.handle_crash_group_down/3`. """ @@ -24,9 +21,8 @@ defmodule Membrane.Bin.CallbackContext do :playback => Membrane.Playback.t(), :resource_guard => Membrane.ResourceGuard.t(), :utility_supervisor => Membrane.UtilitySupervisor.t(), - optional(:pad_options) => map(), + optional(:options) => map(), optional(:members) => [Membrane.Child.name()], - optional(:crash_initiator) => Membrane.Child.name(), - optional(:start_of_stream_received?) => boolean() + optional(:crash_initiator) => Membrane.Child.name() } end diff --git a/lib/membrane/clock.ex b/lib/membrane/clock.ex index 5f1097a5c..027e70018 100644 --- a/lib/membrane/clock.ex +++ b/lib/membrane/clock.ex @@ -50,7 +50,6 @@ defmodule Membrane.Clock do non_neg_integer | ratio | {numerator :: non_neg_integer, denominator :: pos_integer()}} - @typedoc """ Ratio message sent by the Clock to all its subscribers. It contains the ratio of the custom clock time to the reference time. @@ -119,7 +118,7 @@ defmodule Membrane.Clock do state = %{ - ratio: Ratio.new(1), + ratio: 1, subscribers: %{}, time_provider: options |> Keyword.get(:time_provider, fn -> Time.monotonic_time() end) } @@ -141,7 +140,7 @@ defmodule Membrane.Clock do subscribe(proxy_for) state else - broadcast_and_update_ratio(Ratio.new(1), state) + broadcast_and_update_ratio(1, state) end {:noreply, state} @@ -201,7 +200,7 @@ defmodule Membrane.Clock do defp get_proxy_options(true, _proxy_for), do: %{proxy: true, proxy_for: nil} defp get_proxy_options(_proxy, _proxy_for), - do: %{init_time: nil, clock_time: Ratio.new(0), till_next: nil, proxy: false} + do: %{init_time: nil, clock_time: 0, till_next: nil, proxy: false} defp handle_unsubscribe(pid, state) do Process.demonitor(state.subscribers[pid].monitor, [:flush]) @@ -214,13 +213,13 @@ defmodule Membrane.Clock do end defp handle_clock_update(till_next, state) do - till_next = Ratio.new(till_next) + use Ratio - if Ratio.lt?(till_next, 0) do + if till_next < 0 do raise "Clock update time cannot be negative, received: #{inspect(till_next)}" end - till_next = Ratio.mult(till_next, Ratio.new(Time.millisecond())) + till_next = till_next * Time.millisecond() case state.init_time do nil -> %{state | init_time: state.time_provider.(), till_next: till_next} @@ -229,9 +228,10 @@ defmodule Membrane.Clock do end defp do_handle_clock_update(till_next, state) do + use Ratio %{till_next: from_previous, clock_time: clock_time} = state - clock_time = Ratio.add(clock_time, from_previous) - ratio = Ratio.div(clock_time, Ratio.new(state.time_provider.() - state.init_time)) + clock_time = clock_time + from_previous + ratio = clock_time / (state.time_provider.() - state.init_time) state = %{state | clock_time: clock_time, till_next: till_next} broadcast_and_update_ratio(ratio, state) end diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 834541621..4cac74d4e 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -176,7 +176,7 @@ defmodule Membrane.Core.Bin do end defp do_handle_info( - Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]), + Message.new(:stream_management_event, [element_name, pad_ref, event]), state ) do state = @@ -184,7 +184,6 @@ defmodule Membrane.Core.Bin do event, element_name, pad_ref, - event_params, state ) diff --git a/lib/membrane/core/bin/action_handler.ex b/lib/membrane/core/bin/action_handler.ex index a4888c529..986fd0486 100644 --- a/lib/membrane/core/bin/action_handler.ex +++ b/lib/membrane/core/bin/action_handler.ex @@ -43,6 +43,11 @@ defmodule Membrane.Core.Bin.ActionHandler do Parent.ChildLifeController.handle_remove_children(children, state) end + @impl CallbackHandler + def handle_action({:remove_child, children}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_children(children, state) + end + @impl CallbackHandler def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) diff --git a/lib/membrane/core/bin/callback_context.ex b/lib/membrane/core/bin/callback_context.ex index 704822720..0a6b4e8c8 100644 --- a/lib/membrane/core/bin/callback_context.ex +++ b/lib/membrane/core/bin/callback_context.ex @@ -1,10 +1,7 @@ defmodule Membrane.Core.Bin.CallbackContext do @moduledoc false - @type optional_fields :: - [pad_options: map()] - | [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()] - | [start_of_stream_received?: boolean()] + @type optional_fields :: [options: map()] @spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) :: Membrane.Bin.CallbackContext.t() diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 39dc10a75..ac2ca43d3 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -317,7 +317,7 @@ defmodule Membrane.Core.Bin.PadController do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) if Pad.availability_mode(availability) == :dynamic do - context = &CallbackContext.from_state(&1, pad_options: pad_opts) + context = &CallbackContext.from_state(&1, options: pad_opts) CallbackHandler.exec_and_handle_callback( :handle_pad_added, diff --git a/lib/membrane/core/child/pads_specs.ex b/lib/membrane/core/child/pads_specs.ex index c22e2db32..694db1362 100644 --- a/lib/membrane/core/child/pads_specs.ex +++ b/lib/membrane/core/child/pads_specs.ex @@ -188,44 +188,82 @@ defmodule Membrane.Core.Child.PadsSpecs do |> Bunch.Config.parse( availability: [in: [:always, :on_request], default: :always], accepted_formats_str: [], - flow_control: fn _config -> - cond do - component == :bin -> - nil - - direction == :output and component != :filter -> - [in: [:manual, :push]] - - direction == :input or component == :filter -> - [in: [:auto, :manual, :push], default: :auto] - end - end, - demand_unit: - &cond do - component == :bin or &1[:flow_control] != :manual -> - nil - - direction == :input -> - [in: [:buffers, :bytes]] - - direction == :output -> - [in: [:buffers, :bytes, nil], default: nil] - - true -> - nil - end, - options: [default: nil] + options: [default: nil], + demand_unit: [in: [:buffers, :bytes, nil], default: nil], + flow_control: flow_control_parsing_options(config, direction, component), + mode: mode_parsing_options(config, component), + demand_mode: demand_mode_parsing_options(config, direction, component) ) do - config - |> Map.put(:direction, direction) - |> Map.put(:name, name) - ~> {:ok, {name, &1}} + case config do + _config when component == :bin -> + config + |> Map.drop([:demand_unit, :mode, :demand_mode, :flow_control]) + + %{mode: :push} -> + config + |> Map.drop([:mode, :demand_mode]) + |> Map.put(:flow_control, :push) + + %{mode: :pull, demand_mode: demand_mode} -> + config + |> Map.drop([:mode, :demand_mode]) + |> Map.put(:flow_control, demand_mode) + + %{flow_control: _flow_control} -> + config + end + |> Map.merge(%{direction: direction, name: name}) + |> then(&{:ok, {name, &1}}) else spec: spec -> {:error, {:invalid_pad_spec, spec}} config: {:error, reason} -> {:error, {reason, pad: name}} end end + defp mode_parsing_options(config, component) do + old? = old_api?(config) + + fn _config -> + if old? or component == :bin do + [in: [:pull, :push], default: :pull] + else + nil + end + end + end + + defp demand_mode_parsing_options(config, direction, component) do + old? = old_api?(config) + + fn _config -> + cond do + not old? -> nil + auto_allowed?(direction, component) -> [in: [:manual, :auto], default: :manual] + true -> [in: [:manual], default: :manual] + end + end + end + + defp flow_control_parsing_options(config, direction, component) do + old? = old_api?(config) + + fn _config -> + cond do + old? -> nil + auto_allowed?(direction, component) -> [in: [:auto, :manual, :push], default: :manual] + true -> [in: [:manual, :push], default: :manual] + end + end + end + + defp old_api?(config) do + Keyword.has_key?(config, :mode) or Keyword.has_key?(config, :demand_mode) + end + + defp auto_allowed?(direction, component) do + direction == :input or component in [:filter, :bin] + end + @doc """ Generates docs describing pads based on pads specification. """ diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 93e1d8f81..83a45ad7a 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -454,7 +454,7 @@ defmodule Membrane.Core.Element.ActionHandler do if Event.event?(event) do %{pid: pid, other_ref: other_ref} = PadModel.get_data!(state, pad_ref) - state = handle_outgoing_event(pad_ref, event, state) + state = handle_event(pad_ref, event, state) Message.send(pid, :event, event, for_pad: other_ref) state else @@ -463,8 +463,8 @@ defmodule Membrane.Core.Element.ActionHandler do end end - @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() - defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do + @spec handle_event(Pad.ref(), Event.t(), State.t()) :: State.t() + defp handle_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do state = PadController.remove_pad_associations(pad_ref, state) PadModel.set_data!(state, pad_ref, :end_of_stream?, true) @@ -477,5 +477,5 @@ defmodule Membrane.Core.Element.ActionHandler do end end - defp handle_outgoing_event(_pad_ref, _event, state), do: state + defp handle_event(_pad_ref, _event, state), do: state end diff --git a/lib/membrane/core/element/callback_context.ex b/lib/membrane/core/element/callback_context.ex index 71cee7c9b..ef8dd83b7 100644 --- a/lib/membrane/core/element/callback_context.ex +++ b/lib/membrane/core/element/callback_context.ex @@ -3,9 +3,8 @@ defmodule Membrane.Core.Element.CallbackContext do @type optional_fields :: [incoming_demand: non_neg_integer()] - | [pad_options: map()] + | [options: map()] | [old_stream_format: Membrane.StreamFormat.t()] - | [start_of_stream_received?: boolean()] @spec from_state(Membrane.Core.Element.State.t(), optional_fields()) :: Membrane.Element.CallbackContext.t() diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 377b70a30..3e73da04d 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -60,90 +60,58 @@ defmodule Membrane.Core.Element.EventController do end @spec exec_handle_event(Pad.ref(), Event.t(), params :: map, State.t()) :: State.t() - def exec_handle_event(pad_ref, event, params \\ %{}, state) + def exec_handle_event(pad_ref, event, params \\ %{}, state) do + case handle_special_event(pad_ref, event, state) do + {:handle, state} -> + :ok = check_sync(event, state) + do_exec_handle_event(pad_ref, event, params, state) - def exec_handle_event(pad_ref, %Events.StartOfStream{} = event, params, state) do - state = PadModel.set_data!(state, pad_ref, :start_of_stream?, true) - :ok = check_sync(state) + {:ignore, state} -> + state + end + end + + @spec do_exec_handle_event(Pad.ref(), Event.t(), params :: map, State.t()) :: State.t() + defp do_exec_handle_event(pad_ref, %event_type{} = event, params, state) + when event_type in [Events.StartOfStream, Events.EndOfStream] do + data = PadModel.get_data!(state, pad_ref) + callback = stream_event_to_callback(event) new_params = Map.merge(params, %{ context: &CallbackContext.from_state/1, - direction: PadModel.get_data!(state, pad_ref, :direction) + direction: data.direction }) state = CallbackHandler.exec_and_handle_callback( - :handle_start_of_stream, + callback, ActionHandler, new_params, [pad_ref], state ) - Message.send( - state.parent_pid, - :stream_management_event, - [state.name, pad_ref, event, []] - ) + Message.send(state.parent_pid, :stream_management_event, [ + state.name, + pad_ref, + event + ]) state end - def exec_handle_event(pad_ref, %Events.EndOfStream{} = event, params, state) do - if PadModel.get_data!(state, pad_ref, :end_of_stream?) do - Membrane.Logger.debug("Ignoring end of stream as it has already arrived before") - state - else - Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") - - state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) - state = PadController.remove_pad_associations(pad_ref, state) - - %{ - start_of_stream?: start_of_stream?, - direction: direction - } = PadModel.get_data!(state, pad_ref) - - event_params = [start_of_stream_received?: start_of_stream?] - - new_params = - Map.merge(params, %{ - context: &CallbackContext.from_state(&1, event_params), - direction: direction - }) - - state = - CallbackHandler.exec_and_handle_callback( - :handle_end_of_stream, - ActionHandler, - new_params, - [pad_ref], - state - ) - - Message.send( - state.parent_pid, - :stream_management_event, - [state.name, pad_ref, event, event_params] - ) - - state - end - end - - def exec_handle_event(pad_ref, event, params, state) do + defp do_exec_handle_event(pad_ref, event, params, state) do data = PadModel.get_data!(state, pad_ref) params = - %{context: &CallbackContext.from_state/1, direction: data.direction} - |> Map.merge(params) + %{context: &CallbackContext.from_state/1, direction: data.direction} |> Map.merge(params) args = [pad_ref, event] CallbackHandler.exec_and_handle_callback(:handle_event, ActionHandler, params, args, state) end - defp check_sync(state) do + defp check_sync(%Events.StartOfStream{}, state) do if state.pads_data |> Map.values() |> Enum.filter(&(&1.direction == :input)) @@ -154,7 +122,42 @@ defmodule Membrane.Core.Element.EventController do :ok end + defp check_sync(_event, _state) do + :ok + end + + @spec handle_special_event(Pad.ref(), Event.t(), State.t()) :: + {:handle | :ignore, State.t()} + defp handle_special_event(pad_ref, %Events.StartOfStream{}, state) do + Membrane.Logger.debug("received start of stream") + state = PadModel.set_data!(state, pad_ref, :start_of_stream?, true) + {:handle, state} + end + + defp handle_special_event(pad_ref, %Events.EndOfStream{}, state) do + pad_data = PadModel.get_data!(state, pad_ref) + + with %{start_of_stream?: true, end_of_stream?: false} <- pad_data do + state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + state = PadController.remove_pad_associations(pad_ref, state) + {:handle, state} + else + %{end_of_stream?: true} -> + Membrane.Logger.debug("Ignoring end of stream as it has already arrived before") + {:ignore, state} + + %{start_of_stream?: false} -> + Membrane.Logger.debug("Ignoring end of stream as start of stream hasn't arrived yet") + {:ignore, state} + end + end + + defp handle_special_event(_pad_ref, _event, state), do: {:handle, state} + defp buffers_before_event_present?(pad_data) do pad_data.input_queue && not InputQueue.empty?(pad_data.input_queue) end + + defp stream_event_to_callback(%Events.StartOfStream{}), do: :handle_start_of_stream + defp stream_event_to_callback(%Events.EndOfStream{}), do: :handle_end_of_stream end diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index bcbf904d3..e9ac9f171 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -493,7 +493,7 @@ defmodule Membrane.Core.Element.PadController do %{options: pad_opts, availability: availability} = PadModel.get_data!(state, ref) if Pad.availability_mode(availability) == :dynamic do - context = &CallbackContext.from_state(&1, pad_options: pad_opts) + context = &CallbackContext.from_state(&1, options: pad_opts) CallbackHandler.exec_and_handle_callback( :handle_pad_added, diff --git a/lib/membrane/core/options_specs.ex b/lib/membrane/core/options_specs.ex index 1487d967f..3a3ee1e48 100644 --- a/lib/membrane/core/options_specs.ex +++ b/lib/membrane/core/options_specs.ex @@ -16,7 +16,7 @@ defmodule Membrane.Core.OptionsSpecs do * `default:` default value for option. If not present, value for this option will have to be provided each time options struct is created * `inspector:` function converting fields' value to a string. Used when - creating documentation instead of `inspect/1`, eg. `inspector: &Membrane.Time.inspect/1` + creating documentation instead of `inspect/1`, e.g. `inspector: &Membrane.Time.inspect/1` * `description:` string describing an option. It will be used for generating the docs """ end diff --git a/lib/membrane/core/parent/child_life_controller.ex b/lib/membrane/core/parent/child_life_controller.ex index 794c5ea64..2c1358943 100644 --- a/lib/membrane/core/parent/child_life_controller.ex +++ b/lib/membrane/core/parent/child_life_controller.ex @@ -56,6 +56,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do @children_spec_options_fields_specs [ group: [require?: false], crash_group_mode: [require?: false], + crash_group: [require?: false], stream_sync: [require?: false], clock_provider: [require?: false], node: [require?: false], @@ -207,6 +208,13 @@ defmodule Membrane.Core.Parent.ChildLifeController do {:ok, options} = Bunch.Config.parse(options_keywords_list, @children_spec_options_fields_specs) + options = + with %{crash_group: {group_name, :temporary}} <- options do + options + |> Map.delete(:crash_group) + |> Map.merge(%{group: group_name, crash_group_mode: :temporary}) + end + options = Map.merge(defaults, options) options_to_pass_to_nested = diff --git a/lib/membrane/core/parent/lifecycle_controller.ex b/lib/membrane/core/parent/lifecycle_controller.ex index 311bc4a88..c08ece522 100644 --- a/lib/membrane/core/parent/lifecycle_controller.ex +++ b/lib/membrane/core/parent/lifecycle_controller.ex @@ -134,10 +134,9 @@ defmodule Membrane.Core.Parent.LifecycleController do Membrane.Event.t(), Child.name(), Pad.ref(), - [start_of_stream_received?: boolean()], Parent.state() ) :: Parent.state() - def handle_stream_management_event(%event_type{}, element_name, pad_ref, event_params, state) + def handle_stream_management_event(%event_type{}, element_name, pad_ref, state) when event_type in [Events.StartOfStream, Events.EndOfStream] do callback = case event_type do @@ -148,7 +147,7 @@ defmodule Membrane.Core.Parent.LifecycleController do CallbackHandler.exec_and_handle_callback( callback, Component.action_handler(state), - %{context: &Component.context_from_state(&1, event_params)}, + %{context: &Component.context_from_state/1}, [element_name, pad_ref], state ) diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 32e40f72f..725a762a3 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -79,18 +79,9 @@ defmodule Membrane.Core.Pipeline do end @impl GenServer - def handle_info( - Message.new(:stream_management_event, [element_name, pad_ref, event, event_params]), - state - ) do + def handle_info(Message.new(:stream_management_event, [element_name, pad_ref, event]), state) do state = - LifecycleController.handle_stream_management_event( - event, - element_name, - pad_ref, - event_params, - state - ) + LifecycleController.handle_stream_management_event(event, element_name, pad_ref, state) {:noreply, state} end diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index ea1196486..712a5ff0f 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -9,7 +9,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.Core.Pipeline.State @impl CallbackHandler - def handle_action({action, _args}, :handle_init, _params, _state) when action != :spec do + def handle_action({action, _args}, :handle_init, _params, _state) + when action not in [:spec, :playback] do raise ActionError, action: action, reason: {:invalid_callback, :handle_init} end @@ -30,6 +31,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do Core.LifecycleController.handle_setup_operation(operation, state) end + @impl true + def handle_action({:playback, _playback}, _cb, _params, state) do + state + end + @impl CallbackHandler def handle_action({:notify_child, notification}, _cb, _params, state) do Parent.ChildLifeController.handle_notify_child(notification, state) @@ -46,6 +52,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do Parent.ChildLifeController.handle_remove_children(children, state) end + @impl CallbackHandler + def handle_action({:remove_child, children}, _cb, _params, state) do + Parent.ChildLifeController.handle_remove_children(children, state) + end + @impl CallbackHandler def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state) diff --git a/lib/membrane/core/pipeline/callback_context.ex b/lib/membrane/core/pipeline/callback_context.ex index d2537b760..ac85b3aad 100644 --- a/lib/membrane/core/pipeline/callback_context.ex +++ b/lib/membrane/core/pipeline/callback_context.ex @@ -4,7 +4,6 @@ defmodule Membrane.Core.Pipeline.CallbackContext do @type optional_fields :: [from: GenServer.from()] | [members: [Membrane.Child.name()], crash_initiator: Membrane.Child.name()] - | [start_of_stream_received?: boolean()] @spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) :: Membrane.Pipeline.CallbackContext.t() diff --git a/lib/membrane/core/timer.ex b/lib/membrane/core/timer.ex index 5a23c957c..a304d7375 100644 --- a/lib/membrane/core/timer.ex +++ b/lib/membrane/core/timer.ex @@ -19,7 +19,12 @@ defmodule Membrane.Core.Timer do } @enforce_keys [:interval, :clock, :init_time, :id] - defstruct @enforce_keys ++ [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil] + defstruct @enforce_keys ++ + [ + next_tick_time: 0, + ratio: %Ratio{denominator: 1, numerator: 1}, + timer_ref: nil + ] @spec start(id, interval, Clock.t()) :: t def start(id, interval, clock) do @@ -48,6 +53,8 @@ defmodule Membrane.Core.Timer do end def tick(timer) do + use Ratio + %__MODULE__{ id: id, interval: interval, @@ -56,7 +63,7 @@ defmodule Membrane.Core.Timer do ratio: ratio } = timer - next_tick_time = Ratio.add(Ratio.new(next_tick_time), Ratio.new(interval)) + next_tick_time = next_tick_time + interval # Next tick time converted to BEAM clock time beam_next_tick_time = @@ -67,7 +74,7 @@ defmodule Membrane.Core.Timer do timer_ref = Process.send_after(self(), Message.new(:timer_tick, id), beam_next_tick_time, abs: true) - %__MODULE__{timer | next_tick_time: next_tick_time |> Ratio.floor(), timer_ref: timer_ref} + %__MODULE__{timer | next_tick_time: next_tick_time, timer_ref: timer_ref} end @spec set_interval(t, interval) :: t diff --git a/lib/membrane/element/base.ex b/lib/membrane/element/base.ex index ef49ae086..7d834e393 100644 --- a/lib/membrane/element/base.ex +++ b/lib/membrane/element/base.ex @@ -90,7 +90,7 @@ defmodule Membrane.Element.Base do Callback that is called when new pad has beed added to element. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. By default, it does nothing. """ @callback handle_pad_added( @@ -103,7 +103,7 @@ defmodule Membrane.Element.Base do Callback that is called when some pad of the element has beed removed. Executed ONLY for dynamic pads. - Context passed to this callback contains additional field `:pad_options`. + Context passed to this callback contains additional field `:options`. By default, it does nothing. """ @callback handle_pad_removed( diff --git a/lib/membrane/element/callback_context.ex b/lib/membrane/element/callback_context.ex index 8e7a783fd..f93404063 100644 --- a/lib/membrane/element/callback_context.ex +++ b/lib/membrane/element/callback_context.ex @@ -9,12 +9,9 @@ defmodule Membrane.Element.CallbackContext do Field `:incoming_demand` is present only in `c:Membrane.Element.WithOutputPads.handle_demand/5`. - Field `:pad_options` is present only in `c:Membrane.Element.Base.handle_pad_added/3` + Field `:options` is present only in `c:Membrane.Element.Base.handle_pad_added/3` and `c:Membrane.Element.Base.handle_pad_removed/3`. - Field `:start_of_stream_received?` is present only in - `c:Membrane.Element.WithInputPads.handle_end_of_stream/3`. - Field `:old_stream_format` is present only in `c:Membrane.Element.WithInputPads.handle_stream_format/4`. """ @@ -27,8 +24,7 @@ defmodule Membrane.Element.CallbackContext do :resource_guard => Membrane.ResourceGuard.t(), :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:incoming_demand) => non_neg_integer(), - optional(:pad_options) => map(), - optional(:old_stream_format) => Membrane.StreamFormat.t(), - optional(:start_of_stream_received?) => boolean() + optional(:options) => map(), + optional(:old_stream_format) => Membrane.StreamFormat.t() } end diff --git a/lib/membrane/element/with_input_pads.ex b/lib/membrane/element/with_input_pads.ex index 257bf11f9..65f0302fb 100644 --- a/lib/membrane/element/with_input_pads.ex +++ b/lib/membrane/element/with_input_pads.ex @@ -76,7 +76,39 @@ defmodule Membrane.Element.WithInputPads do state :: Element.state() ) :: Membrane.Element.Base.callback_return() - @optional_callbacks handle_buffer: 4, handle_stream_format: 4 + @callback handle_write( + pad :: Pad.ref(), + buffer :: Buffer.t(), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_write_list( + pad :: Pad.ref(), + buffers :: list(Buffer.t()), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_process( + pad :: Pad.ref(), + buffer :: Buffer.t(), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @callback handle_process_list( + pad :: Pad.ref(), + buffers :: list(Buffer.t()), + context :: CallbackContext.t(), + state :: Element.state() + ) :: Membrane.Element.Base.callback_return() + + @optional_callbacks handle_stream_format: 4, + handle_write: 4, + handle_write_list: 4, + handle_process: 4, + handle_process_list: 4 @doc PadsSpecs.def_pad_docs(:input, :element) defmacro def_input_pad(name, spec) do diff --git a/lib/membrane/endpoint.ex b/lib/membrane/endpoint.ex index df060d153..3ef1fcbd8 100644 --- a/lib/membrane/endpoint.ex +++ b/lib/membrane/endpoint.ex @@ -37,6 +37,26 @@ defmodule Membrane.Endpoint do @doc false @spec membrane_element_type() :: Membrane.Element.type() def membrane_element_type, do: :endpoint + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_write, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_write_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_write_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end + + defoverridable handle_buffer: 4, + handle_buffers_batch: 4, + handle_write_list: 4 end end diff --git a/lib/membrane/filter.ex b/lib/membrane/filter.ex index 45de242c4..2ebc69ad7 100644 --- a/lib/membrane/filter.ex +++ b/lib/membrane/filter.ex @@ -39,19 +39,42 @@ defmodule Membrane.Filter do def membrane_element_type, do: :filter @impl true - def handle_stream_format(_pad, stream_format, _context, state), - do: {[forward: stream_format], state} + def handle_stream_format(_pad, stream_format, _context, state) do + {[forward: stream_format], state} + end @impl true - def handle_event(_pad, event, _context, state), do: {[forward: event], state} + def handle_event(_pad, event, _context, state) do + {[forward: event], state} + end @impl true - def handle_end_of_stream(pad, _context, state), - do: {[forward: :end_of_stream], state} + def handle_end_of_stream(pad, _context, state) do + {[forward: :end_of_stream], state} + end + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_process, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_process_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_process_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end defoverridable handle_stream_format: 4, handle_event: 4, - handle_end_of_stream: 3 + handle_end_of_stream: 3, + handle_buffer: 4, + handle_buffers_batch: 4, + handle_process_list: 4 end end diff --git a/lib/membrane/pad.ex b/lib/membrane/pad.ex index 3e054239f..7a2a7b1e2 100644 --- a/lib/membrane/pad.ex +++ b/lib/membrane/pad.ex @@ -94,7 +94,7 @@ defmodule Membrane.Pad do Can be a module name, pattern describing struct, or call to `any_of` function, which arguments are such patterns or modules names. If a module name is passed to the `:accepted_format` option or is passed to `any_of`, - it will be converted to the match on a struct defined in that module, eg. + it will be converted to the match on a struct defined in that module, e.g. `accepted_format: My.Format` will have this same effect, as `accepted_format: %My.Format{}` and `accepted_format: any_of(My.Format, %My.Another.Format{field: value} when value in [:some, :enumeration])` will have this same effect, as `accepted_format: any_of(%My.Format{}, diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 4df6df3ee..374ea8709 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -142,7 +142,7 @@ defmodule Membrane.Pipeline do Callback invoked when a child removes its pad. The callback won't be invoked, when you have initiated the pad removal, - eg. when you have returned `t:Membrane.Pipeline.Action.remove_link()` + e.g. when you have returned `t:Membrane.Pipeline.Action.remove_link()` action which made one of your children's pads be removed. By default, it does nothing. """ @@ -351,6 +351,14 @@ defmodule Membrane.Pipeline do ) :: :ok | {:ok, pid()} | {:error, :timeout} def terminate(pipeline, opts \\ []) do + opts = + if Keyword.has_key?(opts, :blocking?) do + {blocking?, opts} = Keyword.pop!(opts, :blocking?) + Keyword.put(opts, :asynchronous?, not blocking?) + else + opts + end + [asynchronous?: asynchronous?] ++ opts = Keyword.validate!(opts, asynchronous?: false, diff --git a/lib/membrane/pipeline/action.ex b/lib/membrane/pipeline/action.ex index f41809f6e..f551b5d78 100644 --- a/lib/membrane/pipeline/action.ex +++ b/lib/membrane/pipeline/action.ex @@ -21,6 +21,8 @@ defmodule Membrane.Pipeline.Action do """ @type setup :: {:setup, :incomplete | :complete} + @type playback :: {:playback, :playing, :stopped, :prepared} + @typedoc """ Action that sends a message to a child identified by name. """ @@ -41,7 +43,18 @@ defmodule Membrane.Pipeline.Action do as an argument. """ @type remove_children :: - {:remove_children, Child.name() | [Child.name()]} + {:remove_children, + Child.name() + | [Child.name()] + | Child.group() + | [Child.group()]} + + @type remove_child :: + {:remove_child, + Child.name() + | [Child.name()] + | Child.group() + | [Child.group()]} @typedoc """ Action that removes link, which relates to specified child and pad. @@ -132,9 +145,11 @@ defmodule Membrane.Pipeline.Action do """ @type t :: setup + | playback | notify_child | spec | remove_children + | remove_child | remove_link | start_timer | timer_interval diff --git a/lib/membrane/pipeline/callback_context.ex b/lib/membrane/pipeline/callback_context.ex index b352c4896..c47c1077d 100644 --- a/lib/membrane/pipeline/callback_context.ex +++ b/lib/membrane/pipeline/callback_context.ex @@ -8,9 +8,6 @@ defmodule Membrane.Pipeline.CallbackContext do Field `:from` is present only in `c:Membrane.Pipeline.handle_call/3`. - Field `:start_of_stream_received?` is present only in - `c:Membrane.Pipeline.handle_element_end_of_stream/4`. - Fields `:members` and `:crash_initiator` are present only in `c:Membrane.Pipeline.handle_crash_group_down/3`. """ @@ -22,7 +19,6 @@ defmodule Membrane.Pipeline.CallbackContext do :utility_supervisor => Membrane.UtilitySupervisor.t(), optional(:from) => [GenServer.from()], optional(:members) => [Membrane.Child.name()], - optional(:crash_initiator) => Membrane.Child.name(), - optional(:start_of_stream_received?) => boolean() + optional(:crash_initiator) => Membrane.Child.name() } end diff --git a/lib/membrane/sink.ex b/lib/membrane/sink.ex index 6d1c79cd1..cdbca6f52 100644 --- a/lib/membrane/sink.ex +++ b/lib/membrane/sink.ex @@ -32,6 +32,26 @@ defmodule Membrane.Sink do @doc false @spec membrane_element_type() :: Membrane.Element.type() def membrane_element_type, do: :sink + + @impl true + def handle_buffer(pad, buffer, ctx, state) do + apply(__MODULE__, :handle_write, [pad, buffer, ctx, state]) + end + + @impl true + def handle_buffers_batch(pad, buffers, ctx, state) do + apply(__MODULE__, :handle_write_list, [pad, buffers, ctx, state]) + end + + @impl true + def handle_write_list(pad, buffers, ctx, state) do + args_list = buffers |> Enum.map(&[pad, &1]) + {[split: {:handle_buffer, args_list}], state} + end + + defoverridable handle_buffer: 4, + handle_buffers_batch: 4, + handle_write_list: 4 end end diff --git a/lib/membrane/testing/assertions.ex b/lib/membrane/testing/assertions.ex index 496f2bf0f..fe7b0d598 100644 --- a/lib/membrane/testing/assertions.ex +++ b/lib/membrane/testing/assertions.ex @@ -171,6 +171,10 @@ defmodule Membrane.Testing.Assertions do assert_receive_from_pipeline(pipeline, :setup, timeout) end + defmacro assert_pipeline_play(pipeline, timeout \\ @default_timeout) do + assert_receive_from_pipeline(pipeline, :play, timeout) + end + @doc """ Asserts that pipeline received or will receive a message matching `message_pattern` from another process within the `timeout` period specified diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index d7ae2132c..d1f029916 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -145,7 +145,13 @@ defmodule Membrane.Testing.Pipeline do end defp do_start(type, options) do - :ok = validate_options!(options) + options = + if Keyword.has_key?(options, :structure) do + {spec, options} = Keyword.pop(options, :structure) + [spec: spec] ++ options + else + options + end {process_options, options} = Keyword.split(options, [:name]) options = Keyword.put_new(options, :test_process, self()) @@ -530,19 +536,4 @@ defmodule Membrane.Testing.Pipeline do defp combine_results({custom_actions, custom_state}, {actions, state}) do {Enum.concat(custom_actions, actions), Map.put(state, :custom_pipeline_state, custom_state)} end - - defp validate_options!(options) do - allowed_keys_1 = [:module, :mode, :spec, :test_process, :name, :raise_on_child_pad_removed?] - allowed_keys_2 = [:module, :custom_args, :test_process, :name] - - with {:error, _keys} <- Keyword.validate(options, allowed_keys_1), - {:error, _keys} <- Keyword.validate(options, allowed_keys_2) do - raise """ - Options passed to #{inspect(__MODULE__)} start function has to fulfill type - #{inspect(__MODULE__)}.options, while they are #{inspect(options)} - """ - else - {:ok, _keyword} -> :ok - end - end end diff --git a/lib/membrane/time.ex b/lib/membrane/time.ex index 798247939..455024c5a 100644 --- a/lib/membrane/time.ex +++ b/lib/membrane/time.ex @@ -11,8 +11,6 @@ defmodule Membrane.Time do that do not touch hardware clock, you should use Membrane units for consistency. """ - require Ratio - @compile {:inline, native_units: 1, native_unit: 0, nanoseconds: 1, nanosecond: 0, second: 0, seconds: 1} @@ -35,7 +33,7 @@ defmodule Membrane.Time do # Difference between 01.01.1900 (start of NTP epoch) and 01.01.1970 (start of Unix epoch) in seconds @ntp_unix_epoch_diff 2_208_988_800 - @two_to_pow_32 Ratio.pow(Ratio.new(2, 1), 32) |> Ratio.trunc() + @two_to_pow_32 Ratio.pow(2, 32) @doc """ Checks whether a value is `Membrane.Time.t`. @@ -245,6 +243,21 @@ defmodule Membrane.Time do Ratio.new(timestamp, timebase) |> round_rational() end + @doc """ + Divides timestamp by a timebase. The result is rounded to the nearest integer. + Works this same as `divide_by_timebase/2`. + + ## Examples: + iex> timestamp = 10 |> Membrane.Time.seconds() + iex> timebase = Ratio.new(Membrane.Time.second(), 30) + iex> Membrane.Time.round_to_timebase(timestamp, timebase) + 300 + """ + @spec round_to_timebase(number | Ratio.t(), number | Ratio.t()) :: integer + def round_to_timebase(timestamp, timebase) do + divide_by_timebase(timestamp, timebase) + end + Enum.map(@units, fn unit -> @doc """ Returns one #{unit.singular} in `#{inspect(__MODULE__)}` units. @@ -265,8 +278,12 @@ defmodule Membrane.Time do end # credo:disable-for-next-line Credo.Check.Readability.Specs - def unquote(unit.plural)(number) when Ratio.is_rational(number) do - Ratio.mult(number, Ratio.new(unquote(unit.duration))) + def unquote(unit.plural)(number) do + if not Ratio.is_rational?(number) do + raise "Only integers and rationals can be converted with Membrane.Time.#{unquote(unit.plural)}" + end + + Ratio.*(number, unquote(unit.duration)) |> round_rational() end @@ -287,6 +304,17 @@ defmodule Membrane.Time do :round -> Ratio.new(time, unquote(unit.duration)) |> round_rational() end end + + round_fun_name = :"round_to_#{unit.plural}" + + @doc """ + Works as #{as_fun_name}/2 with `mode` argument set to `:round`. + """ + @spec unquote(round_fun_name)(t) :: integer + # credo:disable-for-next-line Credo.Check.Readability.Specs + def unquote(round_fun_name)(time) when is_time(time) do + unquote(as_fun_name)(time, :round) + end end) defp best_unit(time) do @@ -295,6 +323,7 @@ defmodule Membrane.Time do end defp round_rational(ratio) do + ratio = make_rational(ratio) trunced = Ratio.trunc(ratio) if 2 * sign_of_rational(ratio) * @@ -304,6 +333,14 @@ defmodule Membrane.Time do else: trunced end + defp make_rational(number) do + if Ratio.is_rational?(number) do + number + else + %Ratio{numerator: number, denominator: 1} + end + end + defp sign_of_rational(ratio) do if ratio.numerator == 0, do: 0, else: Ratio.sign(ratio) end diff --git a/mix.exs b/mix.exs index 70c5ff039..d59191784 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.0-rc0" + @version "0.12.9" @source_ref "v#{@version}" def project do @@ -63,6 +63,7 @@ defmodule Membrane.Mixfile do "README.md", "CHANGELOG.md", "CONTRIBUTING.md", + "guides/upgrading/v0.12.md", "guides/upgrading/v0.11.md", "guides/upgrading/v1.0.0-rc0.md", LICENSE: [title: "License"] @@ -141,7 +142,7 @@ defmodule Membrane.Mixfile do {:qex, "~> 0.3"}, {:telemetry, "~> 1.0"}, {:bunch, "~> 1.6"}, - {:ratio, "~> 3.0"}, + {:ratio, "~> 2.0"}, # Development {:ex_doc, "~> 0.28", only: :dev, runtime: false}, {:makeup_diff, "~> 0.1", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index 861da1eb6..837084f9a 100644 --- a/mix.lock +++ b/mix.lock @@ -4,7 +4,7 @@ "coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"}, "credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"}, "dialyxir": {:hex, :dialyxir, "1.4.1", "a22ed1e7bd3a3e3f197b68d806ef66acb61ee8f57b3ac85fc5d57354c5482a93", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "84b795d6d7796297cca5a3118444b80c7d94f7ce247d49886e7c291e1ae49801"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.33", "3c3fd9673bb5dcc9edc28dd90f50c87ce506d1f71b70e3de69aa8154bc695d44", [:mix], [], "hexpm", "2d526833729b59b9fdb85785078697c72ac5e5066350663e5be6a1182da61b8f"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.35", "437773ca9384edf69830e26e9e7b2e0d22d2596c4a6b17094a3b29f01ea65bb8", [:mix], [], "hexpm", "8652ba3cb85608d0d7aa2d21b45c6fad4ddc9a1f9a1f1b30ca3a246f0acc33f6"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"}, @@ -19,6 +19,6 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ratio": {:hex, :ratio, "2.4.2", "c8518f3536d49b1b00d88dd20d49f8b11abb7819638093314a6348139f14f9f9", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "441ef6f73172a3503de65ccf1769030997b0d533b1039422f1e5e0e0b4cbf89e"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, } diff --git a/test/membrane/api_backcompability_test.exs b/test/membrane/api_backcompability_test.exs new file mode 100644 index 000000000..ae145f558 --- /dev/null +++ b/test/membrane/api_backcompability_test.exs @@ -0,0 +1,52 @@ +defmodule Membrane.APIBackCompabilityTest do + # this module tests if API in membrane_core v0.12 has no breaking changes comparing to api in v0.11 + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + + alias Membrane.Testing + + test "if action :remove_child works" do + defmodule Filter do + use Membrane.Filter + end + + pipeline = Testing.Pipeline.start_link_supervised!(spec: child(:filter, Filter)) + Process.sleep(100) + filter_pid = Testing.Pipeline.get_child_pid!(pipeline, :filter) + monitor_ref = Process.monitor(filter_pid) + Testing.Pipeline.execute_actions(pipeline, remove_child: :filter) + + assert_receive {:DOWN, ^monitor_ref, _process, _pid, :normal} + + Testing.Pipeline.terminate(pipeline) + end + + test "if `Membrane.Time.round_to_*` functions work" do + module = Membrane.Time + + for {old_function, new_function} <- [ + round_to_days: :as_days, + round_to_hours: :as_hours, + round_to_minutes: :as_minutes, + round_to_seconds: :as_seconds, + round_to_milliseconds: :as_milliseconds, + round_to_microseconds: :as_microseconds, + round_to_nanoseconds: :as_nanoseconds + ], + timestamp_generator <- [:days, :microseconds] do + timestamp = apply(module, timestamp_generator, [3]) + + old_function_result = apply(module, old_function, [timestamp]) + new_function_result = apply(module, new_function, [timestamp, :round]) + + assert old_function_result == new_function_result + end + + timestamp = Membrane.Time.days(15) + Membrane.Time.nanoseconds(13) + timebase = Membrane.Time.milliseconds(2) + + assert Membrane.Time.round_to_timebase(timestamp, timebase) == + Membrane.Time.divide_by_timebase(timestamp, timebase) + end +end diff --git a/test/membrane/clock_test.exs b/test/membrane/clock_test.exs index 85a20b9eb..3019358a3 100644 --- a/test/membrane/clock_test.exs +++ b/test/membrane/clock_test.exs @@ -3,7 +3,7 @@ defmodule Membrane.ClockTest do @module Membrane.Clock - @initial_ratio Ratio.new(1) + @initial_ratio 1 test "should calculate proper ratio and send it to subscribers on each (but the first) update" do {:ok, clock} = @@ -16,8 +16,7 @@ defmodule Membrane.ClockTest do refute_receive {:membrane_clock_ratio, ^clock, _ratio} send(clock, {:membrane_clock_update, 2}) send(clock, time: 13) - two = Ratio.new(2) - assert_receive {:membrane_clock_ratio, ^clock, ^two} + assert_receive {:membrane_clock_ratio, ^clock, 2} send(clock, {:membrane_clock_update, random_time()}) send(clock, time: 33) ratio = Ratio.new(20 + 2, 33 - 3) @@ -25,6 +24,8 @@ defmodule Membrane.ClockTest do end test "should handle different ratio formats" do + use Ratio + {:ok, clock} = @module.start_link(time_provider: fn -> receive do: (time: t -> ms_to_ns(t)) end) @@ -37,9 +38,7 @@ defmodule Membrane.ClockTest do send(clock, {:membrane_clock_update, 5}) send(clock, time: 20) @module.subscribe(clock) - two = Ratio.new(2) - five = Ratio.new(5) - ratio = Ratio.div(Ratio.add(five, Ratio.mult(Ratio.new(1, 3), two)), Ratio.new(20 - 5)) + ratio = (5 + Ratio.new(1, 3) * 2) / (20 - 5) assert_receive {:membrane_clock_ratio, ^clock, ^ratio} end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 198d13e4e..64a689aeb 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -83,6 +83,12 @@ defmodule Membrane.Core.Element.EventControllerTest do assert state.pads_data.input.start_of_stream? end + test "ignoring end of stream when there was no start of stream prior", %{state: state} do + state = EventController.handle_event(:input, %Events.EndOfStream{}, state) + refute state.pads_data.input.end_of_stream? + refute state.pads_data.input.start_of_stream? + end + test "end of stream successfully", %{state: state} do state = put_start_of_stream(state, :input) diff --git a/test/membrane/core/element_test.exs b/test/membrane/core/element_test.exs index 61bc87870..3a964ad64 100644 --- a/test/membrane/core/element_test.exs +++ b/test/membrane/core/element_test.exs @@ -324,10 +324,9 @@ defmodule Membrane.Core.ElementTest do {:ok, clock} = Membrane.Clock.start_link() state = Membrane.Core.TimerController.start_timer(:timer, 1000, clock, get_state()) - assert {:noreply, state} = - Element.handle_info({:membrane_clock_ratio, clock, Ratio.new(123)}, state) + assert {:noreply, state} = Element.handle_info({:membrane_clock_ratio, clock, 123}, state) - assert state.synchronization.timers.timer.ratio == Ratio.new(123) + assert state.synchronization.timers.timer.ratio == 123 end test "should set stream sync" do diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 18bdd7b71..4f6e19240 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -9,8 +9,8 @@ defmodule Membrane.Integration.AutoDemandsTest do defmodule AutoDemandFilter do use Membrane.Filter - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, flow_control: :auto def_options factor: [default: 1], direction: [default: :up] @@ -38,8 +38,8 @@ defmodule Membrane.Integration.AutoDemandsTest do defmodule AutoDemandTee do use Membrane.Filter - def_input_pad :input, accepted_format: _any - def_output_pad :output, accepted_format: _any, availability: :on_request + def_input_pad :input, accepted_format: _any, flow_control: :auto + def_output_pad :output, accepted_format: _any, availability: :on_request, flow_control: :auto @impl true def handle_buffer(:input, buffer, _ctx, state), do: {[forward: buffer], state} @@ -158,7 +158,7 @@ defmodule Membrane.Integration.AutoDemandsTest do |> Enum.map(fn opts -> test "buffers pass to auto-demand #{opts.name}" do %{name: name, module: module} = unquote(Macro.escape(opts)) - payloads = Enum.map(1..1000, &inspect/1) + payloads = Enum.map(1..100_000, &inspect/1) pipeline = Pipeline.start_link_supervised!( diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs index 894207fef..da0370ca9 100644 --- a/test/membrane/integration/effective_flow_control_resolution_test.exs +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -9,10 +9,10 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do defmodule AutoFilter do use Membrane.Filter - def_input_pad :input, availability: :on_request, accepted_format: _any - def_output_pad :output, availability: :on_request, accepted_format: _any + def_input_pad :input, availability: :on_request, accepted_format: _any, flow_control: :auto + def_output_pad :output, availability: :on_request, accepted_format: _any, flow_control: :auto - def_options lazy?: [spec: boolean(), default: false] + def_options sleep_on_handle_buffer?: [spec: boolean(), default: false] @impl true def handle_playing(_ctx, state) do @@ -21,14 +21,9 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do @impl true def handle_buffer(_pad, buffer, _ctx, state) do - if state.lazy?, do: Process.sleep(100) + if state.sleep_on_handle_buffer?, do: Process.sleep(100) {[forward: buffer], state} end - - @impl true - def handle_end_of_stream(_pad, _ctx, state) do - {[], state} - end end defmodule DoubleFlowControlSource do @@ -197,7 +192,7 @@ defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do test "Toilet overflows, when it should" do spec = { child(:pull_source, PullSource) - |> child(:filter, %AutoFilter{lazy?: true}), + |> child(:filter, %AutoFilter{sleep_on_handle_buffer?: true}), group: :group, crash_group_mode: :temporary } diff --git a/test/membrane/integration/end_of_stream_test.exs b/test/membrane/integration/end_of_stream_test.exs deleted file mode 100644 index 7bae62b86..000000000 --- a/test/membrane/integration/end_of_stream_test.exs +++ /dev/null @@ -1,48 +0,0 @@ -defmodule Membrane.EndOfStreamTest do - use ExUnit.Case, async: true - - import Membrane.ChildrenSpec - import Membrane.Testing.Assertions - - alias Membrane.Testing - - defmodule EOSSource do - use Membrane.Source - - def_output_pad :output, flow_control: :push, accepted_format: _any - - @impl true - def handle_playing(_ctx, state) do - {[end_of_stream: :output], state} - end - end - - defmodule EOSSink do - use Membrane.Sink - - def_input_pad :input, flow_control: :push, accepted_format: _any - - @impl true - def handle_start_of_stream(_pad, _ctx, _state) do - raise "This callback shouldn't be invoked" - end - - @impl true - def handle_end_of_stream(:input, %{start_of_stream_received?: false}, state) do - {[], state} - end - end - - test "send end of stream without start of stream" do - pipeline = - Testing.Pipeline.start_link_supervised!( - spec: - child(:source, EOSSource) - |> child(:sink, EOSSink) - ) - - assert_end_of_stream(pipeline, :sink) - - Testing.Pipeline.terminate(pipeline) - end -end diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index b23cde06c..12ab9aa90 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -526,8 +526,7 @@ defmodule Membrane.Integration.LinkingTest do defmodule Sink do use Membrane.Sink - def_input_pad :input, - accepted_format: _any + def_input_pad :input, accepted_format: _any, flow_control: :auto @impl true def handle_init(_ctx, _opts) do diff --git a/test/membrane/integration/links_validation_test.exs b/test/membrane/integration/links_validation_test.exs index 9f69e2f38..05c443640 100644 --- a/test/membrane/integration/links_validation_test.exs +++ b/test/membrane/integration/links_validation_test.exs @@ -96,27 +96,29 @@ defmodule Membrane.LinksValidationTest do describe "returning a spec with links to already used" do test "static pads" do - spec = child(:source, StaticPads.Source) |> child(:sink, StaticPads.Sink) - - pipeline = Pipeline.start_supervised!(spec: spec) + pipeline = Pipeline.start_supervised!() ref = Process.monitor(pipeline) - spec = get_child(:source) |> get_child(:sink) + spec = child(:source, StaticPads.Source) |> child(:sink, StaticPads.Sink) + Pipeline.execute_actions(pipeline, spec: spec) + spec = get_child(:source) |> get_child(:sink) Pipeline.execute_actions(pipeline, spec: spec) assert_receive({:DOWN, ^ref, :process, ^pipeline, {%Membrane.LinkError{}, _stacktrace}}) end test "dynamic pads" do + pipeline = Pipeline.start_supervised!() + ref = Process.monitor(pipeline) + spec = child(:source, DynamicPads.Source) |> via_out(Pad.ref(:output, 1)) |> via_in(Pad.ref(:input, 1)) |> child(:sink, DynamicPads.Sink) - pipeline = Pipeline.start_supervised!(spec: spec) - ref = Process.monitor(pipeline) + Pipeline.execute_actions(pipeline, spec: spec) spec = get_child(:source)