Skip to content

Commit

Permalink
[RTC-310] Prometheus metrics for Jellyfish (#62)
Browse files Browse the repository at this point in the history
* [RTC-310] Prometheus metrics for Jellyfish
* Review fixes
* Update credo config
  • Loading branch information
sgfn authored Aug 2, 2023
1 parent 1de16a8 commit 1ed3220
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
# set this value to 0 (zero).
#
{Credo.Check.Design.TagTODO, [exit_status: 0]},
{Credo.Check.Design.TagFIXME, []},
{Credo.Check.Design.TagFIXME, [exit_status: 0]},

#
## Readability Checks
Expand Down
6 changes: 3 additions & 3 deletions lib/jellyfish/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ defmodule Jellyfish.Application do

children = [
{Phoenix.PubSub, name: Jellyfish.PubSub},
# Start the Telemetry supervisor
JellyfishWeb.Telemetry,
{Membrane.TelemetryMetrics.Reporter,
[metrics: Membrane.RTC.Engine.Metrics.metrics(), name: JellyfishMetricsReporter]},
{Jellyfish.MetricsScraper, scrape_interval},
JellyfishWeb.Endpoint,
# Start the RoomService
Jellyfish.RoomService,
{Registry, keys: :unique, name: Jellyfish.RoomRegistry}
{Registry, keys: :unique, name: Jellyfish.RoomRegistry},
# Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry)
JellyfishWeb.Telemetry
]

children =
Expand Down
86 changes: 52 additions & 34 deletions lib/jellyfish_web/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule JellyfishWeb.Telemetry do

use Supervisor
import Telemetry.Metrics
alias JellyfishWeb.Telemetry.MetricsAggregator

def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
Expand All @@ -11,61 +12,78 @@ defmodule JellyfishWeb.Telemetry do
@impl true
def init(_arg) do
children = [
# Telemetry poller will execute the given period measurements
# every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
# Add reporters as children of your supervision tree.
# {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
MetricsAggregator,
{TelemetryMetricsPrometheus, metrics: metrics(&last_value/2)}
]

Supervisor.init(children, strategy: :one_for_one)
end

def metrics do
# Phoenix by default uses the `summary` metric type in `LiveDashboard`,
# but `TelemetryMetricsPrometheus` doesn't support it, so we have to use `last_value` instead.
#
# The metrics, events and measurements are named according to the Prometheus guidelines.
# For more information, refer to these links:
# - https://prometheus.io/docs/practices/naming/
# - https://hexdocs.pm/telemetry_metrics_prometheus_core/1.0.0/TelemetryMetricsPrometheus.Core.html#module-naming
def metrics(metric_type \\ &summary/2) do
[
# Phoenix Metrics
summary("phoenix.endpoint.start.system_time",
unit: {:native, :millisecond}
metric_type.("phoenix.endpoint.start.system_time.seconds",
event_name: [:phoenix, :endpoint, :start],
measurement: :system_time,
unit: {:native, :second}
),
summary("phoenix.endpoint.stop.duration",
unit: {:native, :millisecond}
metric_type.("phoenix.endpoint.stop.duration.seconds",
event_name: [:phoenix, :endpoint, :stop],
measurement: :duration,
unit: {:native, :second}
),
summary("phoenix.router_dispatch.start.system_time",
metric_type.("phoenix.router_dispatch.start.system_time.seconds",
event_name: [:phoenix, :router_dispatch, :start],
measurement: :system_time,
tags: [:route],
unit: {:native, :millisecond}
unit: {:native, :second}
),
summary("phoenix.router_dispatch.exception.duration",
metric_type.("phoenix.router_dispatch.exception.duration.seconds",
event_name: [:phoenix, :router_dispatch, :exception],
measurement: :duration,
tags: [:route],
unit: {:native, :millisecond}
unit: {:native, :second}
),
summary("phoenix.router_dispatch.stop.duration",
metric_type.("phoenix.router_dispatch.stop.duration.seconds",
event_name: [:phoenix, :router_dispatch, :stop],
measurement: :duration,
tags: [:route],
unit: {:native, :millisecond}
unit: {:native, :second}
),
summary("phoenix.socket_connected.duration",
unit: {:native, :millisecond}
metric_type.("phoenix.socket_connected.duration.seconds",
event_name: [:phoenix, :socket_connected],
measurement: :duration,
unit: {:native, :second}
),
summary("phoenix.channel_join.duration",
unit: {:native, :millisecond}
metric_type.("phoenix.channel_join.duration.seconds",
event_name: [:phoenix, :channel_join],
measurement: :duration,
unit: {:native, :second}
),
summary("phoenix.channel_handled_in.duration",
metric_type.("phoenix.channel_handled_in.duration.seconds",
event_name: [:phoenix, :channel_handled_in],
measurement: :duration,
tags: [:event],
unit: {:native, :millisecond}
unit: {:native, :second}
),

# VM Metrics
summary("vm.memory.total", unit: {:byte, :kilobyte}),
summary("vm.total_run_queue_lengths.total"),
summary("vm.total_run_queue_lengths.cpu"),
summary("vm.total_run_queue_lengths.io")
]
end

defp periodic_measurements do
[
# A module, function and arguments to be invoked periodically.
# This function must call :telemetry.execute/3 and a metric must be added above.
# {JellyfishWeb, :count_users, []}
metric_type.("vm.memory.total.bytes",
event_name: [:vm, :memory],
measurement: :total
),
metric_type.("vm.total_run_queue_lengths.total", []),
metric_type.("vm.total_run_queue_lengths.cpu", []),
metric_type.("vm.total_run_queue_lengths.io", [])
]
# Jellyfish Metrics
|> Enum.concat(MetricsAggregator.metrics())
end
end
127 changes: 127 additions & 0 deletions lib/jellyfish_web/telemetry/metrics_aggregator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
defmodule JellyfishWeb.Telemetry.MetricsAggregator do
@moduledoc false

use GenServer

# in seconds
@metric_forwarding_interval 10

@ice_received_event [Membrane.ICE, :ice, :payload, :received]
@ice_sent_event [Membrane.ICE, :ice, :payload, :sent]

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts),
do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

@spec handle_event(
:telemetry.event_name(),
:telemetry.event_measurements(),
:telemetry.event_metadata(),
%{ets_table: :ets.table()}
) :: :ok | {:error, atom()}
def handle_event(name, measurements, metadata, config)

def handle_event(@ice_received_event, %{bytes: bytes}, _metadata, %{ets_table: ets_table}) do
:ets.update_counter(ets_table, :ingress_delta, bytes, {:ingress_delta, 0})
:ok
end

def handle_event(@ice_sent_event, %{bytes: bytes}, _metadata, %{ets_table: ets_table}) do
:ets.update_counter(ets_table, :egress_delta, bytes, {:egress_delta, 0})
:ok
end

def handle_event(_name, _measurements, _metadata, _config), do: {:error, :unknown_event}

@spec metrics() :: [Telemetry.Metrics.t()]
def metrics() do
import Telemetry.Metrics

[
# FIXME: The traffic metrics work only with ICE events (emitted only by WebRTC components)
# which means they don't count the traffic from/to RTSP and HLS components
sum("jellyfish.traffic.ingress.total.bytes",
event_name: [:jellyfish],
measurement: :traffic_ingress_total
),
last_value("jellyfish.traffic.ingress.throughput.bytes_per_second",
event_name: [:jellyfish],
measurement: :traffic_ingress_throughput
),
sum("jellyfish.traffic.egress.total.bytes",
event_name: [:jellyfish],
measurement: :traffic_egress_total
),
last_value("jellyfish.traffic.egress.throughput.bytes_per_second",
event_name: [:jellyfish],
measurement: :traffic_egress_throughput
),
last_value("jellyfish.rooms"),

# FIXME: Prometheus warns about using labels to store dimensions with high cardinality,
# such as UUIDs. For more information refer here: https://prometheus.io/docs/practices/naming/#labels
last_value("jellyfish.room.peers",
tags: [:room_id]
),
sum("jellyfish.room.peer_time.total.seconds",
event_name: [:jellyfish, :room],
measurement: :peer_time_total,
tags: [:room_id]
)
]
end

@impl true
def init(_args) do
ets_table = :ets.new(:measurements, [:public, :set, {:write_concurrency, true}])

:telemetry.attach_many(
__MODULE__,
[@ice_received_event, @ice_sent_event],
&__MODULE__.handle_event/4,
%{ets_table: ets_table}
)

Process.send_after(self(), :forward_metrics, @metric_forwarding_interval * 1000)

{:ok, %{ets_table: ets_table}}
end

@impl true
def handle_info(:forward_metrics, %{ets_table: ets_table} = state) do
rooms = Jellyfish.RoomService.list_rooms()

[ingress_delta, egress_delta] =
:ets.tab2list(ets_table)
|> Enum.flat_map(fn {key, _val} -> :ets.take(ets_table, key) end)
|> then(fn kwl -> Enum.map([:ingress_delta, :egress_delta], &Keyword.get(kwl, &1, 0)) end)

:telemetry.execute(
[:jellyfish],
%{
traffic_ingress_total: ingress_delta,
traffic_ingress_throughput: div(ingress_delta, @metric_forwarding_interval),
traffic_egress_total: egress_delta,
traffic_egress_throughput: div(egress_delta, @metric_forwarding_interval),
rooms: Enum.count(rooms)
}
)

for room <- rooms do
peer_count = room.peers |> Map.keys() |> Enum.count()

:telemetry.execute(
[:jellyfish, :room],
%{
peers: peer_count,
peer_time_total: peer_count * @metric_forwarding_interval
},
%{room_id: room.id}
)
end

Process.send_after(self(), :forward_metrics, @metric_forwarding_interval * 1000)

{:noreply, state}
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule Jellyfish.MixProject do
{:phoenix, "~> 1.7.1"},
{:phoenix_live_dashboard, "~> 0.7.2"},
{:telemetry_metrics, "~> 0.6"},
{:telemetry_metrics_prometheus, "~> 1.1"},
{:telemetry_poller, "~> 1.0"},
{:jason, "~> 1.2"},
{:plug_cowboy, "~> 2.5"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
"statistics": {:hex, :statistics, "0.6.2", "213dcedc2b3ae7fb775b5510ea9630c66d3c0019ea2f86d5096559853623a60d", [:mix], [], "hexpm", "329f1008dc4ad24430d94c04b52ff09d5fb435ab11f34360831f11eb0c391c17"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"},
"telemetry_metrics_prometheus": {:hex, :telemetry_metrics_prometheus, "1.1.0", "1cc23e932c1ef9aa3b91db257ead31ea58d53229d407e059b29bb962c1505a13", [:mix], [{:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}], "hexpm", "d43b3659b3244da44fe0275b717701542365d4519b79d9ce895b9719c1ce4d26"},
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"},
"telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
Expand Down

0 comments on commit 1ed3220

Please sign in to comment.