diff --git a/lib/livebook/apps/manager.ex b/lib/livebook/apps/manager.ex index 5fa61bd4b24..065651b49b1 100644 --- a/lib/livebook/apps/manager.ex +++ b/lib/livebook/apps/manager.ex @@ -50,6 +50,23 @@ defmodule Livebook.Apps.Manager do GenServer.cast({:global, @name}, :sync_permanent_apps) end + @doc """ + Subscribes to manager reports. + + The messages are only sent within the node that the manager runs on. + + ## Messages + + * `{:apps_manager_status, status_entries}` - reports which permanent + app specs are running, and which are pending. Note that in some + cases the status may be sent, even if the entries do not change + + """ + @spec subscribe() :: :ok | {:error, term()} + def subscribe() do + Phoenix.PubSub.subscribe(Livebook.PubSub, "apps_manager") + end + @impl true def init({}) do Apps.subscribe() @@ -132,40 +149,45 @@ defmodule Livebook.Apps.Manager do defp sync_apps(state) do permanent_app_specs = Apps.get_permanent_app_specs() - state = deploy_missing_apps(state, permanent_app_specs) - close_leftover_apps(permanent_app_specs) + permanent_apps = Enum.filter(Apps.list_apps(), & &1.permanent) + + {state, up_to_date_app_specs} = deploy_missing_apps(state, permanent_app_specs) + close_leftover_apps(permanent_apps, permanent_app_specs) + + broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps) + state end defp deploy_missing_apps(state, permanent_app_specs) do for app_spec <- permanent_app_specs, not Map.has_key?(state.deployments, app_spec.slug), - reduce: state do - state -> + reduce: {state, []} do + {state, up_to_date_app_specs} -> case fetch_app(app_spec.slug) do {:ok, _state, app} when app.app_spec.version == app_spec.version -> - state + {state, [app_spec | up_to_date_app_specs]} {:ok, :reachable, app} -> ref = redeploy(app, app_spec) - track_deployment(state, app_spec, ref) + state = track_deployment(state, app_spec, ref) + {state, up_to_date_app_specs} {:ok, :unreachable, _app} -> - state + {state, up_to_date_app_specs} :error -> ref = deploy(app_spec) - track_deployment(state, app_spec, ref) + state = track_deployment(state, app_spec, ref) + {state, up_to_date_app_specs} end end end - defp close_leftover_apps(permanent_app_specs) do + defp close_leftover_apps(permanent_apps, permanent_app_specs) do permanent_slugs = MapSet.new(permanent_app_specs, & &1.slug) - for app <- Apps.list_apps(), - app.permanent, - app.slug not in permanent_slugs do + for app <- permanent_apps, app.slug not in permanent_slugs do Livebook.App.close_async(app.pid) end end @@ -187,6 +209,32 @@ defmodule Livebook.Apps.Manager do end end + defp broadcast_status(permanent_app_specs, up_to_date_app_specs, permanent_apps) do + pending_app_specs = permanent_app_specs -- up_to_date_app_specs + + running_app_specs = Enum.map(permanent_apps, & &1.app_spec) + + # `up_to_date_app_specs` is the list of current permanent app + # specs that are already running. This information is based on + # :global and fetched directly from the processes, therefore it + # is more recent than the tracker and it may include app spec + # versions that the tracker does not know about yet. We combine + # this with information from the tracker (`running_app_specs`). + # Only one app spec may actually be running for the given slug, + # so we deduplicate, prioritizing `up_to_date_app_specs`. + running_app_specs = Enum.uniq_by(up_to_date_app_specs ++ running_app_specs, & &1.slug) + + status_entries = + Enum.map(running_app_specs, &%{app_spec: &1, running?: true}) ++ + Enum.map(pending_app_specs, &%{app_spec: &1, running?: false}) + + local_broadcast({:apps_manager_status, status_entries}) + end + + defp local_broadcast(message) do + Phoenix.PubSub.direct_broadcast!(node(), Livebook.PubSub, "apps_manager", message) + end + defp app_definitely_down?(slug) do not Apps.exists?(slug) and Livebook.Tracker.fetch_app(slug) == :error end diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index fc4044af01e..8ebf472c5df 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -21,7 +21,8 @@ defmodule Livebook.Hubs.TeamClient do file_systems: [], deployment_groups: [], app_deployments: [], - agents: [] + agents: [], + app_deployment_statuses: nil ] @type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}} @@ -145,6 +146,8 @@ defmodule Livebook.Hubs.TeamClient do @impl true def init(%Hubs.Team{offline: nil} = team) do + Livebook.Apps.Manager.subscribe() + derived_key = Teams.derive_key(team.teams_key) headers = @@ -269,6 +272,39 @@ defmodule Livebook.Hubs.TeamClient do {:noreply, handle_event(topic, data, state)} end + def handle_info({:apps_manager_status, status_entries}, state) do + app_deployment_statuses = + for %{ + app_spec: %Livebook.Apps.TeamsAppSpec{} = app_spec, + running?: running? + } <- status_entries, + app_spec.hub_id == state.hub.id do + status = if(running?, do: :available, else: :processing) + %{version: app_spec.version, status: status} + end + + # The manager can send the status list even if it didn't change, + # or it changed for non-teams app spec, so we check to send the + # event only when necessary + if app_deployment_statuses == state.app_deployment_statuses do + {:noreply, state} + else + # TODO: send this status list to Teams and set the statuses in + # the database. Note that app deployments for this deployment + # group (this agent), that are not present in this list, we + # effectively no longer know about, so we may want to reset + # their status. + + # TODO: we want :version to be built on Teams server and just + # passed down to Livebook, so that Livebook does not care if + # we upsert app deployments or not. With that, we can also + # freely send the version with status here, and the server will + # recognise it. + + {:noreply, %{state | app_deployment_statuses: app_deployment_statuses}} + end + end + @impl true def handle_cast({:event, topic, data}, state) do Logger.debug("Received event #{topic} with data: #{inspect(data)}") diff --git a/test/livebook/apps/manager_test.exs b/test/livebook/apps/manager_test.exs index e103aef1af2..bf0499f2907 100644 --- a/test/livebook/apps/manager_test.exs +++ b/test/livebook/apps/manager_test.exs @@ -104,6 +104,41 @@ defmodule Livebook.Apps.ManagerTest do assert :global.whereis_name(Apps.Manager) == pid end + test "sends status events about running app specs" do + slug = Livebook.Utils.random_short_id() + app_settings = %{Notebook.AppSettings.new() | slug: slug} + notebook = %{Notebook.new() | app_settings: app_settings} + app_spec = Apps.NotebookAppSpec.new(notebook) + + Apps.Manager.subscribe() + + Apps.set_startup_app_specs([app_spec]) + Apps.Manager.sync_permanent_apps() + + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: false}]} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec, running?: true}]} + + # Version change + app_spec_v2 = %{app_spec | version: "2"} + Apps.set_startup_app_specs([app_spec_v2]) + Apps.Manager.sync_permanent_apps() + + assert_receive {:apps_manager_status, + [ + %{app_spec: ^app_spec, running?: true}, + %{app_spec: ^app_spec_v2, running?: false} + ]} + + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]} + + # Restart + {:ok, app} = Apps.fetch_app(app_spec.slug) + Process.exit(app.pid, :kill) + + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: false}]} + assert_receive {:apps_manager_status, [%{app_spec: ^app_spec_v2, running?: true}]} + end + defp path_app_spec(tmp_dir, slug) do app_path = Path.join(tmp_dir, "app_#{slug}.livemd")