Skip to content

Commit

Permalink
Send status reports from apps manager (#2624)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko authored May 29, 2024
1 parent a344a42 commit 586c864
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 13 deletions.
72 changes: 60 additions & 12 deletions lib/livebook/apps/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ defmodule Livebook.Apps.Manager do
GenServer.cast({:global, @name}, :sync_permanent_apps)
end

@doc """
Subscribes to manager reports.
The messages are only sent within the node that the manager runs on.
## Messages
* `{:apps_manager_status, status_entries}` - reports which permanent
app specs are running, and which are pending. Note that in some
cases the status may be sent, even if the entries do not change
"""
@spec subscribe() :: :ok | {:error, term()}
def subscribe() do
Phoenix.PubSub.subscribe(Livebook.PubSub, "apps_manager")
end

@impl true
def init({}) do
Apps.subscribe()
Expand Down Expand Up @@ -132,40 +149,45 @@ defmodule Livebook.Apps.Manager do

defp sync_apps(state) do
permanent_app_specs = Apps.get_permanent_app_specs()
state = deploy_missing_apps(state, permanent_app_specs)
close_leftover_apps(permanent_app_specs)
permanent_apps = Enum.filter(Apps.list_apps(), & &1.permanent)

{state, up_to_date_app_specs} = deploy_missing_apps(state, permanent_app_specs)
close_leftover_apps(permanent_apps, permanent_app_specs)

broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps)

state
end

defp deploy_missing_apps(state, permanent_app_specs) do
for app_spec <- permanent_app_specs,
not Map.has_key?(state.deployments, app_spec.slug),
reduce: state do
state ->
reduce: {state, []} do
{state, up_to_date_app_specs} ->
case fetch_app(app_spec.slug) do
{:ok, _state, app} when app.app_spec.version == app_spec.version ->
state
{state, [app_spec | up_to_date_app_specs]}

{:ok, :reachable, app} ->
ref = redeploy(app, app_spec)
track_deployment(state, app_spec, ref)
state = track_deployment(state, app_spec, ref)
{state, up_to_date_app_specs}

{:ok, :unreachable, _app} ->
state
{state, up_to_date_app_specs}

:error ->
ref = deploy(app_spec)
track_deployment(state, app_spec, ref)
state = track_deployment(state, app_spec, ref)
{state, up_to_date_app_specs}
end
end
end

defp close_leftover_apps(permanent_app_specs) do
defp close_leftover_apps(permanent_apps, permanent_app_specs) do
permanent_slugs = MapSet.new(permanent_app_specs, & &1.slug)

for app <- Apps.list_apps(),
app.permanent,
app.slug not in permanent_slugs do
for app <- permanent_apps, app.slug not in permanent_slugs do
Livebook.App.close_async(app.pid)
end
end
Expand All @@ -187,6 +209,32 @@ defmodule Livebook.Apps.Manager do
end
end

defp broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps) do
pending_app_specs = permanent_app_specs -- up_to_date_app_specs

running_app_specs = Enum.map(permanent_apps, & &1.app_spec)

# `up_to_date_app_specs` is the list of current permanent app
# specs that are already running. This information is based on
# :global and fetched directly from the processes, therefore it
# is more recent than the tracker and it may include app spec
# versions that the tracker does not know about yet. We combine
# this with information from the tracker (`running_app_specs`).
# Only one app spec may actually be running for the given slug,
# so we deduplicate, prioritizing `up_to_date_app_specs`.
running_app_specs = Enum.uniq_by(up_to_date_app_specs ++ running_app_specs, & &1.slug)

status_entries =
Enum.map(running_app_specs, &%{app_spec: &1, running?: true}) ++
Enum.map(pending_app_specs, &%{app_spec: &1, running?: false})

local_broadcast({:apps_manager_status, status_entries})
end

defp local_broadcast(message) do
Phoenix.PubSub.direct_broadcast!(node(), Livebook.PubSub, "apps_manager", message)
end

defp app_definitely_down?(slug) do
not Apps.exists?(slug) and Livebook.Tracker.fetch_app(slug) == :error
end
Expand Down
38 changes: 37 additions & 1 deletion lib/livebook/hubs/team_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule Livebook.Hubs.TeamClient do
file_systems: [],
deployment_groups: [],
app_deployments: [],
agents: []
agents: [],
app_deployment_statuses: nil
]

@type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}}
Expand Down Expand Up @@ -145,6 +146,8 @@ defmodule Livebook.Hubs.TeamClient do

@impl true
def init(%Hubs.Team{offline: nil} = team) do
Livebook.Apps.Manager.subscribe()

derived_key = Teams.derive_key(team.teams_key)

headers =
Expand Down Expand Up @@ -269,6 +272,39 @@ defmodule Livebook.Hubs.TeamClient do
{:noreply, handle_event(topic, data, state)}
end

def handle_info({:apps_manager_status, status_entries}, state) do
app_deployment_statuses =
for %{
app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec,
running?: running?
} <- status_entries,
app_spec.hub_id == state.hub.id do
status = if(running?, do: :available, else: :processing)
%{version: app_spec.version, status: status}
end

# The manager can send the status list even if it didn't change,
# or it changed for non-teams app spec, so we check to send the
# event only when necessary
if app_deployment_statuses == state.app_deployment_statuses do
{:noreply, state}
else
# TODO: send this status list to Teams and set the statuses in
# the database. Note that app deployments for this deployment
# group (this agent), that are not present in this list, we
# effectively no longer know about, so we may want to reset
# their status.

# TODO: we want :version to be built on Teams server and just
# passed down to Livebook, so that Livebook does not care if
# we upsert app deployments or not. With that, we can also
# freely send the version with status here, and the server will
# recognise it.

{:noreply, %{state | app_deployment_statuses: app_deployment_statuses}}
end
end

@impl true
def handle_cast({:event, topic, data}, state) do
Logger.debug("Received event #{topic} with data: #{inspect(data)}")
Expand Down
35 changes: 35 additions & 0 deletions test/livebook/apps/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,41 @@ defmodule Livebook.Apps.ManagerTest do
assert :global.whereis_name(Apps.Manager) == pid
end

test "sends status events about running app specs" do
slug = Livebook.Utils.random_short_id()
app_settings = %{Notebook.AppSettings.new() | slug: slug}
notebook = %{Notebook.new() | app_settings: app_settings}
app_spec = Apps.NotebookAppSpec.new(notebook)

Apps.Manager.subscribe()

Apps.set_startup_app_specs([app_spec])
Apps.Manager.sync_permanent_apps()

assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]}

# Version change
app_spec_v2 = %{app_spec | version: "2"}
Apps.set_startup_app_specs([app_spec_v2])
Apps.Manager.sync_permanent_apps()

assert_receive {:apps_manager_status,
[
%{app_spec: ^app_spec, running?: true},
%{app_spec: ^app_spec_v2, running?: false}
]}

assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]}

# Restart
{:ok, app} = Apps.fetch_app(app_spec.slug)
Process.exit(app.pid, :kill)

assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: false}]}
assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]}
end

defp path_app_spec(tmp_dir, slug) do
app_path = Path.join(tmp_dir, "app_#{slug}.livemd")

Expand Down

0 comments on commit 586c864

Please sign in to comment.