From f8064128c33e2848d2651994e465f509c7e5dac9 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Wed, 12 Jun 2024 11:50:33 -0300 Subject: [PATCH 1/7] Use enum values that only Livebook knows --- proto/lib/livebook_proto/app_deployment_status_type.pb.ex | 6 ++---- proto/messages.proto | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/proto/lib/livebook_proto/app_deployment_status_type.pb.ex b/proto/lib/livebook_proto/app_deployment_status_type.pb.ex index ea6c36bf9fd..66f3a40b821 100644 --- a/proto/lib/livebook_proto/app_deployment_status_type.pb.ex +++ b/proto/lib/livebook_proto/app_deployment_status_type.pb.ex @@ -1,8 +1,6 @@ defmodule LivebookProto.AppDeploymentStatusType do use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" - field :connecting, 0 - field :preparing, 1 - field :available, 2 - field :deactivated, 4 + field :preparing, 0 + field :available, 1 end diff --git a/proto/messages.proto b/proto/messages.proto index 454bf9d5f8c..f2550880b9c 100644 --- a/proto/messages.proto +++ b/proto/messages.proto @@ -167,10 +167,8 @@ message Agent { * Otherwise, it shouldn't be used. */ enum AppDeploymentStatusType { - connecting = 0; - preparing = 1; - available = 2; - deactivated = 4; + preparing = 0; + available = 1; } message AppDeploymentStatus { From 93976571436b8833a5c513ade603125000bc8ec3 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 10 Jun 2024 16:41:47 -0300 Subject: [PATCH 2/7] Allow WebSocket connection to send messages --- lib/livebook/teams/connection.ex | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/livebook/teams/connection.ex b/lib/livebook/teams/connection.ex index 3c46c87bc31..3747e15242c 100644 --- a/lib/livebook/teams/connection.ex +++ b/lib/livebook/teams/connection.ex @@ -21,6 +21,10 @@ defmodule Livebook.Teams.Connection do :gen_statem.start_link(__MODULE__, {listener, headers}, []) end + def send_message(conn, message) do + :gen_statem.call(conn, {:message, message}) + end + ## gen_statem callbacks @impl true @@ -83,6 +87,21 @@ defmodule Livebook.Teams.Connection do :keep_state_and_data end + def handle_event({:call, from}, {:message, message}, @no_state, data) do + case WebSocket.send(data.http_conn, data.websocket, data.ref, {:binary, message}) do + {:ok, conn, websocket} -> + :gen_statem.reply(from, :ok) + {:keep_state, %{data | http_conn: conn, websocket: websocket}} + + {:error, conn, websocket, reason} -> + data = %__MODULE__{data | http_conn: conn, websocket: websocket} + send(data.listener, {:connection_error, reason}) + :gen_statem.reply(from, {:error, reason}) + + {:keep_state, data, {:next_event, :internal, :connect}} + end + end + # Private defp handle_websocket_message(message, %__MODULE__{} = data) do From eb9218d81a0380e8539fea2227bcd96035aa51f6 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 10 Jun 2024 16:42:08 -0300 Subject: [PATCH 3/7] Store the WebSocket connection PID in state --- lib/livebook/hubs/team_client.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index 7328fb49ca2..77397c1f9f2 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -12,6 +12,7 @@ defmodule Livebook.Hubs.TeamClient do @supervisor Livebook.HubsSupervisor defstruct [ + :pid, :hub, :connection_status, :derived_key, @@ -169,8 +170,8 @@ defmodule Livebook.Hubs.TeamClient do ] end - {:ok, _pid} = Teams.Connection.start_link(self(), headers) - {:ok, %__MODULE__{hub: team, derived_key: derived_key}} + {:ok, pid} = Teams.Connection.start_link(self(), headers) + {:ok, %__MODULE__{pid: pid, hub: team, derived_key: derived_key}} end def init(%Hubs.Team{} = team) do From 51ba7936274cac910708c96bac1fbc8480cf2d82 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 10 Jun 2024 16:43:58 -0300 Subject: [PATCH 4/7] Send app deployment status report to Teams server --- lib/livebook/hubs/team_client.ex | 38 ++++++++++++++++---------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index 77397c1f9f2..dfb355e5d4e 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -2,6 +2,7 @@ defmodule Livebook.Hubs.TeamClient do use GenServer require Logger + alias Livebook.Apps.TeamsAppSpec alias Livebook.FileSystem alias Livebook.FileSystems alias Livebook.Hubs @@ -273,15 +274,17 @@ defmodule Livebook.Hubs.TeamClient do {:noreply, handle_event(topic, data, state)} end - def handle_info({:apps_manager_status, status_entries}, state) do + def handle_info({:apps_manager_status, status_entries}, %{hub: %{id: id}} = state) do app_deployment_statuses = - for %{ - app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec, - running?: running? - } <- status_entries, - app_spec.hub_id == state.hub.id do - status = if(running?, do: :available, else: :processing) - %{version: app_spec.version, status: status} + for %{app_spec: %TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- status_entries do + status = if running?, do: :available, else: :preparing + + %LivebookProto.AppDeploymentStatus{ + id: app_spec.app_deployment_id, + deployment_group_id: state.deployment_group_id, + version: app_spec.version, + status: status + } end # The manager can send the status list even if it didn't change, @@ -290,17 +293,14 @@ defmodule Livebook.Hubs.TeamClient do if app_deployment_statuses == state.app_deployment_statuses do {:noreply, state} else - # TODO: send this status list to Teams and set the statuses in - # the database. Note that app deployments for this deployment - # group (this agent), that are not present in this list, we - # effectively no longer know about, so we may want to reset - # their status. - - # TODO: we want :version to be built on Teams server and just - # passed down to Livebook, so that Livebook does not care if - # we upsert app deployments or not. With that, we can also - # freely send the version with status here, and the server will - # recognise it. + report = %LivebookProto.AppDeploymentStatusReport{ + app_deployment_statuses: app_deployment_statuses + } + + Logger.debug("Sending apps manager report to Teams server #{inspect(report)}") + + message = LivebookProto.AppDeploymentStatusReport.encode(report) + :ok = Teams.Connection.send_message(state.pid, message) {:noreply, %{state | app_deployment_statuses: app_deployment_statuses}} end From bd7b31b3277ae748b569776ae7eb074d8b2a3c20 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Wed, 12 Jun 2024 18:15:18 -0300 Subject: [PATCH 5/7] Improve tests --- test/livebook_teams/hubs/team_client_test.exs | 40 +++++++++++++++++-- test/livebook_teams/hubs_test.exs | 8 ---- test/livebook_teams/teams_test.exs | 8 ---- test/support/hub_helpers.ex | 18 +++++++++ 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/test/livebook_teams/hubs/team_client_test.exs b/test/livebook_teams/hubs/team_client_test.exs index c126569ba10..c3ec4a1c417 100644 --- a/test/livebook_teams/hubs/team_client_test.exs +++ b/test/livebook_teams/hubs/team_client_test.exs @@ -686,10 +686,26 @@ defmodule Livebook.Hubs.TeamClientTest do agent_connected = %{agent_connected | app_deployments: [livebook_proto_app_deployment]} Livebook.Apps.subscribe() + Livebook.Apps.Manager.subscribe() + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} send(pid, {:event, :agent_connected, agent_connected}) assert_receive {:app_deployment_started, ^app_deployment} - assert_receive {:app_created, %{slug: ^slug}} + + [app_spec] = Livebook.Hubs.Provider.get_app_specs(team) + Livebook.Apps.Manager.sync_permanent_apps() + + assert_receive {:app_created, %{slug: ^slug}}, 3_000 + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ + app_spec.version => %{ + id: app_spec.app_deployment_id, + status: :preparing, + deployment_group_id: deployment_group_id + } + } assert_receive {:app_updated, %{ @@ -698,11 +714,24 @@ defmodule Livebook.Hubs.TeamClientTest do sessions: [%{app_status: %{execution: :executed}}] }} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]} assert app_deployment in TeamClient.get_app_deployments(team.id) - agent_connected = %{agent_connected | app_deployments: []} - send(pid, {:event, :agent_connected, agent_connected}) + # TODO: Replace this with a better solution + Process.sleep(100) + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ + app_spec.version => %{ + id: app_spec.app_deployment_id, + status: :available, + deployment_group_id: deployment_group_id + } + } + + erpc_call(node, :toggle_app_deployment, [app_deployment.id, teams_org.id]) + assert_receive {:app_deployment_stopped, ^app_deployment} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} refute app_deployment in TeamClient.get_app_deployments(team.id) assert_receive {:app_closed, @@ -711,6 +740,11 @@ defmodule Livebook.Hubs.TeamClientTest do warnings: [], sessions: [%{app_status: %{execution: :executed}}] }} + + # TODO: Replace this with a better solution + Process.sleep(200) + + assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} end test "dispatches the agents list", diff --git a/test/livebook_teams/hubs_test.exs b/test/livebook_teams/hubs_test.exs index 1fd77f51ee0..049f6190b65 100644 --- a/test/livebook_teams/hubs_test.exs +++ b/test/livebook_teams/hubs_test.exs @@ -250,14 +250,6 @@ defmodule Livebook.HubsTest do Hubs.Provider.verify_notebook_stamp(team, notebook_source <> "change\n", stamp) end - defp connect_to_teams(user, node) do - %{id: id} = team = create_team_hub(user, node) - assert_receive {:hub_connected, ^id} - assert_receive {:client_connected, ^id} - - team - end - defp secret_name(%{id: id}) do id |> String.replace("-", "_") diff --git a/test/livebook_teams/teams_test.exs b/test/livebook_teams/teams_test.exs index cb343a63aa1..c02dae797dd 100644 --- a/test/livebook_teams/teams_test.exs +++ b/test/livebook_teams/teams_test.exs @@ -258,12 +258,4 @@ defmodule Livebook.TeamsTest do assert_receive {:app_deployment_stopped, ^app_deployment2} end end - - defp connect_to_teams(user, node) do - %{id: id} = team = create_team_hub(user, node) - assert_receive {:hub_connected, ^id}, 3_000 - assert_receive {:client_connected, ^id}, 3_000 - - team - end end diff --git a/test/support/hub_helpers.ex b/test/support/hub_helpers.ex index a4bacc2b44c..022cbcdaa25 100644 --- a/test/support/hub_helpers.ex +++ b/test/support/hub_helpers.ex @@ -281,6 +281,24 @@ defmodule Livebook.HubHelpers do assert_receive {:agent_joined, ^agent} end + @doc """ + Creates a new Team hub from given user and node, and await the WebSocket to be connected. + + test "my test", %{user: user, node: node} do + team = connect_to_teams(user, node) + assert "team-" <> _ = team.id + end + + """ + @spec connect_to_teams(struct(), node()) :: Livebook.Hubs.Team.t() + def connect_to_teams(user, node) do + %{id: id} = team = create_team_hub(user, node) + assert_receive {:hub_connected, ^id}, 3_000 + assert_receive {:client_connected, ^id}, 3_000 + + team + end + defp hub_pid(hub) do if pid = GenServer.whereis({:via, Registry, {Livebook.HubsRegistry, hub.id}}) do {:ok, pid} From 7430b079d7c6efa229777d107c944160a1672979 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Fri, 14 Jun 2024 15:09:44 -0300 Subject: [PATCH 6/7] Apply review comments --- lib/livebook/hubs/team_client.ex | 12 ++++++------ test/livebook_teams/hubs/team_client_test.exs | 10 ++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index dfb355e5d4e..fc8310d8ec6 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -2,7 +2,7 @@ defmodule Livebook.Hubs.TeamClient do use GenServer require Logger - alias Livebook.Apps.TeamsAppSpec + alias Livebook.Apps alias Livebook.FileSystem alias Livebook.FileSystems alias Livebook.Hubs @@ -148,7 +148,7 @@ defmodule Livebook.Hubs.TeamClient do @impl true def init(%Hubs.Team{offline: nil} = team) do - Livebook.Apps.Manager.subscribe() + Apps.Manager.subscribe() derived_key = Teams.derive_key(team.teams_key) @@ -274,9 +274,9 @@ defmodule Livebook.Hubs.TeamClient do {:noreply, handle_event(topic, data, state)} end - def handle_info({:apps_manager_status, status_entries}, %{hub: %{id: id}} = state) do + def handle_info({:apps_manager_status, entries}, %{hub: %{id: id}} = state) do app_deployment_statuses = - for %{app_spec: %TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- status_entries do + for %{app_spec: %Apps.TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- entries do status = if running?, do: :available, else: :preparing %LivebookProto.AppDeploymentStatus{ @@ -808,8 +808,8 @@ defmodule Livebook.Hubs.TeamClient do defp manager_sync() do # Each node runs the teams client, but we only need to call sync once - if Livebook.Apps.Manager.local?() do - Livebook.Apps.Manager.sync_permanent_apps() + if Apps.Manager.local?() do + Apps.Manager.sync_permanent_apps() end end end diff --git a/test/livebook_teams/hubs/team_client_test.exs b/test/livebook_teams/hubs/team_client_test.exs index c3ec4a1c417..6d09b6ad134 100644 --- a/test/livebook_teams/hubs/team_client_test.exs +++ b/test/livebook_teams/hubs/team_client_test.exs @@ -687,6 +687,7 @@ defmodule Livebook.Hubs.TeamClientTest do Livebook.Apps.subscribe() Livebook.Apps.Manager.subscribe() + erpc_call(node, :subscribe, [self(), teams_deployment_group, teams_org]) assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} @@ -698,6 +699,7 @@ defmodule Livebook.Hubs.TeamClientTest do assert_receive {:app_created, %{slug: ^slug}}, 3_000 assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} + assert_receive {:teams_broadcast, {:agent_updated, _agent}} assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ app_spec.version => %{ @@ -716,9 +718,7 @@ defmodule Livebook.Hubs.TeamClientTest do assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]} assert app_deployment in TeamClient.get_app_deployments(team.id) - - # TODO: Replace this with a better solution - Process.sleep(100) + assert_receive {:teams_broadcast, {:agent_updated, _agent}} assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{ app_spec.version => %{ @@ -741,9 +741,7 @@ defmodule Livebook.Hubs.TeamClientTest do sessions: [%{app_status: %{execution: :executed}}] }} - # TODO: Replace this with a better solution - Process.sleep(200) - + assert_receive {:teams_broadcast, {:agent_updated, _agent}} assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{} end From 7818cfdfd34325b4046c7209e3e6406ac048c6a1 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Mon, 17 Jun 2024 10:04:51 -0300 Subject: [PATCH 7/7] Apply review comments --- lib/livebook/hubs/team_client.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index fc8310d8ec6..dc06bf5edac 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -13,7 +13,7 @@ defmodule Livebook.Hubs.TeamClient do @supervisor Livebook.HubsSupervisor defstruct [ - :pid, + :connection_pid, :hub, :connection_status, :derived_key, @@ -172,7 +172,7 @@ defmodule Livebook.Hubs.TeamClient do end {:ok, pid} = Teams.Connection.start_link(self(), headers) - {:ok, %__MODULE__{pid: pid, hub: team, derived_key: derived_key}} + {:ok, %__MODULE__{connection_pid: pid, hub: team, derived_key: derived_key}} end def init(%Hubs.Team{} = team) do @@ -300,7 +300,7 @@ defmodule Livebook.Hubs.TeamClient do Logger.debug("Sending apps manager report to Teams server #{inspect(report)}") message = LivebookProto.AppDeploymentStatusReport.encode(report) - :ok = Teams.Connection.send_message(state.pid, message) + :ok = Teams.Connection.send_message(state.connection_pid, message) {:noreply, %{state | app_deployment_statuses: app_deployment_statuses}} end