Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send the apps manager report to Teams through WebSocket #2647

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions lib/livebook/hubs/team_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Livebook.Hubs.TeamClient do
use GenServer
require Logger

alias Livebook.Apps.TeamsAppSpec
aleDsz marked this conversation as resolved.
Show resolved Hide resolved
alias Livebook.FileSystem
alias Livebook.FileSystems
alias Livebook.Hubs
Expand All @@ -12,6 +13,7 @@ defmodule Livebook.Hubs.TeamClient do
@supervisor Livebook.HubsSupervisor

defstruct [
:pid,
aleDsz marked this conversation as resolved.
Show resolved Hide resolved
:hub,
:connection_status,
:derived_key,
Expand Down Expand Up @@ -169,8 +171,8 @@ defmodule Livebook.Hubs.TeamClient do
]
end

{:ok, _pid} = Teams.Connection.start_link(self(), headers)
{:ok, %__MODULE__{hub: team, derived_key: derived_key}}
{:ok, pid} = Teams.Connection.start_link(self(), headers)
{:ok, %__MODULE__{pid: pid, hub: team, derived_key: derived_key}}
end

def init(%Hubs.Team{} = team) do
Expand Down Expand Up @@ -272,15 +274,17 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, handle_event(topic, data, state)}
end

def handle_info({:apps_manager_status, status_entries}, state) do
def handle_info({:apps_manager_status, status_entries}, %{hub: %{id: id}} = state) do
app_deployment_statuses =
for %{
app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec,
running?: running?
} <- status_entries,
app_spec.hub_id == state.hub.id do
status = if(running?, do: :available, else: :processing)
%{version: app_spec.version, status: status}
for %{app_spec: %TeamsAppSpec{hub_id: ^id} = app_spec, running?: running?} <- status_entries do
status = if running?, do: :available, else: :preparing

%LivebookProto.AppDeploymentStatus{
id: app_spec.app_deployment_id,
deployment_group_id: state.deployment_group_id,
version: app_spec.version,
status: status
}
end

# The manager can send the status list even if it didn't change,
Expand All @@ -289,17 +293,14 @@ defmodule Livebook.Hubs.TeamClient do
if app_deployment_statuses == state.app_deployment_statuses do
{:noreply, state}
else
# TODO: send this status list to Teams and set the statuses in
# the database. Note that app deployments for this deployment
# group (this agent), that are not present in this list, we
# effectively no longer know about, so we may want to reset
# their status.

# TODO: we want :version to be built on Teams server and just
# passed down to Livebook, so that Livebook does not care if
# we upsert app deployments or not. With that, we can also
# freely send the version with status here, and the server will
# recognise it.
report = %LivebookProto.AppDeploymentStatusReport{
app_deployment_statuses: app_deployment_statuses
}

Logger.debug("Sending apps manager report to Teams server #{inspect(report)}")

message = LivebookProto.AppDeploymentStatusReport.encode(report)
:ok = Teams.Connection.send_message(state.pid, message)

{:noreply, %{state | app_deployment_statuses: app_deployment_statuses}}
end
Expand Down
19 changes: 19 additions & 0 deletions lib/livebook/teams/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ defmodule Livebook.Teams.Connection do
:gen_statem.start_link(__MODULE__, {listener, headers}, [])
end

def send_message(conn, message) do
:gen_statem.call(conn, {:message, message})
end

## gen_statem callbacks

@impl true
Expand Down Expand Up @@ -83,6 +87,21 @@ defmodule Livebook.Teams.Connection do
:keep_state_and_data
end

def handle_event({:call, from}, {:message, message}, @no_state, data) do
case WebSocket.send(data.http_conn, data.websocket, data.ref, {:binary, message}) do
{:ok, conn, websocket} ->
:gen_statem.reply(from, :ok)
{:keep_state, %{data | http_conn: conn, websocket: websocket}}

{:error, conn, websocket, reason} ->
data = %__MODULE__{data | http_conn: conn, websocket: websocket}
send(data.listener, {:connection_error, reason})
:gen_statem.reply(from, {:error, reason})

{:keep_state, data, {:next_event, :internal, :connect}}
end
end

# Private

defp handle_websocket_message(message, %__MODULE__{} = data) do
Expand Down
6 changes: 2 additions & 4 deletions proto/lib/livebook_proto/app_deployment_status_type.pb.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule LivebookProto.AppDeploymentStatusType do
use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"

field :connecting, 0
field :preparing, 1
field :available, 2
field :deactivated, 4
field :preparing, 0
field :available, 1
end
6 changes: 2 additions & 4 deletions proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,8 @@ message Agent {
* Otherwise, it shouldn't be used.
*/
enum AppDeploymentStatusType {
connecting = 0;
preparing = 1;
available = 2;
deactivated = 4;
preparing = 0;
available = 1;
}

message AppDeploymentStatus {
Expand Down
40 changes: 37 additions & 3 deletions test/livebook_teams/hubs/team_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,26 @@ defmodule Livebook.Hubs.TeamClientTest do
agent_connected = %{agent_connected | app_deployments: [livebook_proto_app_deployment]}

Livebook.Apps.subscribe()
Livebook.Apps.Manager.subscribe()

assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{}

send(pid, {:event, :agent_connected, agent_connected})
assert_receive {:app_deployment_started, ^app_deployment}
assert_receive {:app_created, %{slug: ^slug}}

[app_spec] = Livebook.Hubs.Provider.get_app_specs(team)
Livebook.Apps.Manager.sync_permanent_apps()

assert_receive {:app_created, %{slug: ^slug}}, 3_000
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}

assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{
app_spec.version => %{
id: app_spec.app_deployment_id,
status: :preparing,
deployment_group_id: deployment_group_id
}
}

assert_receive {:app_updated,
%{
Expand All @@ -698,11 +714,24 @@ defmodule Livebook.Hubs.TeamClientTest do
sessions: [%{app_status: %{execution: :executed}}]
}}

assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]}
assert app_deployment in TeamClient.get_app_deployments(team.id)

agent_connected = %{agent_connected | app_deployments: []}
send(pid, {:event, :agent_connected, agent_connected})
# TODO: Replace this with a better solution
Process.sleep(100)
aleDsz marked this conversation as resolved.
Show resolved Hide resolved

assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{
app_spec.version => %{
id: app_spec.app_deployment_id,
status: :available,
deployment_group_id: deployment_group_id
}
}

erpc_call(node, :toggle_app_deployment, [app_deployment.id, teams_org.id])

assert_receive {:app_deployment_stopped, ^app_deployment}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}
refute app_deployment in TeamClient.get_app_deployments(team.id)

assert_receive {:app_closed,
Expand All @@ -711,6 +740,11 @@ defmodule Livebook.Hubs.TeamClientTest do
warnings: [],
sessions: [%{app_status: %{execution: :executed}}]
}}

# TODO: Replace this with a better solution
Process.sleep(200)

assert erpc_call(node, :get_apps_metadatas, [deployment_group_id]) == %{}
end

test "dispatches the agents list",
Expand Down
8 changes: 0 additions & 8 deletions test/livebook_teams/hubs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,6 @@ defmodule Livebook.HubsTest do
Hubs.Provider.verify_notebook_stamp(team, notebook_source <> "change\n", stamp)
end

defp connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}
assert_receive {:client_connected, ^id}

team
end

defp secret_name(%{id: id}) do
id
|> String.replace("-", "_")
Expand Down
8 changes: 0 additions & 8 deletions test/livebook_teams/teams_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,4 @@ defmodule Livebook.TeamsTest do
assert_receive {:app_deployment_stopped, ^app_deployment2}
end
end

defp connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}, 3_000
assert_receive {:client_connected, ^id}, 3_000

team
end
end
18 changes: 18 additions & 0 deletions test/support/hub_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,24 @@ defmodule Livebook.HubHelpers do
assert_receive {:agent_joined, ^agent}
end

@doc """
Creates a new Team hub from given user and node, and await the WebSocket to be connected.

test "my test", %{user: user, node: node} do
team = connect_to_teams(user, node)
assert "team-" <> _ = team.id
end

"""
@spec connect_to_teams(struct(), node()) :: Livebook.Hubs.Team.t()
def connect_to_teams(user, node) do
%{id: id} = team = create_team_hub(user, node)
assert_receive {:hub_connected, ^id}, 3_000
assert_receive {:client_connected, ^id}, 3_000

team
end

defp hub_pid(hub) do
if pid = GenServer.whereis({:via, Registry, {Livebook.HubsRegistry, hub.id}}) do
{:ok, pid}
Expand Down
Loading