Skip to content

Commit

Permalink
RTC-459 Fix metrics (#148)
Browse files Browse the repository at this point in the history
* Fix Jellyfish Metrics

* Fix lint issue

* Remove rooms in hls component tests

* Fixes after review

* Review fixes

* Remove redundant code
  • Loading branch information
Rados13 authored Feb 8, 2024
1 parent 36e1d00 commit f452cd5
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 11 deletions.
5 changes: 3 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ config :jellyfish, JellyfishWeb.Endpoint,
pubsub_server: Jellyfish.PubSub,
live_view: [signing_salt: "/Lo03qJT"]

config :jellyfish, metrics_scrape_interval: 1000
config :jellyfish,
webrtc_metrics_scrape_interval: 1000,
room_metrics_scrape_interval: 10

config :membrane_telemetry_metrics, enabled: true
config :membrane_opentelemetry, enabled: true

config :logger, :console,
format: "$time $metadata[$level] $message\n",
Expand Down
1 change: 0 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ alias Jellyfish.ConfigReader
# any compile-time configuration in here, as it won't be applied.
# The block below contains prod specific runtime configuration.
config :ex_dtls, impl: :nif
config :opentelemetry, traces_exporter: :none

prod? = config_env() == :prod

Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ config :jellyfish,
ip: {127, 0, 0, 1},
port: 4002,
server_api_token: "development",
metrics_scrape_interval: 50
webrtc_metrics_scrape_interval: 50,
room_metrics_scrape_interval: 1

config :jellyfish, JellyfishWeb.Endpoint,
http: [ip: {127, 0, 0, 1}, port: 4002],
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Jellyfish.Application do

@impl true
def start(_type, _args) do
scrape_interval = Application.fetch_env!(:jellyfish, :metrics_scrape_interval)
scrape_interval = Application.fetch_env!(:jellyfish, :webrtc_metrics_scrape_interval)
dist_config = Application.fetch_env!(:jellyfish, :dist_config)
webrtc_config = Application.fetch_env!(:jellyfish, :webrtc_config)

Expand Down
12 changes: 11 additions & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,17 @@ defmodule Jellyfish.Room do
end

@impl true
def handle_info(%EndpointAdded{endpoint_id: endpoint_id}, state)
# Handles an `EndpointRemoved` notification: drops the entry stored under
# `state.peers[endpoint_id]` and logs the removal.
def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state) do
  {_removed_peer, state_without_peer} = pop_in(state, [:peers, endpoint_id])
  Logger.info("Peer #{endpoint_id} removed")

  {:noreply, state_without_peer}
end

@impl true
# Ignores an `EndpointAdded` notification when the `endpoint_exists?/2` guard holds
# for this endpoint id; the state is returned untouched.
def handle_info(%EndpointAdded{endpoint_id: endpoint_id}, state)
    when endpoint_exists?(state, endpoint_id),
    do: {:noreply, state}
Expand Down
25 changes: 20 additions & 5 deletions lib/jellyfish/room_service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule Jellyfish.RoomService do

alias Jellyfish.{Event, Room, WebhookNotifier}

# in seconds
@metric_interval 10
@metric_interval_in_seconds Application.compile_env!(:jellyfish, :room_metrics_scrape_interval)
@metric_interval_in_milliseconds @metric_interval_in_seconds * 1_000

def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
Expand Down Expand Up @@ -127,7 +127,7 @@ defmodule Jellyfish.RoomService do

@impl true
def handle_continue(_continue_arg, state) do
Process.send_after(self(), :rooms_metrics, @metric_interval)
Process.send_after(self(), :rooms_metrics, @metric_interval_in_milliseconds)
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, "jellyfishes")
{:noreply, state}
end
Expand Down Expand Up @@ -186,13 +186,13 @@ defmodule Jellyfish.RoomService do
[:jellyfish, :room],
%{
peers: peer_count,
peer_time_total: peer_count * @metric_interval
peer_time_total: peer_count * @metric_interval_in_seconds
},
%{room_id: room.id}
)
end

Process.send_after(self(), :rooms_metrics, @metric_interval * 1000)
Process.send_after(self(), :rooms_metrics, @metric_interval_in_milliseconds)

{:noreply, state}
end
Expand All @@ -203,6 +203,9 @@ defmodule Jellyfish.RoomService do

Logger.debug("Room #{room_id} is down with reason: normal")

Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_stopped)
clear_room_metrics(room_id)

{:noreply, state}
end

Expand All @@ -214,10 +217,22 @@ defmodule Jellyfish.RoomService do

Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_crashed)
Event.broadcast_server_notification({:room_crashed, room_id})
clear_room_metrics(room_id)

{:noreply, state}
end

# Emits a `[:jellyfish, :room]` telemetry event with zeroed `peers` and
# `peer_time_total` measurements, tagged with the given room id.
defp clear_room_metrics(room_id) do
  zeroed_measurements = %{peers: 0, peer_time_total: 0}
  metadata = %{room_id: room_id}

  :telemetry.execute([:jellyfish, :room], zeroed_measurements, metadata)
end

defp find_best_node(node_resources) do
%{node: min_node} =
Enum.min(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ defmodule JellyfishWeb.Component.HlsComponentTest do

assert model_response(conn, :bad_request, "Error")["errors"] ==
"Incompatible video codec enforced in room #{room_id}"

RoomService.delete_room(room_id)
end

test "renders errors when video codec is different than h264 - nil", %{conn: conn} do
Expand All @@ -200,6 +202,8 @@ defmodule JellyfishWeb.Component.HlsComponentTest do

assert model_response(conn, :bad_request, "Error")["errors"] ==
"Incompatible video codec enforced in room #{room_id}"

RoomService.delete_room(room_id)
end
end

Expand All @@ -209,6 +213,10 @@ defmodule JellyfishWeb.Component.HlsComponentTest do
assert %{"id" => room_id} =
model_response(conn, :created, "RoomCreateDetailsResponse")["data"]["room"]

on_exit(fn ->
RoomService.delete_room(room_id)
end)

%{room_id: room_id}
end

Expand Down
52 changes: 52 additions & 0 deletions test/jellyfish_web/integration/peer_socket_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,63 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do
assert_receive {:disconnected, {:remote, 1000, "Room stopped"}}, 1000
end

# Integration test: scrapes the metrics endpoint after a peer has connected and
# again after the room has been deleted, and compares selected room metrics.
test "proper calculated peer metrics", %{room_id: room_id, token: token, conn: conn} do
# NOTE(review): `%{} = value` matches ANY map, so this assertion only checks that a
# map was returned; it does not verify that the metrics are empty.
assert %{} = get_peers_room_metrics()
create_and_authenticate(token)

# Per-room metric keys carry the room id as a label.
peers_in_room_key = "jellyfish_room_peers{room_id=\"#{room_id}\"}"
peers_in_room_time_key = "jellyfish_room_peer_time_total_seconds{room_id=\"#{room_id}\"}"

metrics_after_one_tick = %{
peers_in_room_key => "1",
peers_in_room_time_key => "1",
"jellyfish_rooms" => "1"
}

# Presumably waits for the room-metrics timer to fire once — TODO confirm against
# the configured `room_metrics_scrape_interval`; the expectation is timing-sensitive.
Process.sleep(1_000)

# `Map.intersect/2` keeps only keys present in both maps, with values taken from the
# scraped metrics, so metrics not listed in the expectation are ignored.
assert ^metrics_after_one_tick =
Map.intersect(metrics_after_one_tick, get_peers_room_metrics())

# Delete the room through the HTTP API; the response is checked for `:no_content`.
conn = delete(conn, ~p"/room/#{room_id}/")
response(conn, :no_content)

Process.sleep(1_000)

# Expected after deletion: peers and rooms read "0", peer time still reads "1".
metrics_after_removal = %{
peers_in_room_key => "0",
peers_in_room_time_key => "1",
"jellyfish_rooms" => "0"
}

assert ^metrics_after_removal = Map.intersect(metrics_after_removal, get_peers_room_metrics())
end

# Starts a peer WebSocket client on `@path`, sends the auth request with `token`,
# waits up to one second for `@auth_response`, and returns the client.
def create_and_authenticate(token) do
  {:ok, socket} = WS.start_link(@path, :peer)

  WS.send_auth_request(socket, token)
  assert_receive @auth_response, 1000

  socket
end

# Scrapes the metrics endpoint and returns a map of `metric_key => value` strings,
# restricted to keys starting with "jellyfish_room".
#
# Blank lines and `# HELP` / `# TYPE` lines are skipped. Every remaining line is split
# on spaces: the first field is the key, the second the value. A line with fewer than
# two fields raises a `MatchError`.
defp get_peers_room_metrics() do
  "http://localhost:9568/metrics"
  |> HTTPoison.get!()
  |> Map.get(:body)
  # `trim: true` drops the empty strings produced by blank lines.
  |> String.split("\n", trim: true)
  |> Enum.reject(&String.starts_with?(&1, ["# HELP", "# TYPE"]))
  |> Map.new(fn metric_line ->
    [key, value | _] = String.split(metric_line, " ")
    {key, value}
  end)
  |> Map.filter(fn {key, _value} -> String.starts_with?(key, "jellyfish_room") end)
end
end

0 comments on commit f452cd5

Please sign in to comment.