Skip to content

Commit

Permalink
Merge pull request #15 from membraneframework/auto-demand
Browse files Browse the repository at this point in the history
switch to auto demands
  • Loading branch information
mat-hek authored Feb 16, 2022
2 parents f7ef74b + c712620 commit 00920bf
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 49 deletions.
10 changes: 2 additions & 8 deletions lib/membrane_tee_plugin/master.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ defmodule Membrane.Tee.Master do

def_input_pad :input,
availability: :always,
mode: :pull,
demand_unit: :buffers,
demand_mode: :auto,
caps: :any

def_output_pad :master,
availability: :always,
mode: :pull,
demand_mode: :auto,
caps: :any

def_output_pad :copy,
Expand All @@ -32,9 +31,4 @@ defmodule Membrane.Tee.Master do
def handle_process(:input, %Membrane.Buffer{} = buffer, _ctx, state) do
{{:ok, forward: buffer}, state}
end

@impl true
def handle_demand(:master, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end
end
38 changes: 3 additions & 35 deletions lib/membrane_tee_plugin/parallel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,25 @@ defmodule Membrane.Tee.Parallel do
To use, link this element to one preceding element via `input` pad and multiple
succesive elements via `output` pads. Each buffer is forwarded only when demand for
it comes in via each output.
it comes in via each output. If there are no outputs, buffers are dropped.
"""

use Membrane.Filter

def_input_pad :input,
availability: :always,
mode: :pull,
demand_unit: :buffers,
demand_mode: :auto,
caps: :any

def_output_pad :output,
availability: :on_request,
mode: :pull,
demand_mode: :auto,
caps: :any

@impl true
def handle_init(_) do
state = %{}
{:ok, state}
end

@impl true
def handle_process(:input, %Membrane.Buffer{} = buffer, _ctx, state) do
{{:ok, forward: buffer}, state}
end

@impl true
def handle_demand(Pad.ref(:output, _id), _size, :buffers, ctx, state) do
{{:ok, make_demands(ctx)}, state}
end

@impl true
def handle_pad_removed(Pad.ref(:output, _id), %{playback_state: :playing} = ctx, state) do
{{:ok, make_demands(ctx)}, state}
end

@impl true
def handle_pad_removed(Pad.ref(:output, _id), _ctx, state) do
{:ok, state}
end

defp make_demands(ctx) do
minimal_size =
ctx.pads
|> Bunch.KVEnum.values()
|> Enum.filter(&(&1.direction == :output))
|> Enum.map(& &1.demand)
|> Enum.min(fn -> 0 end)
|> max(0)

[demand: {:input, minimal_size}]
end
end
6 changes: 0 additions & 6 deletions test/membrane_tee_plugin/master/master_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,5 @@ defmodule Membrane.Tee.MasterTest do
assert {{:ok, actions}, _state} = Master.handle_event(:input, event, nil, %{})
assert actions == [forward: event]
end

test "accepts demands from master" do
size = 100
assert {{:ok, actions}, _state} = Master.handle_demand(:master, size, :buffers, nil, %{})
assert actions == [demand: {:input, size}]
end
end
end

0 comments on commit 00920bf

Please sign in to comment.