diff --git a/lib/ex_webrtc/rtp/jitter_buffer.ex b/lib/ex_webrtc/rtp/jitter_buffer.ex index 4166442..04a5a0a 100644 --- a/lib/ex_webrtc/rtp/jitter_buffer.ex +++ b/lib/ex_webrtc/rtp/jitter_buffer.ex @@ -7,177 +7,160 @@ defmodule ExWebRTC.RTP.JitterBuffer do # Heavily inspired by: # https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex - use GenServer - alias ExWebRTC.RTP.JitterBuffer.PacketStore + alias ExRTP.Packet @default_latency_ms 200 @typedoc """ - Messages sent by the `#{inspect(__MODULE__)}` process to its controlling process. + Options that can be passed to `new/1`. - * `{:packet, packet}` - packet flushed from the buffer + * `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default. """ - @type message() :: {:jitter_buffer, pid(), {:packet, ExRTP.Packet.t()}} + @type options :: [latency: non_neg_integer()] @typedoc """ - Options that can be passed to `start_link/1`. - - * `controlling_process` - a pid of a process where all messages will be sent. `self()` by default. - * `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default. + Time (in milliseconds) after which `handle_timer/1` should be called. + Can be `nil`, in which case no timer needs to be set. """ - @type options :: [controlling_process: Process.dest(), latency: non_neg_integer()] - - @type jitter_buffer :: GenServer.server() + @type timer :: non_neg_integer() | nil - @doc """ - Starts a new `#{inspect(__MODULE__)}` process. + @typedoc """ + The 3-element tuple returned by all functions other than `new/1`. - `#{inspect(__MODULE__)}` is a `GenServer` under the hood, thus this function allows for - passing the generic `t:GenServer.options/0` as an argument. + * `buffer` - `t:#{inspect(__MODULE__)}.t/0`. + * `packets` - a list with packets flushed from the buffer as a result of the function call. May be empty. + * `timer_duration_ms` - see `t:timer/0`. - Note: The buffer *won't* output any packets - until `start_timer/1` is called. + Generally speaking, all results of this type can be handled in the same way. """ - @spec start(options(), GenServer.options()) :: GenServer.on_start() - def start(opts \\ [], gen_server_opts \\ []) do - opts = Keyword.put_new(opts, :controlling_process, self()) - GenServer.start(__MODULE__, opts, gen_server_opts) - end + @type result :: {buffer :: t(), packets :: [Packet.t()], timer_duration_ms :: timer()} - @doc """ - Starts and links to a new `#{inspect(__MODULE__)}` process. + @opaque t :: %__MODULE__{ + latency: non_neg_integer(), + store: PacketStore.t(), + state: :initial_wait | :timer_set | :timer_not_set + } - Works identically to `start/2`, but links to the calling process. - """ - @spec start_link(options(), GenServer.options()) :: GenServer.on_start() - def start_link(opts \\ [], gen_server_opts \\ []) do - opts = Keyword.put_new(opts, :controlling_process, self()) - GenServer.start_link(__MODULE__, opts, gen_server_opts) - end + @enforce_keys [:latency] + defstruct @enforce_keys ++ + [ + store: %PacketStore{}, + state: :initial_wait + ] @doc """ - Starts the initial latency timer. - - The buffer will start to output packets `latency` milliseconds after this function is called. + Creates a new `t:#{inspect(__MODULE__)}.t/0`. """ - @spec start_timer(jitter_buffer()) :: :ok - def start_timer(buffer) do - GenServer.call(buffer, :start_timer) + @spec new(options()) :: t() + def new(opts \\ []) do + %__MODULE__{ + latency: opts[:latency] || @default_latency_ms + } end @doc """ Places a packet in the JitterBuffer. - Returns `:ok` even if the packet was rejected due to being late. + Note: The initial latency timer will be set after the first packet is inserted into the buffer. + If you want to start it at your own discretion, schedule a `handle_timer/1` call prior to that. """ - @spec place_packet(jitter_buffer(), ExRTP.Packet.t()) :: :ok + @spec place_packet(t(), Packet.t()) :: result() + def place_packet(buffer, packet) + + def place_packet(%{state: :initial_wait} = buffer, packet) do + {buffer, timer} = maybe_set_timer(buffer) + {_result, buffer} = try_insert_packet(buffer, packet) + + {buffer, [], timer} + end + def place_packet(buffer, packet) do - GenServer.cast(buffer, {:packet, packet}) + case try_insert_packet(buffer, packet) do + {:ok, buffer} -> send_packets(buffer) + {:error, buffer} -> {buffer, [], nil} + end end @doc """ Flushes all remaining packets and resets the JitterBuffer. - After flushing, the rollover counter is set to `0` and the buffer *won't* output any packets - until `start_timer/1` is called again. + Note: After flushing, the rollover counter is reset to `0`. """ - @spec flush(jitter_buffer()) :: :ok + @spec flush(t()) :: result() def flush(buffer) do - GenServer.call(buffer, :flush) - end - - @impl true - def init(opts) do - owner = Keyword.fetch!(opts, :controlling_process) - latency = opts[:latency] || @default_latency_ms - - state = %{ - latency: latency, - owner: owner, - store: %PacketStore{}, - waiting?: true, - max_latency_timer: nil - } + packets = + buffer.store + |> PacketStore.dump() + |> records_to_packets() - {:ok, state} + {%__MODULE__{latency: buffer.latency}, packets, nil} end - @impl true - def handle_call(:start_timer, _from, state) do - Process.send_after(self(), :initial_latency_passed, state.latency) - {:reply, :ok, state} + @doc """ + Handles the end of a previously set timer. + """ + @spec handle_timer(t()) :: result() + def handle_timer(buffer) do + %__MODULE__{buffer | state: :timer_not_set} |> send_packets() end - @impl true - def handle_call(:flush, _from, %{store: store} = state) do - store - |> PacketStore.dump() - |> Enum.each(&process_flushed_record(&1, state.owner)) - - {:reply, :ok, %{state | store: %PacketStore{}, waiting?: true}} + @spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()} + defp try_insert_packet(buffer, packet) do + case PacketStore.insert_packet(buffer.store, packet) do + {:ok, store} -> {:ok, %__MODULE__{buffer | store: store}} + {:error, :late_packet} -> {:error, buffer} + end end - @impl true - def handle_cast({:packet, packet}, state) do - state = - case PacketStore.insert_packet(state.store, packet) do - {:ok, result} -> - state = %{state | store: result} - - if state.waiting?, do: state, else: send_packets(state) + @spec send_packets(t()) :: result() + defp send_packets(%{store: store} = buffer) do + # Flush packets that stayed in queue longer than latency and any gaps before them + {too_old_records, store} = PacketStore.flush_older_than(store, buffer.latency) + # Additionally, flush packets as long as there are no gaps + {gapless_records, store} = PacketStore.flush_ordered(store) - {:error, :late_packet} -> - state - end + packets = + too_old_records + |> Stream.concat(gapless_records) + |> records_to_packets() - {:noreply, state} - end + {buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store}) - @impl true - def handle_info(:initial_latency_passed, state) do - state = %{state | waiting?: false} |> send_packets() - {:noreply, state} + {buffer, packets, timer} end - @impl true - def handle_info(:send_packets, state) do - state = %{state | max_latency_timer: nil} |> send_packets() - {:noreply, state} + @spec records_to_packets(Enumerable.t(PacketStore.Record.t())) :: [Packet.t()] + defp records_to_packets(records) do + records + # TODO: nil -- missing packet (maybe owner should be notified about that) + |> Stream.reject(&is_nil/1) + |> Enum.map(& &1.packet) end - defp send_packets(%{store: store} = state) do - # Flushes packets that stayed in queue longer than latency and any gaps before them - {too_old_records, store} = PacketStore.flush_older_than(store, state.latency) - # Additionally, flush packets as long as there are no gaps - {gapless_records, store} = PacketStore.flush_ordered(store) - - Enum.each(too_old_records ++ gapless_records, &process_flushed_record(&1, state.owner)) + @spec maybe_set_timer(t()) :: {t(), timer()} + defp maybe_set_timer(buffer) - %{state | store: store} |> set_timer() + defp maybe_set_timer(%{state: :initial_wait} = buffer) do + case PacketStore.first_record_timestamp(buffer.store) do + # If we're inserting the very first packet, set the initial latency timer + nil -> {buffer, buffer.latency} + _ts -> {buffer, nil} + end end - # TODO: nil -- missing packet (maybe owner should be notified about that) - defp process_flushed_record(nil, _owner), do: :noop - defp process_flushed_record(%{packet: packet}, owner), do: notify(owner, {:packet, packet}) - - defp notify(owner, msg), do: send(owner, {:jitter_buffer, self(), msg}) - - defp set_timer(%{max_latency_timer: nil, latency: latency} = state) do - new_timer = - case PacketStore.first_record_timestamp(state.store) do - nil -> - nil - - timestamp_ms -> - since_insertion = System.monotonic_time(:millisecond) - timestamp_ms - send_after_time = max(0, latency - since_insertion) + defp maybe_set_timer(%{state: :timer_not_set} = buffer) do + case PacketStore.first_record_timestamp(buffer.store) do + nil -> + {buffer, nil} - Process.send_after(self(), :send_packets, send_after_time) - end + timestamp_ms -> + since_insertion = System.monotonic_time(:millisecond) - timestamp_ms + send_after_time = max(0, buffer.latency - since_insertion) - %{state | max_latency_timer: new_timer} + {%__MODULE__{buffer | state: :timer_set}, send_after_time} + end end - defp set_timer(%{max_latency_timer: timer} = state) when timer != nil, do: state + defp maybe_set_timer(%{state: :timer_set} = buffer), do: {buffer, nil} end