diff --git a/.credo.exs b/.credo.exs index cb59fab2..f9cc8d32 100644 --- a/.credo.exs +++ b/.credo.exs @@ -92,7 +92,7 @@ # set this value to 0 (zero). # {Credo.Check.Design.TagTODO, [exit_status: 0]}, - {Credo.Check.Design.TagFIXME, []}, + {Credo.Check.Design.TagFIXME, [exit_status: 0]}, # ## Readability Checks diff --git a/lib/jellyfish/application.ex b/lib/jellyfish/application.ex index 5d65829b..756f69fb 100644 --- a/lib/jellyfish/application.ex +++ b/lib/jellyfish/application.ex @@ -13,15 +13,15 @@ defmodule Jellyfish.Application do children = [ {Phoenix.PubSub, name: Jellyfish.PubSub}, - # Start the Telemetry supervisor - JellyfishWeb.Telemetry, {Membrane.TelemetryMetrics.Reporter, [metrics: Membrane.RTC.Engine.Metrics.metrics(), name: JellyfishMetricsReporter]}, {Jellyfish.MetricsScraper, scrape_interval}, JellyfishWeb.Endpoint, # Start the RoomService Jellyfish.RoomService, - {Registry, keys: :unique, name: Jellyfish.RoomRegistry} + {Registry, keys: :unique, name: Jellyfish.RoomRegistry}, + # Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry) + JellyfishWeb.Telemetry ] children = diff --git a/lib/jellyfish_web/telemetry.ex b/lib/jellyfish_web/telemetry.ex index 2efa51e3..4587fbbb 100644 --- a/lib/jellyfish_web/telemetry.ex +++ b/lib/jellyfish_web/telemetry.ex @@ -3,6 +3,7 @@ defmodule JellyfishWeb.Telemetry do use Supervisor import Telemetry.Metrics + alias JellyfishWeb.Telemetry.MetricsAggregator def start_link(arg) do Supervisor.start_link(__MODULE__, arg, name: __MODULE__) @@ -11,61 +12,78 @@ defmodule JellyfishWeb.Telemetry do @impl true def init(_arg) do children = [ - # Telemetry poller will execute the given period measurements - # every 10_000ms. Learn more here: https://hexdocs.pm/telemetry_metrics - {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} - # Add reporters as children of your supervision tree. - # {Telemetry.Metrics.ConsoleReporter, metrics: metrics()} + MetricsAggregator, + {TelemetryMetricsPrometheus, metrics: metrics(&last_value/2)} ] Supervisor.init(children, strategy: :one_for_one) end - def metrics do + # Phoenix by default uses the `summary` metric type in `LiveDashboard`, + # but `TelemetryMetricsPrometheus` doesn't support it, so we have to use `last_value` instead. + # + # The metrics, events and measurements are named according to the Prometheus guidelines. + # For more information, refer to these links: + # - https://prometheus.io/docs/practices/naming/ + # - https://hexdocs.pm/telemetry_metrics_prometheus_core/1.0.0/TelemetryMetricsPrometheus.Core.html#module-naming + def metrics(metric_type \\ &summary/2) do [ # Phoenix Metrics - summary("phoenix.endpoint.start.system_time", - unit: {:native, :millisecond} + metric_type.("phoenix.endpoint.start.system_time.seconds", + event_name: [:phoenix, :endpoint, :start], + measurement: :system_time, + unit: {:native, :second} ), - summary("phoenix.endpoint.stop.duration", - unit: {:native, :millisecond} + metric_type.("phoenix.endpoint.stop.duration.seconds", + event_name: [:phoenix, :endpoint, :stop], + measurement: :duration, + unit: {:native, :second} ), - summary("phoenix.router_dispatch.start.system_time", + metric_type.("phoenix.router_dispatch.start.system_time.seconds", + event_name: [:phoenix, :router_dispatch, :start], + measurement: :system_time, tags: [:route], - unit: {:native, :millisecond} + unit: {:native, :second} ), - summary("phoenix.router_dispatch.exception.duration", + metric_type.("phoenix.router_dispatch.exception.duration.seconds", + event_name: [:phoenix, :router_dispatch, :exception], + measurement: :duration, tags: [:route], - unit: {:native, :millisecond} + unit: {:native, :second} ), - summary("phoenix.router_dispatch.stop.duration", + metric_type.("phoenix.router_dispatch.stop.duration.seconds", + event_name: [:phoenix, :router_dispatch, :stop], + measurement: :duration, tags: [:route], - unit: {:native, :millisecond} + unit: {:native, :second} ), - summary("phoenix.socket_connected.duration", - unit: {:native, :millisecond} + metric_type.("phoenix.socket_connected.duration.seconds", + event_name: [:phoenix, :socket_connected], + measurement: :duration, + unit: {:native, :second} ), - summary("phoenix.channel_join.duration", - unit: {:native, :millisecond} + metric_type.("phoenix.channel_join.duration.seconds", + event_name: [:phoenix, :channel_join], + measurement: :duration, + unit: {:native, :second} ), - summary("phoenix.channel_handled_in.duration", + metric_type.("phoenix.channel_handled_in.duration.seconds", + event_name: [:phoenix, :channel_handled_in], + measurement: :duration, tags: [:event], - unit: {:native, :millisecond} + unit: {:native, :second} ), # VM Metrics - summary("vm.memory.total", unit: {:byte, :kilobyte}), - summary("vm.total_run_queue_lengths.total"), - summary("vm.total_run_queue_lengths.cpu"), - summary("vm.total_run_queue_lengths.io") - ] - end - - defp periodic_measurements do - [ - # A module, function and arguments to be invoked periodically. - # This function must call :telemetry.execute/3 and a metric must be added above. - # {JellyfishWeb, :count_users, []} + metric_type.("vm.memory.total.bytes", + event_name: [:vm, :memory], + measurement: :total + ), + metric_type.("vm.total_run_queue_lengths.total", []), + metric_type.("vm.total_run_queue_lengths.cpu", []), + metric_type.("vm.total_run_queue_lengths.io", []) ] + # Jellyfish Metrics + |> Enum.concat(MetricsAggregator.metrics()) end end diff --git a/lib/jellyfish_web/telemetry/metrics_aggregator.ex b/lib/jellyfish_web/telemetry/metrics_aggregator.ex new file mode 100644 index 00000000..c6f893b8 --- /dev/null +++ b/lib/jellyfish_web/telemetry/metrics_aggregator.ex @@ -0,0 +1,127 @@ +defmodule JellyfishWeb.Telemetry.MetricsAggregator do + @moduledoc false + + use GenServer + + # in seconds + @metric_forwarding_interval 10 + + @ice_received_event [Membrane.ICE, :ice, :payload, :received] + @ice_sent_event [Membrane.ICE, :ice, :payload, :sent] + + @spec start_link(Keyword.t()) :: GenServer.on_start() + def start_link(opts), + do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + + @spec handle_event( + :telemetry.event_name(), + :telemetry.event_measurements(), + :telemetry.event_metadata(), + %{ets_table: :ets.table()} + ) :: :ok | {:error, atom()} + def handle_event(name, measurements, metadata, config) + + def handle_event(@ice_received_event, %{bytes: bytes}, _metadata, %{ets_table: ets_table}) do + :ets.update_counter(ets_table, :ingress_delta, bytes, {:ingress_delta, 0}) + :ok + end + + def handle_event(@ice_sent_event, %{bytes: bytes}, _metadata, %{ets_table: ets_table}) do + :ets.update_counter(ets_table, :egress_delta, bytes, {:egress_delta, 0}) + :ok + end + + def handle_event(_name, _measurements, _metadata, _config), do: {:error, :unknown_event} + + @spec metrics() :: [Telemetry.Metrics.t()] + def metrics() do + import Telemetry.Metrics + + [ + # FIXME: The traffic metrics work only with ICE events (emitted only by WebRTC components) + # which means they don't count the traffic from/to RTSP and HLS components + sum("jellyfish.traffic.ingress.total.bytes", + event_name: [:jellyfish], + measurement: :traffic_ingress_total + ), + last_value("jellyfish.traffic.ingress.throughput.bytes_per_second", + event_name: [:jellyfish], + measurement: :traffic_ingress_throughput + ), + sum("jellyfish.traffic.egress.total.bytes", + event_name: [:jellyfish], + measurement: :traffic_egress_total + ), + last_value("jellyfish.traffic.egress.throughput.bytes_per_second", + event_name: [:jellyfish], + measurement: :traffic_egress_throughput + ), + last_value("jellyfish.rooms"), + + # FIXME: Prometheus warns about using labels to store dimensions with high cardinality, + # such as UUIDs. For more information refer here: https://prometheus.io/docs/practices/naming/#labels + last_value("jellyfish.room.peers", + tags: [:room_id] + ), + sum("jellyfish.room.peer_time.total.seconds", + event_name: [:jellyfish, :room], + measurement: :peer_time_total, + tags: [:room_id] + ) + ] + end + + @impl true + def init(_args) do + ets_table = :ets.new(:measurements, [:public, :set, {:write_concurrency, true}]) + + :telemetry.attach_many( + __MODULE__, + [@ice_received_event, @ice_sent_event], + &__MODULE__.handle_event/4, + %{ets_table: ets_table} + ) + + Process.send_after(self(), :forward_metrics, @metric_forwarding_interval * 1000) + + {:ok, %{ets_table: ets_table}} + end + + @impl true + def handle_info(:forward_metrics, %{ets_table: ets_table} = state) do + rooms = Jellyfish.RoomService.list_rooms() + + [ingress_delta, egress_delta] = + :ets.tab2list(ets_table) + |> Enum.flat_map(fn {key, _val} -> :ets.take(ets_table, key) end) + |> then(fn kwl -> Enum.map([:ingress_delta, :egress_delta], &Keyword.get(kwl, &1, 0)) end) + + :telemetry.execute( + [:jellyfish], + %{ + traffic_ingress_total: ingress_delta, + traffic_ingress_throughput: div(ingress_delta, @metric_forwarding_interval), + traffic_egress_total: egress_delta, + traffic_egress_throughput: div(egress_delta, @metric_forwarding_interval), + rooms: Enum.count(rooms) + } + ) + + for room <- rooms do + peer_count = room.peers |> Map.keys() |> Enum.count() + + :telemetry.execute( + [:jellyfish, :room], + %{ + peers: peer_count, + peer_time_total: peer_count * @metric_forwarding_interval + }, + %{room_id: room.id} + ) + end + + Process.send_after(self(), :forward_metrics, @metric_forwarding_interval * 1000) + + {:noreply, state} + end +end diff --git a/mix.exs b/mix.exs index b70ec9ca..71db1c98 100644 --- a/mix.exs +++ b/mix.exs @@ -49,6 +49,7 @@ defmodule Jellyfish.MixProject do {:phoenix, "~> 1.7.1"}, {:phoenix_live_dashboard, "~> 0.7.2"}, {:telemetry_metrics, "~> 0.6"}, + {:telemetry_metrics_prometheus, "~> 1.1"}, {:telemetry_poller, "~> 1.0"}, {:jason, "~> 1.2"}, {:plug_cowboy, "~> 2.5"}, diff --git a/mix.lock b/mix.lock index 7656961d..ab5a0daa 100644 --- a/mix.lock +++ b/mix.lock @@ -100,6 +100,8 @@ "statistics": {:hex, :statistics, "0.6.2", "213dcedc2b3ae7fb775b5510ea9630c66d3c0019ea2f86d5096559853623a60d", [:mix], [], "hexpm", "329f1008dc4ad24430d94c04b52ff09d5fb435ab11f34360831f11eb0c391c17"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, + "telemetry_metrics_prometheus": {:hex, :telemetry_metrics_prometheus, "1.1.0", "1cc23e932c1ef9aa3b91db257ead31ea58d53229d407e059b29bb962c1505a13", [:mix], [{:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}], "hexpm", "d43b3659b3244da44fe0275b717701542365d4519b79d9ce895b9719c1ce4d26"}, + "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.1.0", "4e15f6d7dbedb3a4e3aed2262b7e1407f166fcb9c30ca3f96635dfbbef99965c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "0dd10e7fe8070095df063798f82709b0a1224c31b8baf6278b423898d591a069"}, "telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},