diff --git a/lib/ex_webrtc/rtp/jitter_buffer.ex b/lib/ex_webrtc/rtp/jitter_buffer.ex new file mode 100644 index 00000000..1e4bebaa --- /dev/null +++ b/lib/ex_webrtc/rtp/jitter_buffer.ex @@ -0,0 +1,181 @@ +defmodule ExWebRTC.RTP.JitterBuffer do + @moduledoc """ + Buffers and reorders RTP packets based on `sequence_number`, introducing controlled latency + in order to combat network jitter and improve the QoS. + """ + + # Heavily inspired by: + # https://github.com/membraneframework/membrane_rtp_plugin/blob/23f3279540aea7dea3a194fd5a1680c2549aebae/lib/membrane/rtp/jitter_buffer.ex + + use GenServer + + alias ExWebRTC.RTP.JitterBuffer.PacketStore + + @default_latency_ms 200 + + @typedoc """ + Messages sent by the `#{inspect(__MODULE__)}` process to its controlling process. + + * `{:packet, packet}` - packet flushed from the buffer + """ + @type message() :: {:jitter_buffer, pid(), {:packet, ExRTP.Packet.t()}} + + @typedoc """ + Options that can be passed to `#{inspect(__MODULE__)}.start_link/1`. + + * `controlling_process` - a pid of a process where all messages will be sent. `self()` by default. + * `latency` - latency introduced by the buffer, in milliseconds. `#{@default_latency_ms}` by default. + """ + @type options :: [{:controlling_process, Process.dest()}, {:latency, non_neg_integer()}] + + @doc """ + Starts a new `#{inspect(__MODULE__)}` process. + + `#{inspect(__MODULE__)}` is a `GenServer` under the hood, thus this function allows for + passing the generic `t:GenServer.options/0` as an argument. + + Note: The buffer *won't* output any packets + until `#{inspect(__MODULE__)}.start_timer/1` is called. + """ + @spec start(options(), GenServer.options()) :: GenServer.on_start() + def start(opts \\ [], gen_server_opts \\ []) do + opts = Keyword.put_new(opts, :controlling_process, self()) + GenServer.start(__MODULE__, opts, gen_server_opts) + end + + @doc """ + Starts and links to a new `#{inspect(__MODULE__)}` process. + + Works identically to `start/2`, but links to the calling process. + """ + @spec start_link(options(), GenServer.options()) :: GenServer.on_start() + def start_link(opts \\ [], gen_server_opts \\ []) do + opts = Keyword.put_new(opts, :controlling_process, self()) + GenServer.start_link(__MODULE__, opts, gen_server_opts) + end + + @doc """ + Starts the initial latency timer. + + The buffer will start to output packets `latency` milliseconds after this function is called. + """ + @spec start_timer(GenServer.server()) :: :ok + def start_timer(buffer) do + GenServer.cast(buffer, :start_timer) + end + + @doc """ + Places a packet in the JitterBuffer. + + Returns `:ok` even if the packet was rejected due to being late. + """ + @spec place_packet(GenServer.server(), ExRTP.Packet.t()) :: :ok + def place_packet(buffer, packet) do + GenServer.cast(buffer, {:packet, packet}) + end + + @doc """ + Flushes all remaining packets and resets the JitterBuffer. + + After flushing, the rollover counter is set to `0` and the buffer *won't* output any packets + until `#{inspect(__MODULE__)}.start_timer/1` is called again. + """ + @spec flush(GenServer.server()) :: :ok + def flush(buffer) do + GenServer.cast(buffer, :flush) + end + + @impl true + def init(opts) do + owner = Keyword.fetch!(opts, :controlling_process) + latency = opts[:latency] || @default_latency_ms + + state = %{ + latency: latency, + owner: owner, + store: %PacketStore{}, + waiting?: true, + max_latency_timer: nil + } + + {:ok, state} + end + + @impl true + def handle_cast(:start_timer, state) do + Process.send_after(self(), :initial_latency_passed, state.latency) + {:noreply, state} + end + + @impl true + def handle_cast({:packet, packet}, state) do + state = + case PacketStore.insert_packet(state.store, packet) do + {:ok, result} -> + state = %{state | store: result} + + if state.waiting?, do: state, else: send_packets(state) + + {:error, :late_packet} -> + state + end + + {:noreply, state} + end + + @impl true + def handle_cast(:flush, %{store: store} = state) do + store + |> PacketStore.dump() + |> Enum.each(&process_flushed_record(&1, state.owner)) + + {:noreply, %{state | store: %PacketStore{}, waiting?: true}} + end + + @impl true + def handle_info(:initial_latency_passed, state) do + state = %{state | waiting?: false} |> send_packets() + {:noreply, state} + end + + @impl true + def handle_info(:send_packets, state) do + state = %{state | max_latency_timer: nil} |> send_packets() + {:noreply, state} + end + + defp send_packets(%{store: store} = state) do + # Flushes packets that stayed in queue longer than latency and any gaps before them + {too_old_records, store} = PacketStore.flush_older_than(store, state.latency) + # Additionally, flush packets as long as there are no gaps + {gapless_records, store} = PacketStore.flush_ordered(store) + + Enum.each(too_old_records ++ gapless_records, &process_flushed_record(&1, state.owner)) + + %{state | store: store} |> set_timer() + end + + # TODO: nil -- missing packet (maybe owner should be notified about that) + defp process_flushed_record(nil, _owner), do: :noop + defp process_flushed_record(%{packet: packet}, owner), do: notify(owner, {:packet, packet}) + + defp notify(owner, msg), do: send(owner, {:jitter_buffer, self(), msg}) + + defp set_timer(%{max_latency_timer: nil, latency: latency} = state) do + new_timer = + case PacketStore.first_record_timestamp(state.store) do + nil -> + nil + + timestamp_ms -> + since_insertion = System.monotonic_time(:millisecond) - timestamp_ms + send_after_time = max(0, latency - since_insertion) + + Process.send_after(self(), :send_packets, send_after_time) + end + + %{state | max_latency_timer: new_timer} + end + + defp set_timer(%{max_latency_timer: timer} = state) when timer != nil, do: state +end diff --git a/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex b/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex new file mode 100644 index 00000000..ba8efeb0 --- /dev/null +++ b/lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex @@ -0,0 +1,262 @@ +defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do + @moduledoc false + + # Store for RTP packets. Packets are stored in `Heap` ordered by packet index. Packet index is + # defined in RFC 3711 (SRTP) as: 2^16 * rollover count + sequence number. + + import Bitwise + + defmodule Record do + @moduledoc false + # Describes a structure that is stored in the PacketStore. + + @enforce_keys [:index, :timestamp_ms, :packet] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + index: non_neg_integer(), + timestamp_ms: integer(), + packet: ExRTP.Packet.t() + } + + @spec new(ExRTP.Packet.t(), non_neg_integer()) :: t() + def new(packet, index) do + %__MODULE__{ + index: index, + timestamp_ms: System.monotonic_time(:millisecond), + packet: packet + } + end + + @doc """ + Compares two records. + + Returns true if the first record is older than the second one. + """ + @spec comparator(t(), t()) :: boolean() + # Designed to be used with Heap: https://gitlab.com/jimsy/heap/blob/master/lib/heap.ex#L71 + def comparator(%__MODULE__{index: l_index}, %__MODULE__{index: r_index}), + do: l_index < r_index + end + + @seq_number_limit bsl(1, 16) + + defstruct flush_index: nil, + highest_incoming_index: nil, + heap: Heap.new(&Record.comparator/2), + set: MapSet.new(), + rollover_count: 0 + + @typedoc """ + Type describing PacketStore structure. + + Fields: + - `flush_index` - index of the last packet that has been emitted (or would have been + emitted, but never arrived) as a result of a call to one of the `flush` functions + - `highest_incoming_index` - the highest index in the buffer so far, mapping to the most recently produced + RTP packet placed in JitterBuffer + - `rollover_count` - count of all performed rollovers (cycles of sequence number) + - `heap` - contains records containing packets + - `set` - helper structure for faster read operations; content is the same as in `heap` + """ + @type t :: %__MODULE__{ + flush_index: non_neg_integer() | nil, + highest_incoming_index: non_neg_integer() | nil, + heap: Heap.t(), + set: MapSet.t(), + rollover_count: non_neg_integer() + } + + @doc """ + Inserts a packet into the Store. + + Each subsequent packet must have sequence number greater than the previously returned + one or be part of a rollover. + """ + @spec insert_packet(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:error, :late_packet} + def insert_packet(store, %{sequence_number: seq_num} = packet) do + do_insert_packet(store, packet, seq_num) + end + + defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, 0) do + store = add_record(store, Record.new(packet, @seq_number_limit), :next) + {:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}} + end + + defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, seq_num) do + store = add_record(store, Record.new(packet, seq_num), :current) + {:ok, %__MODULE__{store | flush_index: seq_num - 1}} + end + + defp do_insert_packet( + %__MODULE__{ + flush_index: flush_index, + highest_incoming_index: highest_incoming_index, + rollover_count: roc + } = store, + packet, + seq_num + ) do + highest_seq_num = rem(highest_incoming_index, @seq_number_limit) + + {rollover, index} = + case from_which_rollover(highest_seq_num, seq_num, @seq_number_limit) do + :current -> {:current, seq_num + roc * @seq_number_limit} + :previous -> {:previous, seq_num + (roc - 1) * @seq_number_limit} + :next -> {:next, seq_num + (roc + 1) * @seq_number_limit} + end + + if fresh_packet?(flush_index, index) do + record = Record.new(packet, index) + {:ok, add_record(store, record, rollover)} + else + {:error, :late_packet} + end + end + + @doc """ + Flushes the store to the packet with the next sequence number. + + If this packet is present, it will be returned. + Otherwise it will be treated as late and rejected on attempt to insert into the store. + """ + @spec flush_one(t()) :: {Record.t() | nil, t()} + def flush_one(store) + + def flush_one(%__MODULE__{flush_index: nil} = store) do + {nil, store} + end + + def flush_one(%__MODULE__{flush_index: flush_index, heap: heap, set: set} = store) do + record = Heap.root(heap) + + expected_next_index = flush_index + 1 + + {result, store} = + if record != nil and record.index == expected_next_index do + updated_heap = Heap.pop(heap) + updated_set = MapSet.delete(set, record.index) + + updated_store = %__MODULE__{store | heap: updated_heap, set: updated_set} + + {record, updated_store} + else + # TODO: instead of nil use expected_next_index to notify owner about missing packet + {nil, store} + end + + {result, %__MODULE__{store | flush_index: expected_next_index}} + end + + @doc """ + Flushes the store until the first gap in sequence numbers of records + """ + @spec flush_ordered(t()) :: {[Record.t() | nil], t()} + def flush_ordered(store) do + flush_while(store, fn %__MODULE__{flush_index: flush_index}, %Record{index: index} -> + index == flush_index + 1 + end) + end + + @doc """ + Flushes the store as long as it contains a packet with the timestamp older than provided duration + """ + @spec flush_older_than(t(), non_neg_integer()) :: {[Record.t() | nil], t()} + def flush_older_than(store, max_age_ms) do + max_age_timestamp = System.monotonic_time(:millisecond) - max_age_ms + + flush_while(store, fn _store, %Record{timestamp_ms: timestamp} -> + timestamp <= max_age_timestamp + end) + end + + @doc """ + Returns all packets that are stored in the `PacketStore`. + """ + @spec dump(t()) :: [Record.t() | nil] + def dump(%__MODULE__{} = store) do + {records, _store} = flush_while(store, fn _store, _record -> true end) + records + end + + @doc """ + Returns timestamp (time of insertion) of the packet with the lowest index + """ + @spec first_record_timestamp(t()) :: integer() | nil + def first_record_timestamp(%__MODULE__{heap: heap}) do + case Heap.root(heap) do + %Record{timestamp_ms: time} -> time + nil -> nil + end + end + + @spec from_which_rollover(number() | nil, number(), number()) :: :current | :previous | :next + def from_which_rollover(previous_value, new_value, rollover_length) + + def from_which_rollover(nil, _new, _rollover_length), do: :current + + def from_which_rollover(previous_value, new_value, rollover_length) do + # a) current rollover + distance_if_current = abs(previous_value - new_value) + # b) new_value is from the previous rollover + distance_if_previous = abs(previous_value - (new_value - rollover_length)) + # c) new_value is in the next rollover + distance_if_next = abs(previous_value - (new_value + rollover_length)) + + [ + {:current, distance_if_current}, + {:previous, distance_if_previous}, + {:next, distance_if_next} + ] + |> Enum.min_by(fn {_atom, distance} -> distance end) + |> then(fn {result, _value} -> result end) + end + + defp fresh_packet?(flush_index, index), do: index > flush_index + + defp flush_while(%__MODULE__{heap: heap} = store, fun, acc \\ []) do + heap + |> Heap.root() + |> case do + nil -> + {Enum.reverse(acc), store} + + record -> + if fun.(store, record) do + {record, store} = flush_one(store) + flush_while(store, fun, [record | acc]) + else + {Enum.reverse(acc), store} + end + end + end + + defp add_record(%__MODULE__{heap: heap, set: set} = store, %Record{} = record, record_rollover) do + if set |> MapSet.member?(record.index) do + store + else + %__MODULE__{store | heap: Heap.push(heap, record), set: MapSet.put(set, record.index)} + |> update_highest_incoming_index(record.index) + |> update_roc(record_rollover) + end + end + + defp update_highest_incoming_index( + %__MODULE__{highest_incoming_index: last} = store, + added_index + ) + when added_index > last or last == nil, + do: %__MODULE__{store | highest_incoming_index: added_index} + + defp update_highest_incoming_index( + %__MODULE__{highest_incoming_index: last} = store, + added_index + ) + when last >= added_index, + do: store + + defp update_roc(%{rollover_count: roc} = store, :next), + do: %__MODULE__{store | rollover_count: roc + 1} + + defp update_roc(store, _record_rollover), do: store +end diff --git a/mix.exs b/mix.exs index 35d42ffb..b2c578d7 100644 --- a/mix.exs +++ b/mix.exs @@ -63,6 +63,7 @@ defmodule ExWebRTC.MixProject do {:ex_rtp, "~> 0.4.0"}, {:ex_rtcp, "~> 0.4.0"}, {:crc, "~> 0.10"}, + {:heap, "~> 3.0"}, # dev/test {:excoveralls, "~> 0.18.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 5bdd05cd..ad648c11 100644 --- a/mix.lock +++ b/mix.lock @@ -23,6 +23,7 @@ "excoveralls": {:hex, :excoveralls, "0.18.2", "86efd87a0676a3198ff50b8c77620ea2f445e7d414afa9ec6c4ba84c9f8bdcc2", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "230262c418f0de64077626a498bd4fdf1126d5c2559bb0e6b43deac3005225a4"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "heap": {:hex, :heap, "3.0.0", "c6dbcd6e9a0b021432176e89cfd829dd065bd6c115981fdcd981a4251fff5fde", [:mix], [], "hexpm", "373eaca5787e2a2b009c42338e70414f590ceabcf96cfc786627ed762ad4dfc6"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, diff --git a/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs b/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs new file mode 100644 index 00000000..49a901f7 --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer/packet_store_test.exs @@ -0,0 +1,295 @@ +defmodule ExWebRTC.RTP.JitterBuffer.PacketStoreTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.PacketFactory + alias ExWebRTC.RTP.JitterBuffer.PacketStore.Record + alias ExWebRTC.RTP.JitterBuffer.PacketStore + + @seq_number_limit 65_536 + @base_index 65_505 + @next_index @base_index + 1 + + setup_all do + [base_store: new_testing_store(@base_index)] + end + + describe "When adding packet to the PacketStore it" do + test "accepts the first packet" do + packet = PacketFactory.sample_packet(@base_index) + + assert {:ok, updated_store} = PacketStore.insert_packet(%PacketStore{}, packet) + assert has_packet(updated_store, packet) + end + + test "refuses packet with a seq_number smaller than last served", %{base_store: store} do + packet = PacketFactory.sample_packet(@base_index - 1) + + assert {:error, :late_packet} = PacketStore.insert_packet(store, packet) + end + + test "accepts a packet that got in time", %{base_store: store} do + packet = PacketFactory.sample_packet(@next_index) + assert {:ok, updated_store} = PacketStore.insert_packet(store, packet) + assert has_packet(updated_store, packet) + end + + test "puts it to the rollover if a sequence number has rolled over", %{base_store: store} do + packet = PacketFactory.sample_packet(10) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + assert has_packet(store, packet) + end + + test "extracts the RTP metadata correctly from packet", %{base_store: store} do + packet = PacketFactory.sample_packet(@next_index) + {:ok, %PacketStore{heap: heap}} = PacketStore.insert_packet(store, packet) + + assert %Record{index: read_index} = Heap.root(heap) + + assert read_index == @next_index + end + + test "handles first packets starting with sequence_number 0" do + store = %PacketStore{} + packet_a = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert_packet(store, packet_a) + + {record_a, store} = PacketStore.flush_one(store) + + assert record_a.index == @seq_number_limit + assert record_a.packet.sequence_number == 0 + + packet_b = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert_packet(store, packet_b) + + {record_b, _store} = PacketStore.flush_one(store) + assert record_b.index == @seq_number_limit + 1 + assert record_b.packet.sequence_number == 1 + end + + test "handles packets with very big gaps" do + store = %PacketStore{} + first_packet = PacketFactory.sample_packet(20_072) + assert {:ok, store} = PacketStore.insert_packet(store, first_packet) + + second_packet = PacketFactory.sample_packet(52_840) + assert {:ok, store} = PacketStore.insert_packet(store, second_packet) + + third_packet = PacketFactory.sample_packet(52_841) + assert {:ok, _store} = PacketStore.insert_packet(store, third_packet) + end + + test "handles late packets when starting with sequence_number 0" do + store = %PacketStore{} + packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:error, :late_packet} = PacketStore.insert_packet(store, packet) + end + + test "handles rollover before any packet was sent" do + store = %PacketStore{} + packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + seq_numbers = + store + |> PacketStore.dump() + |> Enum.map(& &1.packet.sequence_number) + + assert seq_numbers == [65_535, 0, 1] + + indexes = + store + |> PacketStore.dump() + |> Enum.map(& &1.index) + + assert indexes == [@seq_number_limit - 1, @seq_number_limit, @seq_number_limit + 1] + end + + test "handles late packet after rollover" do + store = %PacketStore{} + first_packet = PacketFactory.sample_packet(@seq_number_limit - 1) + assert {:ok, store} = PacketStore.insert_packet(store, first_packet) + + second_packet = PacketFactory.sample_packet(0) + assert {:ok, store} = PacketStore.insert_packet(store, second_packet) + + packet = PacketFactory.sample_packet(1) + assert {:ok, store} = PacketStore.insert_packet(store, packet) + + assert {%Record{packet: ^first_packet}, store} = PacketStore.flush_one(store) + assert {%Record{packet: ^second_packet}, store} = PacketStore.flush_one(store) + + packet = PacketFactory.sample_packet(@seq_number_limit - 2) + assert {:error, :late_packet} = PacketStore.insert_packet(store, packet) + + seq_numbers = + store + |> PacketStore.dump() + |> Enum.map(& &1.packet.sequence_number) + + assert seq_numbers == [1] + end + end + + describe "When getting a packet from PacketStore it" do + setup %{base_store: base_store} do + packet = PacketFactory.sample_packet(@next_index) + {:ok, store} = PacketStore.insert_packet(base_store, packet) + + [ + store: store, + packet: packet + ] + end + + test "returns the root packet and initializes it", %{store: store, packet: packet} do + assert {%Record{} = record, empty_store} = PacketStore.flush_one(store) + assert record.packet == packet + assert empty_store.heap.size == 0 + assert empty_store.flush_index == record.index + end + + test "returns nil when store is empty and bumps flush_index", %{base_store: store} do + assert {nil, new_store} = PacketStore.flush_one(store) + assert new_store.flush_index == store.flush_index + 1 + end + + test "returns nil when heap is not empty, but the next packet is not present", %{ + store: store + } do + broken_store = %PacketStore{store | flush_index: @base_index - 1} + assert {nil, new_store} = PacketStore.flush_one(broken_store) + assert new_store.flush_index == @base_index + end + + test "sorts packets by index number", %{base_store: store} do + test_base = 1..100 + + test_base + |> Enum.into([]) + |> Enum.shuffle() + |> enum_into_store(store) + |> (fn store -> store.heap end).() + |> Enum.zip(test_base) + |> Enum.each(fn {record, base_element} -> + assert %Record{index: index} = record + assert rem(index, 65_536) == base_element + end) + end + + test "handles rollover", %{base_store: base_store} do + store = %PacketStore{base_store | flush_index: 65_533} + before_rollover_seq_nums = 65_534..65_535 + after_rollover_seq_nums = 0..10 + + combined = Enum.into(before_rollover_seq_nums, []) ++ Enum.into(after_rollover_seq_nums, []) + combined_store = enum_into_store(combined, store) + + store = + Enum.reduce(combined, combined_store, fn elem, store -> + {record, store} = PacketStore.flush_one(store) + assert %Record{packet: packet} = record + assert %ExRTP.Packet{sequence_number: seq_number} = packet + assert seq_number == elem + store + end) + + assert store.rollover_count == 1 + end + + test "handles empty rollover", %{base_store: base_store} do + store = %PacketStore{base_store | flush_index: 65_533} + base_data = Enum.into(65_534..65_535, []) + store = enum_into_store(base_data, store) + + Enum.reduce(base_data, store, fn elem, store -> + {record, store} = PacketStore.flush_one(store) + assert %Record{index: ^elem} = record + store + end) + end + + test "handles later rollovers" do + m = @seq_number_limit + + flush_index = 3 * m - 6 + + store = %PacketStore{ + flush_index: flush_index, + highest_incoming_index: flush_index, + rollover_count: 2 + } + + store = + (Enum.into((m - 5)..(m - 1), []) ++ Enum.into(0..4, [])) + |> enum_into_store(store) + + store_content = PacketStore.dump(store) + assert length(store_content) == 10 + end + + test "handles late packets after a rollover" do + indexes = [65_535, 0, 65_534] + + store = + enum_into_store(indexes, %PacketStore{flush_index: 65_533, highest_incoming_index: 65_533}) + + Enum.each(indexes, fn _index -> + assert {%Record{}, _store} = PacketStore.flush_one(store) + end) + end + end + + describe "When dumping it" do + test "returns list that contains packets from heap" do + store = enum_into_store(1..10) + result = PacketStore.dump(store) + assert is_list(result) + assert Enum.count(result) == 10 + end + + test "returns empty list if no records are inside" do + assert PacketStore.dump(%PacketStore{}) == [] + end + end + + defp new_testing_store(index) do + %PacketStore{ + flush_index: index, + highest_incoming_index: index, + heap: Heap.new(&Record.comparator/2) + } + end + + defp enum_into_store(enumerable, store \\ %PacketStore{}) do + Enum.reduce(enumerable, store, fn elem, acc -> + packet = PacketFactory.sample_packet(elem) + {:ok, store} = PacketStore.insert_packet(acc, packet) + store + end) + end + + defp has_packet( + %PacketStore{} = store, + %ExRTP.Packet{sequence_number: seq_num} + ), + do: has_packet_with_seq_number(store, seq_num) + + defp has_packet_with_seq_number(%PacketStore{heap: heap}, index) when is_integer(index) do + heap + |> Enum.to_list() + |> Enum.map(& &1.packet.sequence_number) + |> Enum.member?(index) + end +end diff --git a/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs b/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs new file mode 100644 index 00000000..92006dfd --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs @@ -0,0 +1,104 @@ +defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do + use ExUnit.Case + + alias ExWebRTC.RTP.{JitterBuffer, PacketFactory} + alias ExRTP.Packet + + @seq_number_limit 65_536 + + defmodule PacketSource do + @moduledoc false + use GenServer + + @seq_number_limit 65_536 + + @impl true + def init(state) do + :ok = JitterBuffer.start_timer(state.buffer) + {:ok, state, {:continue, :after_init}} + end + + @impl true + def handle_continue( + :after_init, + %{ + packet_delay_ms: delay_ms, + packet_num: packet_num, + max_latency: max_latency + } = state + ) do + now = System.monotonic_time(:millisecond) + + 1..packet_num + |> Enum.each(fn n -> + time = + cond do + # Delay less than max latency + rem(n, 15) == 0 -> n * delay_ms + div(max_latency, 2) + # Delay more than max latency + rem(n, 19) == 0 -> n * delay_ms + max_latency * 2 + true -> n * delay_ms + end + + if rem(n, 50) < 30 or rem(n, 50) > 32 do + seq_number = rem(n, @seq_number_limit) + Process.send_after(self(), {:push_packet, seq_number}, now + time, abs: true) + end + end) + + {:noreply, state} + end + + @impl true + def handle_info({:push_packet, n}, %{buffer: buffer} = state) do + :ok = JitterBuffer.place_packet(buffer, PacketFactory.sample_packet(n)) + {:noreply, state} + end + end + + test "Jitter Buffer works in a pipeline with small latency" do + test_pipeline(300, 10, 200) + end + + test "Jitter Buffer works in a pipeline with large latency" do + test_pipeline(100, 30, 1000) + end + + @tag :long_running + @tag timeout: 70_000 * 10 + 20_000 + test "Jitter Buffer works in a long-running pipeline with small latency" do + test_pipeline(70_000, 10, 100) + end + + defp test_pipeline(packets, packet_delay_ms, latency_ms) do + {:ok, buffer} = JitterBuffer.start_link(latency: latency_ms) + + {:ok, _pid} = + GenServer.start_link(PacketSource, %{ + buffer: buffer, + packet_num: packets, + packet_delay_ms: packet_delay_ms, + max_latency: latency_ms + }) + + timeout = latency_ms + packet_delay_ms + 200 + + Enum.each(1..packets, fn n -> + seq_num = rem(n, @seq_number_limit) + + cond do + rem(n, 50) >= 30 and rem(n, 50) <= 32 -> + refute_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}}, + timeout + + rem(n, 19) == 0 and rem(n, 15) != 0 -> + refute_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}}, + timeout + + true -> + assert_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}}, + timeout + end + end) + end +end diff --git a/test/ex_webrtc/rtp/jitter_buffer_test.exs b/test/ex_webrtc/rtp/jitter_buffer_test.exs new file mode 100644 index 00000000..89d619bc --- /dev/null +++ b/test/ex_webrtc/rtp/jitter_buffer_test.exs @@ -0,0 +1,143 @@ +defmodule ExWebRTC.RTP.JitterBufferTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTP.JitterBuffer.PacketStore.Record + alias ExWebRTC.RTP.JitterBuffer.PacketStore + alias ExWebRTC.RTP.{JitterBuffer, PacketFactory} + + @base_seq_number PacketFactory.base_seq_number() + + setup do + packet = PacketFactory.sample_packet(@base_seq_number) + + {:ok, state} = JitterBuffer.init(controlling_process: self(), latency: 10) + state = %{state | waiting?: false} + + [state: state, packet: packet] + end + + describe "When JitterBuffer is in waiting state" do + setup %{state: state} do + [state: %{state | waiting?: true}] + end + + test "start of stream starts timer that changes state", %{state: state} do + {:noreply, state} = JitterBuffer.handle_cast(:start_timer, state) + assert_receive message, state.latency + 5 + {:noreply, final_state} = JitterBuffer.handle_info(message, state) + assert final_state.waiting? == false + end + + test "any new packet is kept", %{state: state, packet: packet} do + assert PacketStore.dump(state.store) == [] + {:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state) + + %{store: store} = state + {%Record{packet: ^packet}, new_store} = PacketStore.flush_one(store) + assert PacketStore.dump(new_store) == [] + + refute_receive {:jitter_buffer, _pid, {:packet, ^packet}} + end + end + + describe "When new packet arrives when not waiting and already pushed some packet" do + setup %{state: state} do + flush_index = @base_seq_number - 1 + store = %{state.store | flush_index: flush_index, highest_incoming_index: flush_index} + [state: %{state | waiting?: false, store: store}] + end + + test "outputs it immediately if it is in order", %{state: state, packet: packet} do + {:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state) + + assert_receive {:jitter_buffer, _pid, {:packet, ^packet}} + + %{store: store} = state + assert PacketStore.dump(store) == [] + end + + test "refuses to add that packet when it comes too late", %{state: state} do + late_packet = PacketFactory.sample_packet(@base_seq_number - 2) + {:noreply, new_state} = JitterBuffer.handle_cast({:packet, late_packet}, state) + assert new_state == state + refute_receive {:jitter_buffer, _pid, {:packet, ^late_packet}} + end + + test "adds it and when it fills the gap, returns all packets in order", %{state: state} do + first_packet = PacketFactory.sample_packet(@base_seq_number) + second_packet = PacketFactory.sample_packet(@base_seq_number + 1) + third_packet = PacketFactory.sample_packet(@base_seq_number + 2) + + flush_index = @base_seq_number - 1 + + store = %PacketStore{ + state.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + {:ok, store} = PacketStore.insert_packet(store, second_packet) + {:ok, store} = PacketStore.insert_packet(store, third_packet) + + state = %{state | store: store} + + {:noreply, %{store: result_store}} = + JitterBuffer.handle_cast({:packet, first_packet}, state) + + for packet <- [first_packet, second_packet, third_packet] do + receive do + msg -> + assert {:jitter_buffer, _pid, {:packet, ^packet}} = msg + end + end + + assert PacketStore.dump(result_store) == [] + refute_receive {:jitter_buffer, _pid, {:packet, _packet}} + end + end + + describe "When latency passes without filling the gap, JitterBuffer" do + test "outputs discontinuity and late packet", %{state: state, packet: packet} do + flush_index = @base_seq_number - 2 + + store = %PacketStore{ + state.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + state = %{state | store: store, waiting?: false} + + {:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state) + refute_received {:jitter_buffer, _pid, {:packet, ^packet}} + + assert is_reference(state.max_latency_timer) + + receive do + msg -> + {:noreply, _state} = JitterBuffer.handle_info(msg, state) + end + + assert_receive {:jitter_buffer, _pid, {:packet, ^packet}} + end + end + + describe "When asked to flush, JitterBuffer" do + test "dumps store and resets itself", %{state: state, packet: packet} do + flush_index = @base_seq_number - 2 + + store = %PacketStore{ + state.store + | flush_index: flush_index, + highest_incoming_index: flush_index + } + + {:ok, store} = PacketStore.insert_packet(store, packet) + state = %{state | store: store} + {:noreply, state} = JitterBuffer.handle_cast(:flush, state) + + assert_receive {:jitter_buffer, _pid, {:packet, ^packet}} + assert state.store == %PacketStore{} + end + end +end diff --git a/test/support/packet_factory.ex b/test/support/packet_factory.ex new file mode 100644 index 00000000..69e54cdb --- /dev/null +++ b/test/support/packet_factory.ex @@ -0,0 +1,24 @@ +defmodule ExWebRTC.RTP.PacketFactory do + @moduledoc false + + alias ExRTP.Packet + + @timestamp_increment 30_000 + @base_seq_number 50 + + @spec base_seq_number() :: Packet.uint16() + def base_seq_number(), do: @base_seq_number + + @spec sample_packet(Packet.uint16()) :: Packet.t() + def sample_packet(seq_num) do + seq_num_offset = seq_num - @base_seq_number + + Packet.new( + <<0, 255>>, + payload_type: 127, + ssrc: 0xDEADCAFE, + timestamp: seq_num_offset * @timestamp_increment, + sequence_number: seq_num + ) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index fb40ccc4..d40f0fff 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1 @@ -ExUnit.start(capture_log: true, assert_receive_timeout: 2000) +ExUnit.start(capture_log: true, exclude: [:long_running], assert_receive_timeout: 2000)