diff --git a/lib/jellyfish/application.ex b/lib/jellyfish/application.ex index c55fc00d..8e24612b 100644 --- a/lib/jellyfish/application.ex +++ b/lib/jellyfish/application.ex @@ -7,6 +7,9 @@ defmodule Jellyfish.Application do require Logger + # seconds + @resource_manager_opts %{interval: 600, recording_timeout: 3_600} + @impl true def start(_type, _args) do scrape_interval = Application.fetch_env!(:jellyfish, :webrtc_metrics_scrape_interval) @@ -30,6 +33,8 @@ defmodule Jellyfish.Application do JellyfishWeb.Endpoint, # Start the RoomService Jellyfish.RoomService, + # Start the ResourceManager, responsible for cleaning old recordings + {Jellyfish.ResourceManager, @resource_manager_opts}, Jellyfish.WebhookNotifier, {Registry, keys: :unique, name: Jellyfish.RoomRegistry}, {Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry}, diff --git a/lib/jellyfish/component/recording.ex b/lib/jellyfish/component/recording.ex index 712a729f..d6d1a6bc 100644 --- a/lib/jellyfish/component/recording.ex +++ b/lib/jellyfish/component/recording.ex @@ -58,6 +58,9 @@ defmodule Jellyfish.Component.Recording do end end + def get_base_path(), + do: :jellyfish |> Application.fetch_env!(:media_files_path) |> Path.join("raw_recordings") + defp parse_subscribe_mode(opts) do Map.update!(opts, :subscribe_mode, &String.to_atom/1) end @@ -79,7 +82,4 @@ defmodule Jellyfish.Component.Recording do _else -> {:error, :overridding_path_prefix} end end - - defp get_base_path(), - do: :jellyfish |> Application.fetch_env!(:media_files_path) |> Path.join("raw_recordings") end diff --git a/lib/jellyfish/resource_manager.ex b/lib/jellyfish/resource_manager.ex new file mode 100644 index 00000000..5e5e0f1f --- /dev/null +++ b/lib/jellyfish/resource_manager.ex @@ -0,0 +1,102 @@ +defmodule Jellyfish.ResourceManager do + @moduledoc """ + Module responsible for deleting outdated resources. + Right now it only removes outdated resources created by recording component. + """ + + use GenServer, restart: :permanent + + require Logger + + alias Jellyfish.Component.Recording + alias Jellyfish.RoomService + + @type seconds :: pos_integer() + @type opts :: %{interval: seconds(), recording_timeout: seconds()} + + @spec start(opts()) :: {:ok, pid()} | {:error, term()} + def start(opts), do: GenServer.start(__MODULE__, opts) + + @spec start_link(opts()) :: GenServer.on_start() + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(opts) do + Logger.debug("Initialize resource manager") + + schedule_free_resources(opts.interval) + + {:ok, opts} + end + + @impl true + def handle_info(:free_resources, state) do + base_path = Recording.get_base_path() + current_time = System.system_time(:second) + + rooms_list = File.ls!(base_path) + + recordings_list = + rooms_list + |> Enum.map(fn room -> + room_path = Path.join(base_path, room) + + room_path + |> File.ls!() + |> Enum.map(fn recording -> {room, Path.join(room_path, recording)} end) + end) + |> Enum.concat() + + Enum.each( + recordings_list, + &remove_recording_if_obsolete(current_time, state.recording_timeout, &1) + ) + + Enum.each(rooms_list, &remove_room_if_obsolete(&1, base_path)) + + schedule_free_resources(state.interval) + + {:noreply, state} + end + + defp schedule_free_resources(interval), + do: Process.send_after(self(), :free_resources, :timer.seconds(interval)) + + defp remove_recording_if_obsolete(current_time, recording_timeout, {room, recording_path}) do + with {:error, :room_not_found} <- RoomService.find_room(room) do + case File.ls!(recording_path) do + [] -> + File.rm_rf!(recording_path) + + files -> + # select the most recently modified file + %{mtime: mtime} = + files + |> Enum.map(fn file -> + recording_path |> Path.join(file) |> File.lstat!(time: :posix) + end) + |> Enum.sort_by(fn stats -> stats.mtime end, :desc) + |> List.first() + + should_remove_recording?(current_time, mtime, recording_timeout) && + File.rm_rf!(recording_path) + end + end + end + + defp remove_room_if_obsolete(room_id, base_path) do + state_of_room = RoomService.find_room(room_id) + room_path = Path.join(base_path, room_id) + content = File.ls!(room_path) + + if should_remove_room?(content, state_of_room), do: File.rmdir!(room_path) + end + + defp should_remove_room?([], {:error, :room_not_found}), do: true + defp should_remove_room?(_content, _state_of_room), do: false + + defp should_remove_recording?(current_time, mtime, recording_timeout), + do: current_time - mtime > recording_timeout +end diff --git a/test/jellyfish/resource_manager_test.exs b/test/jellyfish/resource_manager_test.exs new file mode 100644 index 00000000..d5f760ab --- /dev/null +++ b/test/jellyfish/resource_manager_test.exs @@ -0,0 +1,99 @@ +defmodule Jellyfish.ResourceManagerTest do + use JellyfishWeb.ComponentCase, async: true + + alias Jellyfish.Component.Recording + alias Jellyfish.ResourceManager + + @hour 3_600 + + setup do + base_path = Recording.get_base_path() + {:ok, pid} = ResourceManager.start(%{interval: 1, recording_timeout: @hour}) + + on_exit(fn -> Process.exit(pid, :force) end) + + %{base_path: base_path} + end + + test "room directory removal", %{room_id: room_id, base_path: base_path} do + case_1 = Path.join([base_path, room_id]) + case_2 = Path.join([base_path, "not_existing_room_1"]) + case_3 = Path.join([base_path, "not_existing_room_2", "part_1"]) + + File.mkdir_p!(case_1) + File.mkdir_p!(case_2) + File.mkdir_p!(case_3) + + case_3 |> Path.join("report.json") |> File.touch() + + # Wait double the interval + Process.sleep(2_000) + + # doesn't remove recordings if room exists + assert {:ok, []} = File.ls(case_1) + + # removes empty recordings if room doesn't exists + assert {:error, :enoent} = File.ls(case_2) + + # doesn't remove recordings including parts + assert {:ok, _} = File.ls(case_3) + + clean_recordings([ + room_id, + "not_existing_room_1", + "not_existing_room_2" + ]) + end + + test "recording part directory removal", %{room_id: room_id, base_path: base_path} do + case_1 = Path.join([base_path, room_id, "part_1"]) + case_2 = Path.join([base_path, "not_existing_room_4", "part_1"]) + + File.mkdir_p!(case_1) + File.mkdir_p!(case_2) + + # Wait double the interval + Process.sleep(2_000) + + # doesn't remove empty part if room exists + assert {:ok, []} = File.ls(case_1) + + # removes empty part if room doesn't exists + assert {:error, :enoent} = File.ls(case_2) + + clean_recordings([room_id, "not_existing_room_4"]) + end + + test "recording files removal", %{room_id: room_id, base_path: base_path} do + case_1 = Path.join([base_path, room_id, "part_1"]) + case_2 = Path.join([base_path, "not_existing_room_5", "part_1"]) + case_3 = Path.join([base_path, "not_existing_room_5", "part_2"]) + + File.mkdir_p!(case_1) + File.mkdir_p!(case_2) + File.mkdir_p!(case_3) + + # modify creation time + case_1 |> Path.join("report.json") |> File.touch!(System.os_time(:second) - 2 * @hour) + case_2 |> Path.join("report.json") |> File.touch!(System.os_time(:second) - 2 * @hour) + case_3 |> Path.join("report.json") |> File.touch!(System.os_time(:second)) + + # Wait double the interval + Process.sleep(2_000) + + # doesn't remove recording if room exists + assert {:ok, ["report.json"]} = File.ls(case_1) + + # removes recording if exceeds timeout and room doesn't exist + assert {:error, :enoent} = File.ls(case_2) + + # doesn't remove recording if doesn't exceed timeout + assert {:ok, ["report.json"]} = File.ls(case_3) + + clean_recordings([room_id, "not_existing_room_5"]) + end + + defp clean_recordings(dirs) do + Enum.each(dirs, fn dir -> Recording.get_base_path() |> Path.join(dir) |> File.rm_rf!() end) + end +end