Skip to content

Commit

Permalink
Implement simple hls files uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
Noarkhh committed Aug 28, 2024
1 parent e871fcf commit 57d07cb
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 15 deletions.
2 changes: 1 addition & 1 deletion examples.livemd
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ To receive the stream, visit http://localhost:1234/hls/stream.html after running
```elixir
"#{hls_out_dir}/*" |> Path.wildcard() |> Enum.each(&File.rm!/1)

Boombox.run(input: bbb_mp4, output: {:hls, hls_out_dir})
Boombox.run(input: bbb_mp4, output: "#{hls_out_dir}/index.m3u8")
```

<!-- livebook:{"branch_parent_index":0} -->
Expand Down
80 changes: 75 additions & 5 deletions lib/boombox/hls.ex
Original file line number Diff line number Diff line change
@@ -1,25 +1,97 @@
defmodule Boombox.HLS do
@moduledoc false

defmodule Uploader do
use GenServer

require Logger

alias Membrane.HTTPAdaptiveStream.Storages.GenServerStorage

@impl true
def init(config) do
{:ok, config}
end

@impl true
def handle_call(
{GenServerStorage, :store, %{context: %{type: :partial_segment}}},
_from,
state
) do
Logger.warning("LL-HLS is not supported. The partial segment is omitted.")
{:reply, :ok, state}
end

@impl true
def handle_call({GenServerStorage, :store, params}, _from, state) do
location = Path.join(state.directory, params.name)

reply =
case :hackney.request(:post, location, [], params.contents, follow_redirect: true) do
{:ok, status, _headers, _ref} when status in 200..299 ->
:ok

{:ok, status, _headers, _ref} ->
{:error, "POST failed with status code #{status}"}

error ->
error
end

{:reply, reply, state}
end

@impl true
def handle_call({GenServerStorage, :remove, params}, _from, state) do
location = Path.join(state.directory, params.name)

reply =
case :hackney.request(:delete, location, [], <<>>, follow_redirect: true) do
{:ok, status, _headers, _ref} when status in 200..299 ->
:ok

{:ok, status, _headers, _ref} ->
{:error, "DELETE failed with status code #{status}"}

error ->
error
end

{:reply, reply, state}
end
end

import Membrane.ChildrenSpec

require Membrane.Pad, as: Pad
alias Boombox.Pipeline.Ready
alias Membrane.Time
alias Membrane.{HTTPAdaptiveStream, Time}

@spec link_output(
Boombox.Pipeline.storage_type(),
Path.t(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(storage_type, location, track_builders, spec_builder) do
{directory, manifest_name} =
if Path.extname(location) == ".m3u8" do
{Path.dirname(location), Path.basename(location, ".m3u8")}
else
{location, "index"}
end

storage =
case storage_type do
:file ->
%HTTPAdaptiveStream.Storages.FileStorage{directory: directory}

:http ->
{:ok, uploader} = GenServer.start_link(Uploader, %{directory: directory})
%HTTPAdaptiveStream.Storages.GenServerStorage{destination: uploader}
end

hls_mode =
if Map.keys(track_builders) == [:video], do: :separate_av, else: :muxed_av

Expand All @@ -31,9 +103,7 @@ defmodule Boombox.HLS do
%Membrane.HTTPAdaptiveStream.SinkBin{
manifest_name: manifest_name,
manifest_module: Membrane.HTTPAdaptiveStream.HLS,
storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
directory: directory
},
storage: storage,
hls_mode: hls_mode
}
),
Expand Down
18 changes: 9 additions & 9 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Boombox.Pipeline do

require Membrane.Logger

@supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :m3u8}
@supported_file_extensions %{".mp4" => :mp4, ".m3u8" => :hls}

@type track_builders :: %{
optional(:audio) => Membrane.ChildrenSpec.t(),
Expand Down Expand Up @@ -308,8 +308,8 @@ defmodule Boombox.Pipeline do
Boombox.MP4.link_output(location, track_builders, spec_builder)
end

defp link_output({:hls, location}, track_builders, spec_builder, _ctx) do
Boombox.HLS.link_output(location, track_builders, spec_builder)
defp link_output({storage_type, :hls, location}, track_builders, spec_builder, _ctx) do
Boombox.HLS.link_output(storage_type, location, track_builders, spec_builder)
end

defp parse_input(input) when is_binary(input) do
Expand Down Expand Up @@ -346,10 +346,10 @@ defmodule Boombox.Pipeline do

case uri do
%URI{scheme: nil, path: path} when path != nil ->
case parse_file_extension(path) do
:m3u8 -> {:hls, path}
file_type -> {:file, file_type, path}
end
{:file, parse_file_extension(path), path}

%URI{scheme: scheme, path: path} when scheme in ["http", "https"] and path != nil ->
{:http, parse_file_extension(path), output}

_other ->
raise "Unsupported URI: #{output}"
Expand All @@ -360,12 +360,12 @@ defmodule Boombox.Pipeline do
output
end

@spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :m3u8
@spec parse_file_extension(Path.t()) :: Boombox.file_extension() | :hls
defp parse_file_extension(path) do
extension = Path.extname(path)

case @supported_file_extensions do
%{^extension => file_type} -> file_type
%{^extension => format} -> format
_no_match -> raise "Unsupported file extension: #{extension}"
end
end
Expand Down

0 comments on commit 57d07cb

Please sign in to comment.