From ba8e03af35d2c1f1550f1dd4a5dba564f14fac1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Tue, 13 Aug 2024 11:08:45 +0200 Subject: [PATCH] Refactor RTMP namespace (#96) * Allow for urls without app and stream key * Unify Server.ClientHandler and Server.ClientHandlerBehaviour * Rename new_client_callback into handle_new_client * Rename RTMP.Server into RTMPServer * Bump to v0.24.0 --- README.md | 4 +- examples/source_with_standalone_server.exs | 8 +-- lib/membrane_rtmp_plugin/rtmp/avc/utils.ex | 9 --- ...andler.ex => client_handler_for_source.ex} | 12 +--- .../rtmp/source/source.ex | 22 +++---- .../rtmp/source/source_bin.ex | 4 +- lib/membrane_rtmp_plugin/rtmp_server.ex | 46 +++++++++++--- .../rtmp_server/client_handler.ex | 63 ++++++++++++++++--- .../rtmp_server/client_handler_behaviour.ex | 52 --------------- .../rtmp_server/listener.ex | 6 +- lib/membrane_rtmp_plugin/utils.ex | 28 --------- mix.exs | 2 +- .../rtmp_source_bin_test.exs | 10 +-- 13 files changed, 122 insertions(+), 144 deletions(-) delete mode 100644 lib/membrane_rtmp_plugin/rtmp/avc/utils.ex rename lib/membrane_rtmp_plugin/rtmp/source/{client_handler.ex => client_handler_for_source.ex} (80%) delete mode 100644 lib/membrane_rtmp_plugin/rtmp_server/client_handler_behaviour.ex delete mode 100644 lib/membrane_rtmp_plugin/utils.ex diff --git a/README.md b/README.md index 86255253..11060987 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de ```elixir def deps do [ - {:membrane_rtmp_plugin, "~> 0.23.4"} + {:membrane_rtmp_plugin, "~> 0.24.0"} ] end ``` @@ -83,7 +83,7 @@ When the script terminates, the `testsrc` content should be available in the `re ### RTMP receive with standalone RTMP server -If you want to see how you could setup the `Membrane.RTMP.Server` on your own and use it +If you want to see how you could setup the `Membrane.RTMPServer` on your own and use it with cooperation with the `Membane.RTMP.SourceBin`, take a look at [`examples/source_with_standalone_server.exs`](examples/source_with_standalone_server.exs) Run it with: diff --git a/examples/source_with_standalone_server.exs b/examples/source_with_standalone_server.exs index 4cef8924..1f22a78e 100644 --- a/examples/source_with_standalone_server.exs +++ b/examples/source_with_standalone_server.exs @@ -51,17 +51,17 @@ port = 1935 # example lambda function that upon launching will send client reference back to parent process. parent_process_pid = self() -new_client_callback = fn client_ref, app, stream_key -> +handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) end # Run the standalone server {:ok, server} = - Membrane.RTMP.Server.start_link( - handler: %Membrane.RTMP.Source.ClientHandler{controlling_process: self()}, + Membrane.RTMPServer.start_link( + handler: %Membrane.RTMP.Source.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: false, - new_client_callback: new_client_callback, + handle_new_client: handle_new_client, client_timeout: 5_000 ) diff --git a/lib/membrane_rtmp_plugin/rtmp/avc/utils.ex b/lib/membrane_rtmp_plugin/rtmp/avc/utils.ex deleted file mode 100644 index 4c4b6aa6..00000000 --- a/lib/membrane_rtmp_plugin/rtmp/avc/utils.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule Membrane.RTMP.AVC.Utils do - @moduledoc false - - @spec to_annex_b(binary()) :: binary() - def to_annex_b(<>), - do: <<0, 0, 1>> <> data <> to_annex_b(rest) - - def to_annex_b(_otherwise), do: <<>> -end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex similarity index 80% rename from lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex rename to lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex index 34be1419..6039e115 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/client_handler_for_source.ex @@ -1,10 +1,10 @@ -defmodule Membrane.RTMP.Source.ClientHandler do +defmodule Membrane.RTMP.Source.ClientHandlerImpl do @moduledoc """ - An implementation of `Membrane.RTMP.Server.ClienHandlerBehaviour` compatible with the + An implementation of `Membrane.RTMPServer.ClienHandlerBehaviour` compatible with the `Membrane.RTMP.Source` element. """ - @behaviour Membrane.RTMP.Server.ClientHandlerBehaviour + @behaviour Membrane.RTMPServer.ClientHandler defstruct [:controlling_process] @@ -67,10 +67,4 @@ defmodule Membrane.RTMP.Source.ClientHandler do send(pid, :end_of_stream) :ok end - - @spec request_for_data(pid()) :: :ok - def request_for_data(client_reference) do - send(client_reference, {:send_me_data, self()}) - :ok - end end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index a51a39cc..41881fd8 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -8,14 +8,12 @@ defmodule Membrane.RTMP.Source do connect on this URL, the source won't complete its setup. Note that all attempted connections to other `app` or `stream_key` than specified ones will be rejected. - * by spawning `Membrane.RTMP.Server`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`. + * by spawning `Membrane.RTMPServer`, receiving a client reference and passing it to the `#{inspect(__MODULE__)}`. """ use Membrane.Source require Membrane.Logger require Logger - alias __MODULE__.ClientHandler, as: SourceClientHandler - alias Membrane.RTMP.Server.ClientHandler - alias Membrane.RTMP.Utils + alias Membrane.RTMPServer.ClientHandler def_output_pad :output, availability: :always, @@ -28,7 +26,7 @@ defmodule Membrane.RTMP.Source do spec: pid(), description: """ A pid of a process acting as a client reference. - Can be gained with the use of `Membrane.RTMP.Server`. + Can be gained with the use of `Membrane.RTMPServer`. """ ], url: [ @@ -82,20 +80,20 @@ defmodule Membrane.RTMP.Source do @impl true def handle_setup(_ctx, %{mode: :builtin_server} = state) do - {use_ssl?, port, app, stream_key} = Utils.parse_url(state.url) + {use_ssl?, port, app, stream_key} = Membrane.RTMPServer.parse_url(state.url) parent_pid = self() - new_client_callback = fn client_ref, app, stream_key -> + handle_new_client = fn client_ref, app, stream_key -> send(parent_pid, {:client_ref, client_ref, app, stream_key}) end {:ok, server_pid} = - Membrane.RTMP.Server.start_link( - handler: %SourceClientHandler{controlling_process: self()}, + Membrane.RTMPServer.start_link( + handler: %__MODULE__.ClientHandlerImpl{controlling_process: self()}, port: port, use_ssl?: use_ssl?, - new_client_callback: new_client_callback, + handle_new_client: handle_new_client, client_timeout: 100 ) @@ -115,7 +113,7 @@ defmodule Membrane.RTMP.Source do {:output, %Membrane.RemoteStream{content_format: Membrane.FLV, type: :bytestream}} ] - :ok = SourceClientHandler.request_for_data(state.client_ref) + send(state.client_ref, {:send_me_data, self()}) {stream_format, state} end @@ -150,7 +148,7 @@ defmodule Membrane.RTMP.Source do %{client_ref: client_ref, mode: :builtin_server} = state ) do :ok = ClientHandler.demand_data(client_ref, size) - :ok = SourceClientHandler.request_for_data(client_ref) + send(client_ref, {:send_me_data, self()}) {[], state} end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex b/lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex index c3d3151a..6bf035a5 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source_bin.ex @@ -8,7 +8,7 @@ defmodule Membrane.RTMP.SourceBin do The bin can be used in the following two scenarios: * by providing the URL on which the client is expected to connect - note, that if the client doesn't connect on this URL, the bin won't complete its setup - * by spawning `Membrane.RTMP.Server`, receiving client reference after client connects on a given `app` and `stream_key` + * by spawning `Membrane.RTMPServer`, receiving client reference after client connects on a given `app` and `stream_key` and passing the client reference to the `#{inspect(__MODULE__)}`. """ use Membrane.Bin @@ -28,7 +28,7 @@ defmodule Membrane.RTMP.SourceBin do spec: pid(), description: """ A pid of a process acting as a client reference. - Can be gained with the use of `Membrane.RTMP.Server`. + Can be gained with the use of `Membrane.RTMPServer`. """ ], url: [ diff --git a/lib/membrane_rtmp_plugin/rtmp_server.ex b/lib/membrane_rtmp_plugin/rtmp_server.ex index 883347a6..4e76dfe7 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server.ex @@ -1,31 +1,31 @@ -defmodule Membrane.RTMP.Server do +defmodule Membrane.RTMPServer do @moduledoc """ - A simple RTMP server, which handles each new incoming connection. When a new client connects, the `new_client_callback` is invoked. + A simple RTMP server, which handles each new incoming connection. When a new client connects, the `handle_new_client` is invoked. New connections remain in an incomplete RTMP handshake state, until another process makes demand for their data. If no data is demanded within the client_timeout period, TCP socket is closed. Options: - client_timeout: Time (ms) after which an unused client connection is automatically closed. - - new_client_callback: An anonymous function called when a new client connects. + - handle_new_client: An anonymous function called when a new client connects. It receives the client reference, `app` and `stream_key`, allowing custom processing, like sending the reference to another process. If it's not provided, default implementation is used: - {:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMP.Server.start_link(). + {:client_ref, client_ref, app, stream_key} message is sent to the process that invoked RTMPServer.start_link(). """ use GenServer require Logger - alias Membrane.RTMP.Server.ClientHandlerBehaviour + alias Membrane.RTMPServer.ClientHandler @typedoc """ Defines options for the RTMP server. """ @type t :: [ - handler: ClientHandlerBehaviour.t(), + handler: ClientHandler.t(), port: :inet.port_number(), use_ssl?: boolean(), name: atom() | nil, - new_client_callback: + handle_new_client: (client_ref :: pid(), app :: String.t(), stream_key :: String.t() -> any()) | nil, @@ -44,14 +44,14 @@ defmodule Membrane.RTMP.Server do server_options = Enum.into(server_options, %{}) server_options = - if server_options[:new_client_callback] == nil do + if server_options[:handle_new_client] == nil do parent_process_pid = self() callback = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) end - Map.put(server_options, :new_client_callback, callback) + Map.put(server_options, :handle_new_client, callback) else server_options end @@ -70,7 +70,7 @@ defmodule Membrane.RTMP.Server do @impl true def init(server_options) do pid = - Task.start_link(Membrane.RTMP.Server.Listener, :run, [ + Task.start_link(Membrane.RTMPServer.Listener, :run, [ Map.merge(server_options, %{server: self()}) ]) @@ -97,4 +97,30 @@ defmodule Membrane.RTMP.Server do Enum.each(state.to_reply, &GenServer.reply(&1, port)) {:noreply, %{state | port: port, to_reply: []}} end + + @doc """ + Extracts ssl, port, app and stream_key from url. + """ + @spec parse_url(url :: String.t()) :: {boolean(), integer(), String.t(), String.t()} + def parse_url(url) do + uri = URI.parse(url) + port = uri.port + + {app, stream_key} = + case (uri.path || "") + |> String.trim_leading("/") + |> String.trim_trailing("/") + |> String.split("/") do + [app, stream_key] -> {app, stream_key} + [app] -> {app, ""} + end + + use_ssl? = + case uri.scheme do + "rtmp" -> false + "rtmps" -> true + end + + {use_ssl?, port, app, stream_key} + end end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex index 652a3d72..5a08a66e 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/client_handler.ex @@ -1,7 +1,10 @@ -defmodule Membrane.RTMP.Server.ClientHandler do - @moduledoc false +defmodule Membrane.RTMPServer.ClientHandler do + @moduledoc """ + A behaviour describing the actions that might be taken by the client + handler in response to different events. + """ - # Module responsible for maintaining the lifecycle of the + # It also containts functions responsible for maintaining the lifecycle of the # client connection. use GenServer @@ -9,6 +12,52 @@ defmodule Membrane.RTMP.Server.ClientHandler do require Logger alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} + @typedoc """ + Type representing the user defined state of the client handler. + """ + @type t :: term() + + @doc """ + The callback invoked once the client handler is created. + It should return the initial state of the client handler. + """ + @callback handle_init(any()) :: t() + + @doc """ + The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()` + message. + """ + @callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) :: + t() + + @doc """ + The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()` + message. + """ + @callback handle_stream_published( + publish_msg :: Membrane.RTMP.Messages.Publish.t(), + state :: t() + ) :: t() + + @doc """ + The callback invoked when new piece of data is received from a given client. + """ + @callback handle_data_available(payload :: binary(), state :: t()) :: t() + + @doc """ + The callback invoked when the client served by given client handler + stops sending data. + (for instance, when the remote client deletes the stream or + terminates the socket connection) + """ + @callback handle_end_of_stream(state :: t()) :: t() + + @doc """ + The callback invoked when the client handler receives a message + that is not recognized as an internal message of the client handler. + """ + @callback handle_info(msg :: term(), t()) :: t() + @doc """ Makes the client handler ask client for the desired number of buffers """ @@ -40,7 +89,7 @@ defmodule Membrane.RTMP.Server.ClientHandler do buffers_demanded: 0, published?: false, notified_about_client?: false, - new_client_callback: opts.new_client_callback, + handle_new_client: opts.handle_new_client, client_timeout: opts.client_timeout }} end @@ -114,10 +163,10 @@ defmodule Membrane.RTMP.Server.ClientHandler do %{publish_msg: %Membrane.RTMP.Messages.Publish{stream_key: stream_key}} = message_handler_state - if is_function(state.new_client_callback) do - state.new_client_callback.(self(), state.app, stream_key) + if is_function(state.handle_new_client) do + state.handle_new_client.(self(), state.app, stream_key) else - raise "new_client_callback is not a function" + raise "handle_new_client is not a function" end Process.send_after(self(), {:client_timeout, state.app, stream_key}, state.client_timeout) diff --git a/lib/membrane_rtmp_plugin/rtmp_server/client_handler_behaviour.ex b/lib/membrane_rtmp_plugin/rtmp_server/client_handler_behaviour.ex deleted file mode 100644 index e55b75a5..00000000 --- a/lib/membrane_rtmp_plugin/rtmp_server/client_handler_behaviour.ex +++ /dev/null @@ -1,52 +0,0 @@ -defmodule Membrane.RTMP.Server.ClientHandlerBehaviour do - @moduledoc """ - A behaviour describing the actions that might be taken by the client - handler in response to different events. - """ - - @typedoc """ - Type representing the user defined state of the client handler. - """ - @type t :: term() - - @doc """ - The callback invoked once the client handler is created. - It should return the initial state of the client handler. - """ - @callback handle_init(any()) :: t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Connect.t()` - message. - """ - @callback handle_connected(connected_msg :: Membrane.RTMP.Messages.Connect.t(), state :: t()) :: - t() - - @doc """ - The callback invoked when the client sends the `Membrane.RTMP.Messages.Publish.t()` - message. - """ - @callback handle_stream_published( - publish_msg :: Membrane.RTMP.Messages.Publish.t(), - state :: t() - ) :: t() - - @doc """ - The callback invoked when new piece of data is received from a given client. - """ - @callback handle_data_available(payload :: binary(), state :: t()) :: t() - - @doc """ - The callback invoked when the client served by given client handler - stops sending data. - (for instance, when the remote client deletes the stream or - terminates the socket connection) - """ - @callback handle_end_of_stream(state :: t()) :: t() - - @doc """ - The callback invoked when the client handler receives a message - that is not recognized as an internal message of the client handler. - """ - @callback handle_info(msg :: term(), t()) :: t() -end diff --git a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex index 23b42e04..6cf28654 100644 --- a/lib/membrane_rtmp_plugin/rtmp_server/listener.ex +++ b/lib/membrane_rtmp_plugin/rtmp_server/listener.ex @@ -1,11 +1,11 @@ -defmodule Membrane.RTMP.Server.Listener do +defmodule Membrane.RTMPServer.Listener do @moduledoc false # Module responsible for maintaining the listening socket. use Task require Logger - alias Membrane.RTMP.Server.ClientHandler + alias Membrane.RTMPServer.ClientHandler @spec run( options :: %{ @@ -53,7 +53,7 @@ defmodule Membrane.RTMP.Server.Listener do use_ssl?: options.use_ssl?, handler: options.handler, server: options.server, - new_client_callback: options.new_client_callback, + handle_new_client: options.handle_new_client, client_timeout: options.client_timeout ) diff --git a/lib/membrane_rtmp_plugin/utils.ex b/lib/membrane_rtmp_plugin/utils.ex deleted file mode 100644 index b782b8d8..00000000 --- a/lib/membrane_rtmp_plugin/utils.ex +++ /dev/null @@ -1,28 +0,0 @@ -defmodule Membrane.RTMP.Utils do - @moduledoc """ - Utility functions - """ - - @doc """ - Extracts ssl, port, app and stream_key from url. - """ - @spec parse_url(url :: String.t()) :: {boolean(), integer(), String.t(), String.t()} - def parse_url(url) do - uri = URI.parse(url) - port = uri.port - - {app, stream_key} = - case String.trim_leading(uri.path, "/") |> String.trim_trailing("/") |> String.split("/") do - [app, stream_key] -> {app, stream_key} - [app] -> {app, ""} - end - - use_ssl? = - case uri.scheme do - "rtmp" -> false - "rtmps" -> true - end - - {use_ssl?, port, app, stream_key} - end -end diff --git a/mix.exs b/mix.exs index 1fc7207e..916b79a7 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTMP.Mixfile do use Mix.Project - @version "0.23.4" + @version "0.24.0" @github_url "https://github.com/membraneframework/membrane_rtmp_plugin" def project do diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs index 44ebf096..9d5d4896 100644 --- a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -220,22 +220,22 @@ defmodule Membrane.RTMP.SourceBin.IntegrationTest do ) do parent_process_pid = self() - new_client_callback = fn client_ref, app, stream_key -> + handle_new_client = fn client_ref, app, stream_key -> send(parent_process_pid, {:client_ref, client_ref, app, stream_key}) end {:ok, server_pid} = - Membrane.RTMP.Server.start_link( - handler: %Membrane.RTMP.Source.ClientHandler{ + Membrane.RTMPServer.start_link( + handler: %Membrane.RTMP.Source.ClientHandlerImpl{ controlling_process: self() }, port: port, use_ssl?: use_ssl?, - new_client_callback: new_client_callback, + handle_new_client: handle_new_client, client_timeout: 3_000 ) - {:ok, assigned_port} = Membrane.RTMP.Server.get_port(server_pid) + {:ok, assigned_port} = Membrane.RTMPServer.get_port(server_pid) send(parent, {:port, assigned_port})