Skip to content

Commit

Permalink
Ensure single subscription (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
roznawsk authored Jul 20, 2023
1 parent ea67d58 commit f316fae
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions lib/jellyfish_web/server_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule JellyfishWeb.ServerSocket do
alias Jellyfish.ServerMessage.SubscribeResponse.{RoomNotFound, RoomsState, RoomState}

@heartbeat_interval 30_000
@valid_subscribe_topics [:server_notification, :metrics]

@impl true
def child_spec(_opts), do: :ignore
Expand All @@ -38,7 +39,14 @@ defmodule JellyfishWeb.ServerSocket do

@impl true
def init(state) do
{:ok, Map.put(state, :authenticated?, false)}
state =
state
|> Map.merge(%{
authenticated?: false,
subscriptions: MapSet.new()
})

{:ok, state}
end

@impl true
Expand Down Expand Up @@ -84,7 +92,7 @@ defmodule JellyfishWeb.ServerSocket do
def handle_in({encoded_message, [opcode: :binary]}, state) do
with %ServerMessage{content: {:subscribe_request, request}} <-
ServerMessage.decode(encoded_message),
{:ok, response} <- handle_subscribe(request) do
{:ok, response, state} <- handle_subscribe(request, state) do
reply =
%ServerMessage{
content: {:subscribe_response, response}
Expand Down Expand Up @@ -116,29 +124,33 @@ defmodule JellyfishWeb.ServerSocket do
{:stop, :closed, {1003, "operation not allowed"}, state}
end

defp handle_subscribe(%SubscribeRequest{
id: id,
event_type:
{:server_notification,
%SubscribeRequest.ServerNotification{room_id: {_variant, option}}}
}) do
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, "server_notification")
defp handle_subscribe(
%SubscribeRequest{
id: id,
event_type: {topic, event_type}
},
state
)
when topic in @valid_subscribe_topics do
unless MapSet.member?(state.subscriptions, topic) do
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, Atom.to_string(topic))
end

room_state = get_room_state(option)
state = update_in(state.subscriptions, &MapSet.put(&1, topic))

{:ok, %SubscribeResponse{id: id, content: room_state}}
end
content =
case event_type do
%SubscribeRequest.ServerNotification{room_id: {_variant, option}} ->
get_room_state(option)

defp handle_subscribe(%SubscribeRequest{
id: id,
event_type: {:metrics, %SubscribeRequest.Metrics{}}
}) do
:ok = Phoenix.PubSub.subscribe(Jellyfish.PubSub, "metrics")
%SubscribeRequest.Metrics{} ->
nil
end

{:ok, %SubscribeResponse{id: id}}
{:ok, %SubscribeResponse{id: id, content: content}, state}
end

defp handle_subscribe(request), do: {:error, request}
defp handle_subscribe(request, _state), do: {:error, request}

@impl true
def handle_info(:send_ping, state) do
Expand Down

0 comments on commit f316fae

Please sign in to comment.