Skip to content

Commit

Permalink
JitterBuffer is no longer a process
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Aug 21, 2024
1 parent 5a2976b commit 9f86562
Showing 1 changed file with 102 additions and 119 deletions.
221 changes: 102 additions & 119 deletions lib/ex_webrtc/rtp/jitter_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,177 +7,160 @@ defmodule ExWebRTC.RTP.JitterBuffer do
# Heavily inspired by:
# https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex

use GenServer

alias ExWebRTC.RTP.JitterBuffer.PacketStore
alias ExRTP.Packet

@default_latency_ms 200

@typedoc """
Messages sent by the `#{inspect(__MODULE__)}` process to its controlling process.
Options that can be passed to `new/1`.
* `{:packet, packet}` - packet flushed from the buffer
* `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default.
"""
@type message() :: {:jitter_buffer, pid(), {:packet, ExRTP.Packet.t()}}
@type options :: [latency: non_neg_integer()]

@typedoc """
Options that can be passed to `start_link/1`.
* `controlling_process` - a pid of a process where all messages will be sent. `self()` by default.
* `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default.
Time (in milliseconds) after which `handle_timer/1` should be called.
Can be `nil`, in which case no timer needs to be set.
"""
@type options :: [controlling_process: Process.dest(), latency: non_neg_integer()]

@type jitter_buffer :: GenServer.server()
@type timer :: non_neg_integer() | nil

@doc """
Starts a new `#{inspect(__MODULE__)}` process.
@typedoc """
The 3-element tuple returned by all functions other than `new/1`.
`#{inspect(__MODULE__)}` is a `GenServer` under the hood, thus this function allows for
passing the generic `t:GenServer.options/0` as an argument.
* `buffer` - `t:#{inspect(__MODULE__)}.t/0`.
* `packets` - a list with packets flushed from the buffer as a result of the function call. May be empty.
* `timer_duration_ms` - see `t:timer/0`.
Note: The buffer *won't* output any packets
until `start_timer/1` is called.
Generally speaking, all results of this type can be handled in the same way.
"""
@spec start(options(), GenServer.options()) :: GenServer.on_start()
def start(opts \\ [], gen_server_opts \\ []) do
opts = Keyword.put_new(opts, :controlling_process, self())
GenServer.start(__MODULE__, opts, gen_server_opts)
end
@type result :: {buffer :: t(), packets :: [Packet.t()], timer_duration_ms :: timer()}

@doc """
Starts and links to a new `#{inspect(__MODULE__)}` process.
@opaque t :: %__MODULE__{
latency: non_neg_integer(),
store: PacketStore.t(),
state: :initial_wait | :timer_set | :timer_not_set
}

Works identically to `start/2`, but links to the calling process.
"""
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
def start_link(opts \\ [], gen_server_opts \\ []) do
opts = Keyword.put_new(opts, :controlling_process, self())
GenServer.start_link(__MODULE__, opts, gen_server_opts)
end
@enforce_keys [:latency]
defstruct @enforce_keys ++
[
store: %PacketStore{},
state: :initial_wait
]

@doc """
Starts the initial latency timer.
The buffer will start to output packets `latency` milliseconds after this function is called.
Creates a new `t:#{inspect(__MODULE__)}.t/0`.
"""
@spec start_timer(jitter_buffer()) :: :ok
def start_timer(buffer) do
GenServer.call(buffer, :start_timer)
@spec new(options()) :: t()
def new(opts \\ []) do
%__MODULE__{
latency: opts[:latency] || @default_latency_ms
}
end

@doc """
Places a packet in the JitterBuffer.
Returns `:ok` even if the packet was rejected due to being late.
Note: The initial latency timer will be set after the first packet is inserted into the buffer.
If you want to start it at your own discretion, schedule a `handle_timer/1` call prior to that.
"""
@spec place_packet(jitter_buffer(), ExRTP.Packet.t()) :: :ok
@spec place_packet(t(), Packet.t()) :: result()
def place_packet(buffer, packet)

def place_packet(%{state: :initial_wait} = buffer, packet) do
{buffer, timer} = maybe_set_timer(buffer)
{_result, buffer} = try_insert_packet(buffer, packet)

{buffer, [], timer}
end

def place_packet(buffer, packet) do
GenServer.cast(buffer, {:packet, packet})
case try_insert_packet(buffer, packet) do
{:ok, buffer} -> send_packets(buffer)
{:error, buffer} -> {buffer, [], nil}
end
end

@doc """
Flushes all remaining packets and resets the JitterBuffer.
After flushing, the rollover counter is set to `0` and the buffer *won't* output any packets
until `start_timer/1` is called again.
Note: After flushing, the rollover counter is reset to `0`.
"""
@spec flush(jitter_buffer()) :: :ok
@spec flush(t()) :: result()
def flush(buffer) do
GenServer.call(buffer, :flush)
end

@impl true
def init(opts) do
owner = Keyword.fetch!(opts, :controlling_process)
latency = opts[:latency] || @default_latency_ms

state = %{
latency: latency,
owner: owner,
store: %PacketStore{},
waiting?: true,
max_latency_timer: nil
}
packets =
buffer.store
|> PacketStore.dump()
|> records_to_packets()

{:ok, state}
{%__MODULE__{latency: buffer.latency}, packets, nil}
end

@impl true
def handle_call(:start_timer, _from, state) do
Process.send_after(self(), :initial_latency_passed, state.latency)
{:reply, :ok, state}
@doc """
Handles the end of a previously set timer.
"""
@spec handle_timer(t()) :: result()
def handle_timer(buffer) do
%__MODULE__{buffer | state: :timer_not_set} |> send_packets()
end

@impl true
def handle_call(:flush, _from, %{store: store} = state) do
store
|> PacketStore.dump()
|> Enum.each(&process_flushed_record(&1, state.owner))

{:reply, :ok, %{state | store: %PacketStore{}, waiting?: true}}
@spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()}
defp try_insert_packet(buffer, packet) do
case PacketStore.insert_packet(buffer.store, packet) do
{:ok, store} -> {:ok, %__MODULE__{buffer | store: store}}
{:error, :late_packet} -> {:error, buffer}
end
end

@impl true
def handle_cast({:packet, packet}, state) do
state =
case PacketStore.insert_packet(state.store, packet) do
{:ok, result} ->
state = %{state | store: result}

if state.waiting?, do: state, else: send_packets(state)
@spec send_packets(t()) :: result()
defp send_packets(%{store: store} = buffer) do
# Flush packets that stayed in queue longer than latency and any gaps before them
{too_old_records, store} = PacketStore.flush_older_than(store, buffer.latency)
# Additionally, flush packets as long as there are no gaps
{gapless_records, store} = PacketStore.flush_ordered(store)

{:error, :late_packet} ->
state
end
packets =
too_old_records
|> Stream.concat(gapless_records)
|> records_to_packets()

{:noreply, state}
end
{buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store})

@impl true
def handle_info(:initial_latency_passed, state) do
state = %{state | waiting?: false} |> send_packets()
{:noreply, state}
{buffer, packets, timer}
end

@impl true
def handle_info(:send_packets, state) do
state = %{state | max_latency_timer: nil} |> send_packets()
{:noreply, state}
@spec records_to_packets(Enumerable.t(PacketStore.Record.t())) :: [Packet.t()]
defp records_to_packets(records) do
records
# TODO: nil -- missing packet (maybe owner should be notified about that)
|> Stream.reject(&is_nil/1)
|> Enum.map(& &1.packet)
end

defp send_packets(%{store: store} = state) do
# Flushes packets that stayed in queue longer than latency and any gaps before them
{too_old_records, store} = PacketStore.flush_older_than(store, state.latency)
# Additionally, flush packets as long as there are no gaps
{gapless_records, store} = PacketStore.flush_ordered(store)

Enum.each(too_old_records ++ gapless_records, &process_flushed_record(&1, state.owner))
@spec maybe_set_timer(t()) :: {t(), timer()}
defp maybe_set_timer(buffer)

%{state | store: store} |> set_timer()
defp maybe_set_timer(%{state: :initial_wait} = buffer) do
case PacketStore.first_record_timestamp(buffer.store) do
# If we're inserting the very first packet, set the initial latency timer
nil -> {buffer, buffer.latency}
_ts -> {buffer, nil}
end
end

# TODO: nil -- missing packet (maybe owner should be notified about that)
defp process_flushed_record(nil, _owner), do: :noop
defp process_flushed_record(%{packet: packet}, owner), do: notify(owner, {:packet, packet})

defp notify(owner, msg), do: send(owner, {:jitter_buffer, self(), msg})

defp set_timer(%{max_latency_timer: nil, latency: latency} = state) do
new_timer =
case PacketStore.first_record_timestamp(state.store) do
nil ->
nil

timestamp_ms ->
since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
send_after_time = max(0, latency - since_insertion)
defp maybe_set_timer(%{state: :timer_not_set} = buffer) do
case PacketStore.first_record_timestamp(buffer.store) do
nil ->
{buffer, nil}

Process.send_after(self(), :send_packets, send_after_time)
end
timestamp_ms ->
since_insertion = System.monotonic_time(:millisecond) - timestamp_ms
send_after_time = max(0, buffer.latency - since_insertion)

%{state | max_latency_timer: new_timer}
{%__MODULE__{buffer | state: :timer_set}, send_after_time}
end
end

defp set_timer(%{max_latency_timer: timer} = state) when timer != nil, do: state
defp maybe_set_timer(%{state: :timer_set} = buffer), do: {buffer, nil}
end

0 comments on commit 9f86562

Please sign in to comment.