Skip to content

Commit

Permalink
Add ResourceManager that will remove raw recordings that exceeds time…
Browse files Browse the repository at this point in the history
…out (#175)

* Add ResourceManager that will remove raw recordings that exceeds timeout

* Requested changes

* Requested changes

* Adjust ResourceManager to changes due to rebase

* Fix tests

* Fix credo warning
  • Loading branch information
Karolk99 authored Apr 11, 2024
1 parent b6b5420 commit cd8bb9b
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 3 deletions.
5 changes: 5 additions & 0 deletions lib/jellyfish/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ defmodule Jellyfish.Application do

require Logger

# seconds
@resource_manager_opts %{interval: 600, recording_timeout: 3_600}

@impl true
def start(_type, _args) do
scrape_interval = Application.fetch_env!(:jellyfish, :webrtc_metrics_scrape_interval)
Expand All @@ -30,6 +33,8 @@ defmodule Jellyfish.Application do
JellyfishWeb.Endpoint,
# Start the RoomService
Jellyfish.RoomService,
# Start the ResourceManager, responsible for cleaning old recordings
{Jellyfish.ResourceManager, @resource_manager_opts},
Jellyfish.WebhookNotifier,
{Registry, keys: :unique, name: Jellyfish.RoomRegistry},
{Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry},
Expand Down
6 changes: 3 additions & 3 deletions lib/jellyfish/component/recording.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ defmodule Jellyfish.Component.Recording do
end
end

def get_base_path(),
do: :jellyfish |> Application.fetch_env!(:media_files_path) |> Path.join("raw_recordings")

defp parse_subscribe_mode(opts) do
Map.update!(opts, :subscribe_mode, &String.to_atom/1)
end
Expand All @@ -79,7 +82,4 @@ defmodule Jellyfish.Component.Recording do
_else -> {:error, :overridding_path_prefix}
end
end

defp get_base_path(),
do: :jellyfish |> Application.fetch_env!(:media_files_path) |> Path.join("raw_recordings")
end
102 changes: 102 additions & 0 deletions lib/jellyfish/resource_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
defmodule Jellyfish.ResourceManager do
@moduledoc """
Module responsible for deleting outdated resources.
Right now it only removes outdated resources created by recording component.
"""

use GenServer, restart: :permanent

require Logger

alias Jellyfish.Component.Recording
alias Jellyfish.RoomService

@type seconds :: pos_integer()
@type opts :: %{interval: seconds(), recording_timeout: seconds()}

@spec start(opts()) :: {:ok, pid()} | {:error, term()}
def start(opts), do: GenServer.start(__MODULE__, opts)

@spec start_link(opts()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@impl true
def init(opts) do
Logger.debug("Initialize resource manager")

schedule_free_resources(opts.interval)

{:ok, opts}
end

@impl true
def handle_info(:free_resources, state) do
base_path = Recording.get_base_path()
current_time = System.system_time(:second)

rooms_list = File.ls!(base_path)

recordings_list =
rooms_list
|> Enum.map(fn room ->
room_path = Path.join(base_path, room)

room_path
|> File.ls!()
|> Enum.map(fn recording -> {room, Path.join(room_path, recording)} end)
end)
|> Enum.concat()

Enum.each(
recordings_list,
&remove_recording_if_obsolete(current_time, state.recording_timeout, &1)
)

Enum.each(rooms_list, &remove_room_if_obsolete(&1, base_path))

schedule_free_resources(state.interval)

{:noreply, state}
end

defp schedule_free_resources(interval),
do: Process.send_after(self(), :free_resources, :timer.seconds(interval))

defp remove_recording_if_obsolete(current_time, recording_timeout, {room, recording_path}) do
with {:error, :room_not_found} <- RoomService.find_room(room) do
case File.ls!(recording_path) do
[] ->
File.rm_rf!(recording_path)

files ->
# select the most recently modified file
%{mtime: mtime} =
files
|> Enum.map(fn file ->
recording_path |> Path.join(file) |> File.lstat!(time: :posix)
end)
|> Enum.sort_by(fn stats -> stats.mtime end, :desc)
|> List.first()

should_remove_recording?(current_time, mtime, recording_timeout) &&
File.rm_rf!(recording_path)
end
end
end

defp remove_room_if_obsolete(room_id, base_path) do
state_of_room = RoomService.find_room(room_id)
room_path = Path.join(base_path, room_id)
content = File.ls!(room_path)

if should_remove_room?(content, state_of_room), do: File.rmdir!(room_path)
end

defp should_remove_room?([], {:error, :room_not_found}), do: true
defp should_remove_room?(_content, _state_of_room), do: false

defp should_remove_recording?(current_time, mtime, recording_timeout),
do: current_time - mtime > recording_timeout
end
99 changes: 99 additions & 0 deletions test/jellyfish/resource_manager_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
defmodule Jellyfish.ResourceManagerTest do
use JellyfishWeb.ComponentCase, async: true

alias Jellyfish.Component.Recording
alias Jellyfish.ResourceManager

@hour 3_600

setup do
base_path = Recording.get_base_path()
{:ok, pid} = ResourceManager.start(%{interval: 1, recording_timeout: @hour})

on_exit(fn -> Process.exit(pid, :force) end)

%{base_path: base_path}
end

test "room directory removal", %{room_id: room_id, base_path: base_path} do
case_1 = Path.join([base_path, room_id])
case_2 = Path.join([base_path, "not_existing_room_1"])
case_3 = Path.join([base_path, "not_existing_room_2", "part_1"])

File.mkdir_p!(case_1)
File.mkdir_p!(case_2)
File.mkdir_p!(case_3)

case_3 |> Path.join("report.json") |> File.touch()

# Wait double the interval
Process.sleep(2_000)

# doesn't remove recordings if room exists
assert {:ok, []} = File.ls(case_1)

# removes empty recordings if room doesn't exists
assert {:error, :enoent} = File.ls(case_2)

# doesn't remove recordings including parts
assert {:ok, _} = File.ls(case_3)

clean_recordings([
room_id,
"not_existing_room_1",
"not_existing_room_2"
])
end

test "recording part directory removal", %{room_id: room_id, base_path: base_path} do
case_1 = Path.join([base_path, room_id, "part_1"])
case_2 = Path.join([base_path, "not_existing_room_4", "part_1"])

File.mkdir_p!(case_1)
File.mkdir_p!(case_2)

# Wait double the interval
Process.sleep(2_000)

# doesn't remove empty part if room exists
assert {:ok, []} = File.ls(case_1)

# removes empty part if room doesn't exists
assert {:error, :enoent} = File.ls(case_2)

clean_recordings([room_id, "not_existing_room_4"])
end

test "recording files removal", %{room_id: room_id, base_path: base_path} do
case_1 = Path.join([base_path, room_id, "part_1"])
case_2 = Path.join([base_path, "not_existing_room_5", "part_1"])
case_3 = Path.join([base_path, "not_existing_room_5", "part_2"])

File.mkdir_p!(case_1)
File.mkdir_p!(case_2)
File.mkdir_p!(case_3)

# modify creation time
case_1 |> Path.join("report.json") |> File.touch!(System.os_time(:second) - 2 * @hour)
case_2 |> Path.join("report.json") |> File.touch!(System.os_time(:second) - 2 * @hour)
case_3 |> Path.join("report.json") |> File.touch!(System.os_time(:second))

# Wait double the interval
Process.sleep(2_000)

# doesn't remove recording if room exists
assert {:ok, ["report.json"]} = File.ls(case_1)

# removes recording if exceeds timeout and room doesn't exist
assert {:error, :enoent} = File.ls(case_2)

# doesn't remove recording if doesn't exceed timeout
assert {:ok, ["report.json"]} = File.ls(case_3)

clean_recordings([room_id, "not_existing_room_5"])
end

defp clean_recordings(dirs) do
Enum.each(dirs, fn dir -> Recording.get_base_path() |> Path.join(dir) |> File.rm_rf!() end)
end
end

0 comments on commit cd8bb9b

Please sign in to comment.