diff --git a/examples.livemd b/examples.livemd index a4c14a3..ac2cece 100644 --- a/examples.livemd +++ b/examples.livemd @@ -89,7 +89,7 @@ To receive the stream, visit http://localhost:1234/hls/stream.html after running ```elixir "#{hls_out_dir}/*" |> Path.wildcard() |> Enum.each(&File.rm!/1) -Boombox.run(input: bbb_mp4, output: {:hls, hls_out_dir}) +Boombox.run(input: bbb_mp4, output: "#{hls_out_dir}/index.m3u8") ``` diff --git a/lib/boombox/hls.ex b/lib/boombox/hls.ex index add50e7..c0f513a 100644 --- a/lib/boombox/hls.ex +++ b/lib/boombox/hls.ex @@ -1,18 +1,80 @@ defmodule Boombox.HLS do @moduledoc false + defmodule Uploader do + use GenServer + + require Logger + + alias Membrane.HTTPAdaptiveStream.Storages.GenServerStorage + + @impl true + def init(config) do + {:ok, config} + end + + @impl true + def handle_call( + {GenServerStorage, :store, %{context: %{type: :partial_segment}}}, + _from, + state + ) do + Logger.warning("LL-HLS is not supported. The partial segment is omitted.") + {:reply, :ok, state} + end + + @impl true + def handle_call({GenServerStorage, :store, params}, _from, state) do + location = Path.join(state.directory, params.name) + + reply = + case :hackney.request(:post, location, [], params.contents, follow_redirect: true) do + {:ok, status, _headers, _ref} when status in 200..299 -> + :ok + + {:ok, status, _headers, _ref} -> + {:error, "POST failed with status code #{status}"} + + error -> + error + end + + {:reply, reply, state} + end + + @impl true + def handle_call({GenServerStorage, :remove, params}, _from, state) do + location = Path.join(state.directory, params.name) + + reply = + case :hackney.request(:delete, location, [], <<>>, follow_redirect: true) do + {:ok, status, _headers, _ref} when status in 200..299 -> + :ok + + {:ok, status, _headers, _ref} -> + {:error, "DELETE failed with status code #{status}"} + + error -> + error + end + + {:reply, reply, state} + end + end + import Membrane.ChildrenSpec require Membrane.Pad, as: Pad alias Boombox.Pipeline.Ready - alias Membrane.Time + alias Membrane.{HTTPAdaptiveStream, Time} @spec link_output( + Boombox.Pipeline.storage_type(), Path.t(), Boombox.Pipeline.track_builders(), Membrane.ChildrenSpec.t() ) :: Ready.t() - def link_output(location, track_builders, spec_builder) do + def link_output(storage_type, location, track_builders, spec_builder) do {directory, manifest_name} = if Path.extname(location) == ".m3u8" do {Path.dirname(location), Path.basename(location, ".m3u8")} @@ -20,6 +82,16 @@ defmodule Boombox.HLS do {location, "index"} end + storage = + case storage_type do + :file -> + %HTTPAdaptiveStream.Storages.FileStorage{directory: directory} + + :http -> + {:ok, uploader} = GenServer.start_link(Uploader, %{directory: directory}) + %HTTPAdaptiveStream.Storages.GenServerStorage{destination: uploader} + end + hls_mode = if Map.keys(track_builders) == [:video], do: :separate_av, else: :muxed_av @@ -31,9 +103,7 @@ defmodule Boombox.HLS do %Membrane.HTTPAdaptiveStream.SinkBin{ manifest_name: manifest_name, manifest_module: Membrane.HTTPAdaptiveStream.HLS, - storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{ - directory: directory - }, + storage: storage, hls_mode: hls_mode } ), diff --git a/lib/boombox/pipeline.ex b/lib/boombox/pipeline.ex index 839802c..d529336 100644 --- a/lib/boombox/pipeline.ex +++ b/lib/boombox/pipeline.ex @@ -24,7 +24,7 @@ defmodule Boombox.Pipeline do require Membrane.Logger - @supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :m3u8} + @supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :hls} @type track_builders :: %{ optional(:audio) => Membrane.ChildrenSpec.t(), @@ -308,8 +308,8 @@ defmodule Boombox.Pipeline do Boombox.MP4.link_output(location, track_builders, spec_builder) end - defp link_output({:hls, location}, track_builders, spec_builder, _ctx) do - Boombox.HLS.link_output(location, track_builders, spec_builder) + defp link_output({storage_type, :hls, location}, track_builders, spec_builder, _ctx) do + Boombox.HLS.link_output(storage_type, location, track_builders, spec_builder) end defp parse_input(input) when is_binary(input) do @@ -346,10 +346,10 @@ defmodule Boombox.Pipeline do case uri do %URI{scheme: nil, path: path} when path != nil -> - case parse_file_extension(path) do - :m3u8 -> {:hls, path} - file_type -> {:file, file_type, path} - end + {:file, parse_file_extension(path), path} + + %URI{scheme: scheme, path: path} when scheme in ["http", "https"] and path != nil -> + {:http, parse_file_extension(path), output} _other -> raise "Unsupported URI: #{output}" @@ -360,12 +360,12 @@ defmodule Boombox.Pipeline do output end - @spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :m3u8 + @spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :hls defp parse_file_extension(path) do extension = Path.extname(path) case @supported_file_extensions do - %{^extension => file_type} -> file_type + %{^extension => format} -> format _no_match -> raise "Unsupported file extension: #{extension}" end end