Skip to content

Commit

Permalink
RTC-520 add notifications: PeerAdded, PeerDeleted (#183)
Browse files Browse the repository at this point in the history
* WiP

* Update protos

* Use PeerAdded and PeerDeleted in tests

* Fix credo issue

* Update protos
  • Loading branch information
Rados13 authored Apr 23, 2024
1 parent 69e3a61 commit 325318e
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 15 deletions.
8 changes: 8 additions & 0 deletions lib/jellyfish/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ defmodule Jellyfish.Event do
HlsUploadCrashed,
HlsUploaded,
MetricsReport,
PeerAdded,
PeerConnected,
PeerCrashed,
PeerDeleted,
PeerDisconnected,
PeerMetadataUpdated,
RoomCrashed,
Expand Down Expand Up @@ -54,6 +56,12 @@ defmodule Jellyfish.Event do
defp to_proto_server_notification({:room_crashed, room_id}),
do: {:room_crashed, %RoomCrashed{room_id: room_id}}

defp to_proto_server_notification({:peer_added, room_id, peer_id}),
do: {:peer_added, %PeerAdded{room_id: room_id, peer_id: peer_id}}

defp to_proto_server_notification({:peer_deleted, room_id, peer_id}),
do: {:peer_deleted, %PeerDeleted{room_id: room_id, peer_id: peer_id}}

defp to_proto_server_notification({:peer_connected, room_id, peer_id}),
do: {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}}

Expand Down
4 changes: 1 addition & 3 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ defmodule Jellyfish.Room do
with false <- State.reached_peers_limit?(state),
options <- State.generate_peer_options(state, override_options),
{:ok, peer} <- Peer.new(peer_type, options) do
state = State.put_peer(state, peer)

Logger.info("Added peer #{inspect(peer.id)}")
state = State.add_peer(state, peer)

{:reply, {:ok, peer}, state}
else
Expand Down
24 changes: 20 additions & 4 deletions lib/jellyfish/room/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ defmodule Jellyfish.Room.State do
put_in(state, [:peers, peer_id, :metadata], metadata)
end

@spec add_peer(state :: t(), peer :: Peer.t()) :: t()
def add_peer(state, peer) do
state = put_peer(state, peer)

Logger.info("Added peer #{inspect(peer.id)}")
Event.broadcast_server_notification({:peer_added, state.id, peer.id})

state
end

@spec connect_peer(state :: t(), peer :: Peer.t(), socket_pid :: pid()) :: t()
def connect_peer(state, peer, socket_pid) do
peer = %{peer | status: :connected, socket_pid: socket_pid}
Expand Down Expand Up @@ -255,7 +265,9 @@ defmodule Jellyfish.Room.State do

@spec remove_peer(state :: t(), peer_id :: Peer.id(), reason :: any()) :: t()
def remove_peer(state, peer_id, :timeout) do
{_peer, state} = pop_in(state, [:peers, peer_id])
{peer, state} = pop_in(state, [:peers, peer_id])

Event.broadcast_server_notification({:peer_deleted, state.id, peer.id})

maybe_schedule_peerless_purge(state)
end
Expand All @@ -280,9 +292,13 @@ defmodule Jellyfish.Room.State do
:telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id})
end

with {:peer_crashed, crash_reason} <- reason do
Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason})
:telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id})
case reason do
{:peer_crashed, crash_reason} ->
Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason})
:telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id})

_other ->
Event.broadcast_server_notification({:peer_deleted, state.id, peer.id})
end

maybe_schedule_peerless_purge(state)
Expand Down
25 changes: 25 additions & 0 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ defmodule Jellyfish.ServerMessage.RoomCrashed do
field :room_id, 1, type: :string, json_name: "roomId"
end

defmodule Jellyfish.ServerMessage.PeerAdded do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"

field :room_id, 1, type: :string, json_name: "roomId"
field :peer_id, 2, type: :string, json_name: "peerId"
end

defmodule Jellyfish.ServerMessage.PeerDeleted do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"

field :room_id, 1, type: :string, json_name: "roomId"
field :peer_id, 2, type: :string, json_name: "peerId"
end

defmodule Jellyfish.ServerMessage.PeerConnected do
@moduledoc false

Expand Down Expand Up @@ -305,4 +323,11 @@ defmodule Jellyfish.ServerMessage do
type: Jellyfish.ServerMessage.TrackMetadataUpdated,
json_name: "trackMetadataUpdated",
oneof: 0

field :peer_added, 20, type: Jellyfish.ServerMessage.PeerAdded, json_name: "peerAdded", oneof: 0

field :peer_deleted, 21,
type: Jellyfish.ServerMessage.PeerDeleted,
json_name: "peerDeleted",
oneof: 0
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"klotho": {:hex, :klotho, "0.1.2", "3b1f1a569703e0cdce1ba964f41711351a7b06846c38fcbd601faa407e712bf2", [:mix], [], "hexpm", "a6a387982753582e30a5246fe9561721c6b9a4dd27678296cf2cd44faa6f3733"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"logger_json": {:hex, :logger_json, "5.1.4", "9e30a4f2e31a8b9e402bdc20bd37cf9b67d3a31f19d0b33082a19a06b4c50f6d", [:mix], [{:ecto, "~> 2.1 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "3f20eea58e406a33d3eb7814c7dff5accb503bab2ee8601e84da02976fa3934c"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.7", "4d9af018c22d9291b72d6025941452dd53c7921bcdbc826da8866bb6ecefa8cb", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "79904c3b78882bd0cec15b02928e6b53780602e64a359941acbc9a2408e7b74b"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.8", "88d47923805cbd9a977fc7e5d3eb8d3028a2e358ad9ad7b124684adc78c2e8ee", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "bb9e706d0949954affd4e295f5d3d4660096997756b5422119800d961c46cc63"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"},
"membrane_audio_mix_plugin": {:hex, :membrane_audio_mix_plugin, "0.16.0", "34997707ee186683c6d7bd87572944e5e37c0249235cc44915d181d653c5c40e", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a4a8c723f0da8d9cf9ac11bf657a732770ea0b8db4eff2efc16caa3a1819f435"},
Expand Down
2 changes: 1 addition & 1 deletion protos
32 changes: 26 additions & 6 deletions test/jellyfish_web/integration/server_notification_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
HlsUploadCrashed,
HlsUploaded,
MetricsReport,
PeerAdded,
PeerConnected,
PeerDeleted,
PeerDisconnected,
PeerMetadataUpdated,
RoomCrashed,
Expand Down Expand Up @@ -172,7 +174,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
test "doesn't send messages if not subscribed", %{conn: conn} do
create_and_authenticate()

trigger_notification(conn)
trigger_notification(conn, false)

refute_receive %PeerConnected{}, 200
end
Expand Down Expand Up @@ -224,6 +226,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}")
assert response(conn, :no_content)

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

Klotho.Mock.warp_by(@purge_timeout_ms + 25)

assert_receive %RoomDeleted{room_id: ^room_id}, 1_000
Expand All @@ -244,6 +249,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do

refute_received {:webhook_notification,
%PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}
end

test "sends a message when peer connects and peer is removed", %{conn: conn} do
Expand All @@ -258,6 +266,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
%PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}},
1_000

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

_conn = delete(conn, ~p"/room/#{room_id}")
assert_receive %RoomDeleted{room_id: ^room_id}

Expand Down Expand Up @@ -419,6 +430,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000

{peer_id, token, _conn} = add_peer(conn, room_id)

{:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer)
WS.send_auth_request(peer_ws, token)

Expand All @@ -438,6 +450,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do

Klotho.Mock.warp_by(@purge_timeout_ms * 3)

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

assert_receive %RoomDeleted{room_id: ^room_id}, 1_000
assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000
end
Expand Down Expand Up @@ -592,9 +607,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
ws
end

defp add_room_and_peer(conn) do
defp add_room_and_peer(conn, assert_notifications? \\ true) do
{room_id, conn} = add_room(conn)
{peer_id, token, conn} = add_peer(conn, room_id)
{peer_id, token, conn} = add_peer(conn, room_id, assert_notifications?)

{room_id, peer_id, token, conn}
end
Expand All @@ -612,12 +627,17 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
{room_id, conn}
end

defp add_peer(conn, room_id) do
defp add_peer(conn, room_id, assert_notifications? \\ true) do
conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc")

assert %{"token" => peer_token, "peer" => %{"id" => peer_id}} =
json_response(conn, :created)["data"]

if assert_notifications? do
assert_receive %PeerAdded{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerAdded{room_id: ^room_id, peer_id: ^peer_id}}
end

{peer_id, peer_token, conn}
end

Expand Down Expand Up @@ -670,8 +690,8 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
{conn, component_id}
end

defp trigger_notification(conn) do
{_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn)
defp trigger_notification(conn, assert_notifications?) do
{_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn, assert_notifications?)

{:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer)
WS.send_auth_request(peer_ws, peer_token)
Expand Down

0 comments on commit 325318e

Please sign in to comment.