diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index ffed71f6..918fd843 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -7,8 +7,10 @@ defmodule Jellyfish.Event do HlsUploadCrashed, HlsUploaded, MetricsReport, + PeerAdded, PeerConnected, PeerCrashed, + PeerDeleted, PeerDisconnected, PeerMetadataUpdated, RoomCrashed, @@ -54,6 +56,12 @@ defmodule Jellyfish.Event do defp to_proto_server_notification({:room_crashed, room_id}), do: {:room_crashed, %RoomCrashed{room_id: room_id}} + defp to_proto_server_notification({:peer_added, room_id, peer_id}), + do: {:peer_added, %PeerAdded{room_id: room_id, peer_id: peer_id}} + + defp to_proto_server_notification({:peer_deleted, room_id, peer_id}), + do: {:peer_deleted, %PeerDeleted{room_id: room_id, peer_id: peer_id}} + defp to_proto_server_notification({:peer_connected, room_id, peer_id}), do: {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}} diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index fa25f80b..47be9ebf 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -145,9 +145,7 @@ defmodule Jellyfish.Room do with false <- State.reached_peers_limit?(state), options <- State.generate_peer_options(state, override_options), {:ok, peer} <- Peer.new(peer_type, options) do - state = State.put_peer(state, peer) - - Logger.info("Added peer #{inspect(peer.id)}") + state = State.add_peer(state, peer) {:reply, {:ok, peer}, state} else diff --git a/lib/jellyfish/room/state.ex b/lib/jellyfish/room/state.ex index 1a6c96a4..1f239cf6 100644 --- a/lib/jellyfish/room/state.ex +++ b/lib/jellyfish/room/state.ex @@ -212,6 +212,16 @@ defmodule Jellyfish.Room.State do put_in(state, [:peers, peer_id, :metadata], metadata) end + @spec add_peer(state :: t(), peer :: Peer.t()) :: t() + def add_peer(state, peer) do + state = put_peer(state, peer) + + Logger.info("Added peer #{inspect(peer.id)}") + Event.broadcast_server_notification({:peer_added, state.id, peer.id}) + + state + end + @spec connect_peer(state :: t(), peer :: Peer.t(), socket_pid :: pid()) :: t() def connect_peer(state, peer, socket_pid) do peer = %{peer | status: :connected, socket_pid: socket_pid} @@ -255,7 +265,9 @@ defmodule Jellyfish.Room.State do @spec remove_peer(state :: t(), peer_id :: Peer.id(), reason :: any()) :: t() def remove_peer(state, peer_id, :timeout) do - {_peer, state} = pop_in(state, [:peers, peer_id]) + {peer, state} = pop_in(state, [:peers, peer_id]) + + Event.broadcast_server_notification({:peer_deleted, state.id, peer.id}) maybe_schedule_peerless_purge(state) end @@ -280,9 +292,13 @@ defmodule Jellyfish.Room.State do :telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id}) end - with {:peer_crashed, crash_reason} <- reason do - Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason}) - :telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id}) + case reason do + {:peer_crashed, crash_reason} -> + Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason}) + :telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id}) + + _other -> + Event.broadcast_server_notification({:peer_deleted, state.id, peer.id}) end maybe_schedule_peerless_purge(state) diff --git a/lib/protos/jellyfish/server_notifications.pb.ex b/lib/protos/jellyfish/server_notifications.pb.ex index d00fc40b..5e599133 100644 --- a/lib/protos/jellyfish/server_notifications.pb.ex +++ b/lib/protos/jellyfish/server_notifications.pb.ex @@ -26,6 +26,24 @@ defmodule Jellyfish.ServerMessage.RoomCrashed do field :room_id, 1, type: :string, json_name: "roomId" end +defmodule Jellyfish.ServerMessage.PeerAdded do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId" +end + +defmodule Jellyfish.ServerMessage.PeerDeleted do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId" +end + defmodule Jellyfish.ServerMessage.PeerConnected do @moduledoc false @@ -305,4 +323,11 @@ defmodule Jellyfish.ServerMessage do type: Jellyfish.ServerMessage.TrackMetadataUpdated, json_name: "trackMetadataUpdated", oneof: 0 + + field :peer_added, 20, type: Jellyfish.ServerMessage.PeerAdded, json_name: "peerAdded", oneof: 0 + + field :peer_deleted, 21, + type: Jellyfish.ServerMessage.PeerDeleted, + json_name: "peerDeleted", + oneof: 0 end diff --git a/mix.lock b/mix.lock index afde6f0c..bfa58dc0 100644 --- a/mix.lock +++ b/mix.lock @@ -39,7 +39,7 @@ "klotho": {:hex, :klotho, "0.1.2", "3b1f1a569703e0cdce1ba964f41711351a7b06846c38fcbd601faa407e712bf2", [:mix], [], "hexpm", "a6a387982753582e30a5246fe9561721c6b9a4dd27678296cf2cd44faa6f3733"}, "libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"}, "logger_json": {:hex, :logger_json, "5.1.4", "9e30a4f2e31a8b9e402bdc20bd37cf9b67d3a31f19d0b33082a19a06b4c50f6d", [:mix], [{:ecto, "~> 2.1 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "3f20eea58e406a33d3eb7814c7dff5accb503bab2ee8601e84da02976fa3934c"}, - "membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.7", "4d9af018c22d9291b72d6025941452dd53c7921bcdbc826da8866bb6ecefa8cb", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "79904c3b78882bd0cec15b02928e6b53780602e64a359941acbc9a2408e7b74b"}, + "membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.8", "88d47923805cbd9a977fc7e5d3eb8d3028a2e358ad9ad7b124684adc78c2e8ee", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "bb9e706d0949954affd4e295f5d3d4660096997756b5422119800d961c46cc63"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, "membrane_audio_mix_plugin": {:hex, :membrane_audio_mix_plugin, "0.16.0", "34997707ee186683c6d7bd87572944e5e37c0249235cc44915d181d653c5c40e", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a4a8c723f0da8d9cf9ac11bf657a732770ea0b8db4eff2efc16caa3a1819f435"}, diff --git a/protos b/protos index b9683a3f..7da5da12 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit b9683a3faec93b90b5f39e68cae387390fceba05 +Subproject commit 7da5da127c8e018ee0c845c921f598b10209271c diff --git a/test/jellyfish_web/integration/server_notification_test.exs b/test/jellyfish_web/integration/server_notification_test.exs index 6fc28cbe..9eabe3d6 100644 --- a/test/jellyfish_web/integration/server_notification_test.exs +++ b/test/jellyfish_web/integration/server_notification_test.exs @@ -20,7 +20,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do HlsUploadCrashed, HlsUploaded, MetricsReport, + PeerAdded, PeerConnected, + PeerDeleted, PeerDisconnected, PeerMetadataUpdated, RoomCrashed, @@ -172,7 +174,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do test "doesn't send messages if not subscribed", %{conn: conn} do create_and_authenticate() - trigger_notification(conn) + trigger_notification(conn, false) refute_receive %PeerConnected{}, 200 end @@ -224,6 +226,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") assert response(conn, :no_content) + assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id} + assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}} + Klotho.Mock.warp_by(@purge_timeout_ms + 25) assert_receive %RoomDeleted{room_id: ^room_id}, 1_000 @@ -244,6 +249,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do refute_received {:webhook_notification, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + + assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id} + assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}} end test "sends a message when peer connects and peer is removed", %{conn: conn} do @@ -258,6 +266,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, 1_000 + assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id} + assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}} + _conn = delete(conn, ~p"/room/#{room_id}") assert_receive %RoomDeleted{room_id: ^room_id} @@ -419,6 +430,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000 {peer_id, token, _conn} = add_peer(conn, room_id) + {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) WS.send_auth_request(peer_ws, token) @@ -438,6 +450,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do Klotho.Mock.warp_by(@purge_timeout_ms * 3) + assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id} + assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}} + assert_receive %RoomDeleted{room_id: ^room_id}, 1_000 assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 end @@ -592,9 +607,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do ws end - defp add_room_and_peer(conn) do + defp add_room_and_peer(conn, assert_notifications? \\ true) do {room_id, conn} = add_room(conn) - {peer_id, token, conn} = add_peer(conn, room_id) + {peer_id, token, conn} = add_peer(conn, room_id, assert_notifications?) {room_id, peer_id, token, conn} end @@ -612,12 +627,17 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do {room_id, conn} end - defp add_peer(conn, room_id) do + defp add_peer(conn, room_id, assert_notifications? \\ true) do conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc") assert %{"token" => peer_token, "peer" => %{"id" => peer_id}} = json_response(conn, :created)["data"] + if assert_notifications? do + assert_receive %PeerAdded{room_id: ^room_id, peer_id: ^peer_id} + assert_receive {:webhook_notification, %PeerAdded{room_id: ^room_id, peer_id: ^peer_id}} + end + {peer_id, peer_token, conn} end @@ -670,8 +690,8 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do {conn, component_id} end - defp trigger_notification(conn) do - {_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn) + defp trigger_notification(conn, assert_notifications?) do + {_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn, assert_notifications?) {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) WS.send_auth_request(peer_ws, peer_token)