diff --git a/README.md b/README.md index 3ee09d28..9156c442 100644 --- a/README.md +++ b/README.md @@ -4,16 +4,8 @@ [![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/membrane_rtmp_plugin) [![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin) -This package provides RTMP server which listens to a connection from a client and element for streaming to an RTMP server. It is part of [Membrane Multimedia Framework](https://membraneframework.org). -### Server -After establishing connection it receives RTMP stream, demux it and outputs H264 video and AAC audio. -At this moment only one client can connect to the server. -### Client -After establishing connection with server it waits to receive video and audio streams. Once both streams are received they are streamed to the server. -Currently only the following codecs are supported: -- H264 for video -- AAC for audio - +This package provides RTMP server which receives an RTMP stream from a client and an element for streaming to an RTMP server. +It is part of [Membrane Multimedia Framework](https://membraneframework.org). ## Installation @@ -22,37 +14,59 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de ```elixir def deps do [ - {:membrane_rtmp_plugin, "~> 0.8.1"} + {:membrane_rtmp_plugin, "~> 0.9.0"} ] end ``` +## SourceBin + +Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio. + +## Client +After establishing connection with server it waits to receive video and audio streams. Once both streams are received they are streamed to the server. +Currently only the following codecs are supported: +- H264 for video +- AAC for audio + +## Tcp Server +It's a simple implementation of tcp server. It opens a tcp port and listens for incoming connections. For each new connection, a user-provided function is executed. + ### Prerequisites In order to successfully build and install the plugin, you need to have **ffmpeg == 4.4** installed on your system ## Usage -### `Membrane.RTMP.Source` - RTMP Server + +### RTMP receiver Server-side example, in which Membrane will act as an RTMP server and receive the stream, can be found under [`examples/source.exs`](examples/source.exs). Run it with: + ```bash elixir examples/source.exs ``` + When the server is ready you can connect to it with RTMP. If you just want to test it, you can use FFmpeg: ```bash ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:5000 ``` -### `Membrane.RTMP.Sink` - Streaming over RTMP -Streaming implementation example is provided with the following [script](examples/sink.exs). Run it with: + +### Streaming with RTMP +Streaming implementation example is provided with the following [`examples/sink.exs`](examples/sink.exs). Run it with: + ```bash elixir examples/sink.exs ``` + It will connect to RTMP server provided via URL and stream H264 video and AAC audio. RTMP server that will receive this stream can be launched with ffmpeg by running the following commands: + ```bash export RTMP_URL=rtmp://localhost:1935 ffmpeg -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv ``` + It will receive stream and once streaming is completed dump it to .flv file. If you are using the command above, please remember to run it **before** the streaming script. + ## Copyright and License Copyright 2021, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=membrane_rtmp_plugin) diff --git a/bundlex.exs b/bundlex.exs index 7f6d3f87..46290ad9 100644 --- a/bundlex.exs +++ b/bundlex.exs @@ -9,13 +9,6 @@ defmodule Membrane.RTMP.BundlexProject do defp natives(_platform) do [ - rtmp_source: [ - sources: ["source/rtmp_source.c"], - deps: [unifex: :unifex], - interface: [:nif], - preprocessor: Unifex, - pkg_configs: ["libavformat", "libavutil"] - ], rtmp_sink: [ sources: ["sink/rtmp_sink.c"], deps: [unifex: :unifex], diff --git a/c_src/membrane_rtmp_plugin/source/_generated/.gitignore b/c_src/membrane_rtmp_plugin/source/_generated/.gitignore deleted file mode 100644 index e7b4472d..00000000 --- a/c_src/membrane_rtmp_plugin/source/_generated/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -**/*.h -**/*.c -**/*.cpp diff --git a/c_src/membrane_rtmp_plugin/source/rtmp_source.c b/c_src/membrane_rtmp_plugin/source/rtmp_source.c deleted file mode 100644 index 594a40f1..00000000 --- a/c_src/membrane_rtmp_plugin/source/rtmp_source.c +++ /dev/null @@ -1,217 +0,0 @@ -#include "rtmp_source.h" -#include - -void handle_init_state(State *); - -// A callback invoked periodically by 'avformat_open_input' to check -// if the blocking call should be interrupted. -static int interrupt_callback(void *ctx) { - bool is_terminating = *(bool *)ctx; - // interrupt if the flag is set - return is_terminating; -} - -UNIFEX_TERM create(UnifexEnv *env) { - State *s = unifex_alloc_state(env); - handle_init_state(s); - - if (s->h264_bsf_ctx == NULL) { - unifex_release_state(env, s); - return unifex_raise(env, "Could not find filter h264_mp4toannexb"); - } - - s->input_ctx->interrupt_callback.callback = interrupt_callback; - s->input_ctx->interrupt_callback.opaque = &s->terminating; - return create_result_ok(env, s); -} - -UNIFEX_TERM await_open(UnifexEnv *env, State *s, char *url, int timeout) { - AVDictionary *d = NULL; - av_dict_set(&d, "listen", "1", 0); - av_dict_set_int(&d, "timeout", timeout, 0); - - UNIFEX_TERM ret; - - int av_err = avformat_open_input(&s->input_ctx, url, NULL, &d); - if (av_err == AVERROR(ETIMEDOUT)) { - ret = await_open_result_error_timeout(env); - goto err; - } else if (av_err == AVERROR_EXIT) { - // Error returned when interrupt_callback returns non-zero - ret = await_open_result_error_interrupted(env); - goto err; - } else if (av_err < 0) { - ret = await_open_result_error(env, av_err2str(av_err)); - goto err; - } - - if (avformat_find_stream_info(s->input_ctx, NULL) < 0) { - ret = await_open_result_error(env, "Couldn't get stream info"); - goto err; - } - - s->number_of_streams = s->input_ctx->nb_streams; - - if (s->number_of_streams == 0) { - ret = await_open_result_error( - env, "No streams found - at least one stream is required"); - goto err; - } - - for (int i = 0; i < s->number_of_streams; i++) { - AVStream *in_stream = s->input_ctx->streams[i]; - AVCodecParameters *in_codecpar = in_stream->codecpar; - if (in_codecpar->codec_type != AVMEDIA_TYPE_AUDIO && - in_codecpar->codec_type != AVMEDIA_TYPE_VIDEO) { - continue; - } - - if (in_codecpar->codec_id != AV_CODEC_ID_H264 && - in_codecpar->codec_id != AV_CODEC_ID_AAC) { - ret = await_open_result_error( - env, "Unsupported codec. Only H264 and AAC are supported"); - goto err; - } - if (in_codecpar->codec_id == AV_CODEC_ID_H264) { - s->h264_bsf_ctx->time_base_in = in_stream->time_base; - s->h264_bsf_ctx->par_in->codec_id = in_codecpar->codec_id; - } - } - - av_bsf_init(s->h264_bsf_ctx); - ret = await_open_result_ok(env, s); -err: - unifex_release_state(env, s); - return ret; -} - -UNIFEX_TERM set_terminate(UnifexEnv *env, State *s) { - s->terminating = true; - return set_terminate_result_ok(env); -} - -UNIFEX_TERM get_audio_params(UnifexEnv *env, State *s) { - for (int i = 0; i < s->number_of_streams; i++) { - if (s->input_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { - UnifexPayload payload; - unifex_payload_alloc(env, UNIFEX_PAYLOAD_BINARY, - s->input_ctx->streams[i]->codecpar->extradata_size, - &payload); - memcpy(payload.data, s->input_ctx->streams[i]->codecpar->extradata, - s->input_ctx->streams[i]->codecpar->extradata_size); - UNIFEX_TERM result = get_audio_params_result_ok(env, &payload); - unifex_payload_release(&payload); - return result; - } - } - - return get_audio_params_result_error(env); -} - -UNIFEX_TERM get_video_params(UnifexEnv *env, State *s) { - for (int i = 0; i < s->number_of_streams; i++) { - if (s->input_ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { - UnifexPayload payload; - unifex_payload_alloc(env, UNIFEX_PAYLOAD_BINARY, - s->input_ctx->streams[i]->codecpar->extradata_size, - &payload); - memcpy(payload.data, s->input_ctx->streams[i]->codecpar->extradata, - s->input_ctx->streams[i]->codecpar->extradata_size); - UNIFEX_TERM result = get_video_params_result_ok(env, &payload); - unifex_payload_release(&payload); - return result; - } - } - - return get_video_params_result_error(env); -} - -int64_t get_pts(AVPacket *pkt, AVStream *stream) { - const AVRational target_time_base = {1, 1000}; - return av_rescale_q_rnd(pkt->pts, stream->time_base, target_time_base, - AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX); -} - -int64_t get_dts(AVPacket *pkt, AVStream *stream) { - const AVRational target_time_base = {1, 1000}; - return av_rescale_q_rnd(pkt->dts, stream->time_base, target_time_base, - AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX); -} - -UNIFEX_TERM read_frame(UnifexEnv *env, State *s) { - AVPacket packet; - AVStream *in_stream; - enum AVMediaType codec_type; - UNIFEX_TERM result; - - while (true) { - if (av_read_frame(s->input_ctx, &packet) < 0) { - result = read_frame_result_end_of_stream(env); - goto end; - } - - if (packet.stream_index >= s->number_of_streams) { - result = read_frame_result_error(env, "Invalid stream index"); - goto end; - } - - in_stream = s->input_ctx->streams[packet.stream_index]; - codec_type = in_stream->codecpar->codec_type; - - if (codec_type != AVMEDIA_TYPE_AUDIO && codec_type != AVMEDIA_TYPE_VIDEO) { - av_packet_unref(&packet); - } else { - break; - } - } - - UNIFEX_TERM(*result_func) - (UnifexEnv *, int64_t, int64_t, UnifexPayload *) = NULL; - - switch (codec_type) { - case AVMEDIA_TYPE_VIDEO: - av_bsf_send_packet(s->h264_bsf_ctx, &packet); - av_bsf_receive_packet(s->h264_bsf_ctx, &packet); - result_func = &read_frame_result_video; - break; - - case AVMEDIA_TYPE_AUDIO: - result_func = &read_frame_result_audio; - break; - - default: - return unifex_raise(env, "Unsupported frame type"); - } - - UnifexPayload payload; - unifex_payload_alloc(env, UNIFEX_PAYLOAD_BINARY, packet.size, &payload); - memcpy(payload.data, packet.data, packet.size); - result = result_func(env, get_pts(&packet, in_stream), - get_dts(&packet, in_stream), &payload); - unifex_payload_release(&payload); - -end: - av_packet_unref(&packet); - return result; -} - -void handle_init_state(State *s) { - s->input_ctx = avformat_alloc_context(); - s->terminating = false; - const AVBitStreamFilter *h264_filter = av_bsf_get_by_name("h264_mp4toannexb"); - av_bsf_alloc(h264_filter, &s->h264_bsf_ctx); -} - -void handle_destroy_state(UnifexEnv *env, State *s) { - UNIFEX_UNUSED(env); - - s->terminating = true; - - if (s->h264_bsf_ctx) { - av_bsf_free(&s->h264_bsf_ctx); - } - - if (s->input_ctx) { - avformat_close_input(&s->input_ctx); - } -} diff --git a/c_src/membrane_rtmp_plugin/source/rtmp_source.h b/c_src/membrane_rtmp_plugin/source/rtmp_source.h deleted file mode 100644 index 9e7ae616..00000000 --- a/c_src/membrane_rtmp_plugin/source/rtmp_source.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#if LIBAVCODEC_VERSION_MAJOR >= 59 -// In FFmpeg 5.0 (libavcodec 59) AVBSFContext was moved to a separate header -#include -#endif - -typedef struct State State; - -struct State { - AVFormatContext *input_ctx; - int number_of_streams; - bool terminating; - - AVBSFContext *h264_bsf_ctx; -}; - -#include "_generated/rtmp_source.h" diff --git a/c_src/membrane_rtmp_plugin/source/rtmp_source.spec.exs b/c_src/membrane_rtmp_plugin/source/rtmp_source.spec.exs deleted file mode 100644 index fad378be..00000000 --- a/c_src/membrane_rtmp_plugin/source/rtmp_source.spec.exs +++ /dev/null @@ -1,25 +0,0 @@ -module Membrane.RTMP.Source.Native - -state_type "State" -interface [NIF] - -spec create() :: {:ok :: label, state} - -spec await_open(state, url :: string, timeout :: int) :: - {:ok :: label, state} - | {:error :: label, :timeout :: label} - | {:error :: label, :interrupted :: label} - | {:error :: label, reason :: string} - -spec get_video_params(state) :: {:ok :: label, params :: payload} | {:error :: label, :no_stream} -spec get_audio_params(state) :: {:ok :: label, params :: payload} | {:error :: label, :no_stream} - -spec set_terminate(state) :: :ok :: label - -spec read_frame(state) :: - {:ok, :audio :: label, pts :: int64, dts :: int64, frame :: payload} - | {:ok, :video :: label, pts :: int64, dts :: int64, frame :: payload} - | {:error :: label, reason :: string} - | (:end_of_stream :: label) - -dirty :io, await_open: 3, read_frame: 1 diff --git a/examples/source.exs b/examples/source.exs index a405025b..e43f5d94 100644 --- a/examples/source.exs +++ b/examples/source.exs @@ -3,7 +3,7 @@ # ffmpeg -re -i test/fixtures/testsrc.flv -f flv -c:v copy -c:a copy rtmp://localhost:5000 Mix.install([ - {:membrane_core, "~> 0.10.1"}, + {:membrane_core, "~> 0.10"}, {:membrane_rtmp_plugin, path: __DIR__ |> Path.join("../") |> Path.expand()}, :membrane_file_plugin, :membrane_mp4_plugin, @@ -11,53 +11,117 @@ Mix.install([ :membrane_aac_plugin ]) -defmodule Example do +defmodule Pipeline do use Membrane.Pipeline - @server_url "localhost" - @server_port 5000 @output_file "received.flv" @impl true - def handle_init(_opts) do + def handle_init(socket: socket) do spec = %ParentSpec{ children: %{ - source: %Membrane.RTMP.SourceBin{ - local_ip: @server_url, - port: @server_port - }, - video_payloader: Membrane.MP4.Payloader.H264, - muxer: Membrane.FLV.Muxer, - sink: %Membrane.File.Sink{location: @output_file} + source: %Membrane.RTMP.SourceBin{ + socket: socket }, - links: [ - link(:source) |> via_out(:audio) |> via_in(Pad.ref(:audio, 0)) |> to(:muxer), - link(:source) |> via_out(:video) |> to(:video_payloader) |> via_in(Pad.ref(:video, 0)) |> to(:muxer), - link(:muxer) |> to(:sink) - ] + video_payloader: Membrane.MP4.Payloader.H264, + muxer: Membrane.FLV.Muxer, + sink: %Membrane.File.Sink{location: @output_file} + }, + links: [ + link(:source) |> via_out(:audio) |> via_in(Pad.ref(:audio, 0)) |> to(:muxer), + link(:source) + |> via_out(:video) + |> to(:video_payloader) + |> via_in(Pad.ref(:video, 0)) + |> to(:muxer), + link(:muxer) |> to(:sink) + ] } {{:ok, spec: spec, playback: :playing}, %{}} end + # Once the source initializes, we grant it the control over the tcp socket + @impl true + def handle_notification( + {:socket_control_needed, _socket, _source} = notification, + :source, + _ctx, + state + ) do + send(self(), notification) + + {:ok, state} + end + + def handle_notification(_notification, _child, _ctx, state) do + {:ok, state} + end + + @impl true + def handle_other({:socket_control_needed, socket, source} = notification, _ctx, state) do + case Membrane.RTMP.SourceBin.pass_control(socket, source) do + :ok -> + :ok + + {:error, :not_owner} -> + Process.send_after(self(), notification, 200) + end + + {:ok, state} + end + # The rest of the module is used for self-termination of the pipeline after processing finishes @impl true def handle_element_end_of_stream({:sink, _pad}, _ctx, state) do Membrane.Pipeline.terminate(self()) - {{:ok, playback: :stopped}, state} + {:ok, state} end @impl true - def handle_element_end_of_stream(_other, _ctx, state), do: {:ok, state} + def handle_element_end_of_stream({_child, _pad}, _ctx, state) do + {:ok, state} + end end -# Initialize and run the pipeline -{:ok, pid} = Example.start() +defmodule Example do + @server_ip {127, 0, 0, 1} + @server_port 5000 + + def run() do + parent = self() -monitor_ref = Process.monitor(pid) + server_options = %Membrane.RTMP.Source.TcpServer{ + port: @server_port, + listen_options: [ + :binary, + packet: :raw, + active: false, + ip: @server_ip + ], + socket_handler: fn socket -> + # On new connection a pipeline is started + {:ok, pipeline} = Pipeline.start_link(socket: socket) + send(parent, {:pipeline_spawned, pipeline}) + {:ok, pipeline} + end + } -# Wait for the pipeline to terminate -receive do - {:DOWN, ^monitor_ref, :process, _pid, _reason} -> - :ok + Membrane.RTMP.Source.TcpServer.start_link(server_options) + + pipeline = + receive do + {:pipeline_spawned, pid} -> + pid + end + + ref = Process.monitor(pipeline) + + receive do + {:DOWN, ^ref, :process, _obj, _reason} -> + :ok + end + end end + +Example.run() diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf/encoder.ex b/lib/membrane_rtmp_plugin/rtmp/source/amf/encoder.ex new file mode 100644 index 00000000..8c962d28 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/amf/encoder.ex @@ -0,0 +1,58 @@ +defmodule Membrane.RTMP.AMF.Encoder do + @moduledoc false + + # Encodes a message according to AMF0 (https://en.wikipedia.org/wiki/Action_Message_Format) + + @type basic_object_t :: float() | String.t() | map() | :null + @type list_entry_t :: {key :: String.t(), basic_object_t()} + @type object_t :: basic_object_t() | [list_entry_t()] + + @object_end_marker <<0x00, 0x00, 0x09>> + + @spec encode(object_t() | [object_t]) :: binary() + def encode(objects) when is_list(objects) do + objects + |> Enum.map(&do_encode_object/1) + |> IO.iodata_to_binary() + end + + def encode(object) do + do_encode_object(object) + end + + # encode number + defp do_encode_object(object) when is_number(object) do + <<0x00, object::float-size(64)>> + end + + # encode boolean + defp do_encode_object(object) when is_boolean(object) do + if object do + <<0x01, 1::8>> + else + <<0x01, 0::8>> + end + end + + # encode string + defp do_encode_object(object) when is_binary(object) and byte_size(object) < 65_535 do + <<0x02, byte_size(object)::16, object::binary>> + end + + defp do_encode_object(object) when is_map(object) or is_list(object) do + id = + if is_map(object) do + 0x03 + else + 0x08 + end + + IO.iodata_to_binary([id, Enum.map(object, &encode_key_value_pair/1), @object_end_marker]) + end + + defp do_encode_object(:null), do: <<0x05>> + + defp encode_key_value_pair({<>, value}) when byte_size(key) < 65_535 do + [<>, key, do_encode_object(value)] + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/amf/parser.ex b/lib/membrane_rtmp_plugin/rtmp/source/amf/parser.ex new file mode 100644 index 00000000..3401496f --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/amf/parser.ex @@ -0,0 +1,68 @@ +defmodule Membrane.RTMP.AMF.Parser do + @moduledoc false + + @doc """ + Parses message from AMF format to elixir data types. + """ + @spec parse(binary()) :: list() + def parse(binary) do + do_parse(binary, []) + end + + defp do_parse(<<>>, acc), do: Enum.reverse(acc) + + defp do_parse(payload, acc) do + {value, rest} = parse_value(payload) + + do_parse(rest, [value | acc]) + end + + # parsing a number + defp parse_value(<<0x00, number::float-size(64), rest::binary>>) do + {number, rest} + end + + # parsing a boolean + defp parse_value(<<0x01, boolean::8, rest::binary>>) do + {boolean == 1, rest} + end + + # parsing a string + defp parse_value(<<0x02, size::16, string::binary-size(size), rest::binary>>) do + {string, rest} + end + + # parsing a key-value object + defp parse_value(<<0x03, rest::binary>>) do + {acc, rest} = parse_object_pairs(rest, []) + + {Map.new(acc), rest} + end + + # parsing a null value + defp parse_value(<<0x05, rest::binary>>) do + {:null, rest} + end + + defp parse_value(<<0x08, _array_size::32, rest::binary>>) do + parse_object_pairs(rest, []) + end + + defp parse_value(_data) do + raise "Unknown data type" + end + + # we reached object end + defp parse_object_pairs(<<0x00, 0x00, 0x09, rest::binary>>, acc) do + {Enum.reverse(acc), rest} + end + + defp parse_object_pairs( + <>, + acc + ) do + {value, rest} = parse_value(rest) + + parse_object_pairs(rest, [{key, value} | acc]) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex index 9a4c211f..1e13d2b9 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/bin.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/bin.ex @@ -1,9 +1,22 @@ defmodule Membrane.RTMP.SourceBin do @moduledoc """ - Bin responsible for spawning new RTMP server. + Bin responsible for demuxing and parsing an RTMP stream. - It will receive RTMP stream from the client, parse it and demux it, outputting single audio and video which are ready for further processing with Membrane Elements. - At this moment only AAC and H264 codecs are support + Outputs single audio and video which are ready for further processing with Membrane Elements. + At this moment only AAC and H264 codecs are supported. + + ## Usage + + The bin requires the RTMP client to be already connected to the socket. + The socket passed to the bin must be in non-active mode (`active` set to `false`). + + When the `Membrane.RTMP.Source` is initialized the bin sends `t:Membrane.RTMP.Source.socket_control_needed_t/0` notification. + Then, the control of the socket should be immediately granted to the `Source` with the `pass_control/2`, + and the `Source` will start reading packets from the socket. + + The bin allows for providing custom validator module, that verifies some of the RTMP messages. + The module has to implement the `Membrane.RTMP.MessageValidator` behaviour. + If the validation fails, a `t:Membrane.RTMP.Source.stream_validation_failed_t/0` notification is sent. """ use Membrane.Bin @@ -21,34 +34,27 @@ defmodule Membrane.RTMP.SourceBin do mode: :pull, demand_unit: :buffers - def_options port: [ - spec: 1..65_535, - description: "Port on which the server will listen" - ], - local_ip: [ - spec: binary(), - default: "127.0.0.1", - description: - "IP address on which the server will listen. This is useful if you have more than one network interface" - ], - timeout: [ - spec: Time.t() | :infinity, - default: :infinity, + def_options socket: [ + spec: :gen_tcp.socket(), description: """ - Time during which the connection with the client must be established before handle_prepared_to_playing fails. - - Duration given must be a multiply of one second or atom `:infinity`. + Socket, on which the bin will receive RTMP stream. The socket will be passed to the `RTMP.Source`. + The socket must be already connected to the RTMP client and be in non-active mode (`active` set to `false`). """ + ], + validator: [ + spec: Membrane.RTMP.StreamValidator, + description: """ + A Module implementing `Membrane.RTMP.MessageValidator` behaviour, used for validating the stream. + """, + default: Membrane.RTMP.DefaultMessageValidator ] @impl true - def handle_init(%__MODULE__{} = options) do - url = "rtmp://#{options.local_ip}:#{options.port}" - source = %RTMP.Source{url: url, timeout: options.timeout} - + def handle_init(%__MODULE__{} = opts) do spec = %ParentSpec{ children: %{ - src: source, + src: %RTMP.Source{socket: opts.socket, validator: opts.validator}, + demuxer: Membrane.FLV.Demuxer, video_parser: %Membrane.H264.FFmpeg.Parser{ alignment: :au, attach_nalus?: true, @@ -60,11 +66,50 @@ defmodule Membrane.RTMP.SourceBin do } }, links: [ - link(:src) |> via_out(:audio) |> to(:audio_parser) |> to_bin_output(:audio), - link(:src) |> via_out(:video) |> to(:video_parser) |> to_bin_output(:video) + link(:src) |> to(:demuxer), + # + link(:demuxer) + |> via_out(Pad.ref(:audio, 0)) + |> to(:audio_parser) + |> to_bin_output(:audio), + # + link(:demuxer) + |> via_out(Pad.ref(:video, 0)) + |> to(:video_parser) + |> to_bin_output(:video) ] } {{:ok, spec: spec}, %{}} end + + @impl true + def handle_notification( + {:socket_control_needed, _socket, _pid} = notification, + :src, + _ctx, + state + ) do + {{:ok, [notify: notification]}, state} + end + + def handle_notification( + {type, _reason} = notification, + :src, + _ctx, + state + ) + when type in [:stream_validation_success, :stream_validation_error] do + {{:ok, [notify: notification]}, state} + end + + @doc """ + Passes the control of the socket to the `source`. + + To succeed, the executing process must be in control of the socket, otherwise `{:error, :not_owner}` is returned. + """ + @spec pass_control(:gen_tcp.socket(), pid) :: :ok | {:error, atom} + def pass_control(socket, source) do + :gen_tcp.controlling_process(socket, source) + end end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/default_message_validator.ex b/lib/membrane_rtmp_plugin/rtmp/source/default_message_validator.ex new file mode 100644 index 00000000..2d97300a --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/default_message_validator.ex @@ -0,0 +1,6 @@ +defmodule Membrane.RTMP.DefaultMessageValidator do + @moduledoc """ + A default validator for the `Membrane.RTMP.SourceBin`, that allows all incoming streams. + """ + use Membrane.RTMP.MessageValidator +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake.ex b/lib/membrane_rtmp_plugin/rtmp/source/handshake.ex new file mode 100644 index 00000000..fac498a3 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/handshake.ex @@ -0,0 +1,101 @@ +defmodule Membrane.RTMP.Handshake do + @moduledoc false + + alias Membrane.RTMP.Handshake.Step + + defmodule State do + @moduledoc false + + @enforce_keys [:step] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + step: Step.t() | nil + } + end + + @handshake_size 1536 + + @doc """ + Initializes handshake process on a server side. + """ + @spec init_server() :: State.t() + def init_server() do + %State{step: nil} + end + + @doc """ + Initializes handshake process as a client. + """ + @spec init_client(non_neg_integer()) :: {Step.t(), State.t()} + def init_client(epoch) do + step = %Step{type: :c0_c1, data: generate_c1_s1(epoch)} + + {step, %State{step: step}} + end + + @spec handle_step(binary(), State.t()) :: + {:continue_handshake, Step.t(), State.t()} + | {:handshake_finished, Step.t(), State.t()} + | {:handshake_finished, State.t()} + | {:error, {:invalid_handshake_step, Step.handshake_type_t()}} + def handle_step(step_data, state) + + def handle_step(step_data, %State{step: %Step{type: :c0_c1} = previous_step}) do + with {:ok, next_step} <- Step.deserialize(:s0_s1_s2, step_data), + :ok <- Step.verify_next_step(previous_step, next_step) do + <> = next_step.data + + step = %Step{type: :c2, data: s1} + + {:handshake_finished, step, %State{step: step}} + end + end + + def handle_step(step_data, %State{step: %Step{type: :s0_s1_s2} = previous_step}) do + with {:ok, next_step} <- Step.deserialize(:c2, step_data), + :ok <- Step.verify_next_step(previous_step, next_step) do + {:handshake_finished, %State{step: next_step}} + end + end + + def handle_step(step_data, %State{step: nil}) do + with {:ok, %Step{data: c1}} <- Step.deserialize(:c0_c1, step_data) do + <> = c1 + + step = %Step{ + type: :s0_s1_s2, + data: generate_c1_s1(time) <> c1 + } + + {:continue_handshake, step, %State{step: step}} + end + end + + @doc """ + Returns how many bytes the next handshake step should consist of. + """ + @spec expects_bytes(State.t()) :: non_neg_integer() + def expects_bytes(%State{step: step}) do + case step do + # expect c0 + c1 + nil -> + @handshake_size + 1 + + # expect s0 + s1 + s2 + %Step{type: :c0_c1} -> + 2 * @handshake_size + 1 + + # expect c2 + %Step{type: :s0_s1_s2} -> + @handshake_size + end + end + + # generates a unique segment of the handshake's step + # accordingly to the spec first 4 bytes are a connection epoch time, + # followed by 4 zero bytes and 1526 random bytes + defp generate_c1_s1(epoch) do + <> + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex b/lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex new file mode 100644 index 00000000..25ec50ac --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/handshake/step.ex @@ -0,0 +1,138 @@ +defmodule Membrane.RTMP.Handshake.Step do + @moduledoc false + + # Describes steps in the process of RTMP handshake + + @enforce_keys [:data, :type] + defstruct @enforce_keys + + @typedoc """ + RTMP handshake types. + + The handshake flow between client and server looks as follows: + + +-------------+ +-------------+ + | Client | TCP/IP Network | Server | + +-------------+ | +-------------+ + | | | + Uninitialized | Uninitialized + | C0 | | + |------------------->| C0 | + | |-------------------->| + | C1 | | + |------------------->| S0 | + | |<--------------------| + | | S1 | + Version sent |<--------------------| + | S0 | | + |<-------------------| | + | S1 | | + |<-------------------| Version sent + | | C1 | + | |-------------------->| + | C2 | | + |------------------->| S2 | + | |<--------------------| + Ack sent | Ack Sent + | S2 | | + |<-------------------| | + | | C2 | + | |-------------------->| + Handshake Done | Handshake Done + | | | + + Where `C0` and `S0` are RTMP protocol version (set to 0x03). + + Both sides exchange random chunks of 1536 bytes and the other side is supposed to + respond with those bytes remaining unchanged. + + In case of `S1` and `S2`, the latter is supposed to be equal to `C1` while + the client has to respond by sending `C2` with the `S1` as the value. + """ + @type handshake_type_t :: :c0_c1 | :s0_s1_s2 | :c2 + + @type t :: %__MODULE__{ + data: binary(), + type: handshake_type_t() + } + + @rtmp_version 0x03 + + @handshake_size 1536 + @s1_s2_size 2 * @handshake_size + + defmacrop invalid_step_error(type) do + quote do + {:error, {:invalid_handshake_step, unquote(type)}} + end + end + + @doc """ + Serializes the step. + """ + @spec serialize(t()) :: binary() + def serialize(%__MODULE__{type: type, data: data}) when type in [:c0_c1, :s0_s1_s2] do + <<@rtmp_version, data::binary>> + end + + def serialize(%__MODULE__{data: data}), do: data + + @doc """ + Deserializes the handshake step given the type. + """ + @spec deserialize(handshake_type_t(), binary()) :: + {:ok, t()} | {:error, :invalid_handshake_step} + def deserialize(:c0_c1 = type, <<0x03, data::binary-size(@handshake_size)>>) do + {:ok, %__MODULE__{type: type, data: data}} + end + + def deserialize(:s0_s1_s2 = type, <<0x03, data::binary-size(@s1_s2_size)>>) do + {:ok, %__MODULE__{type: type, data: data}} + end + + def deserialize(:c2 = type, <>) do + {:ok, %__MODULE__{type: type, data: data}} + end + + def deserialize(_type, _data), do: {:error, :invalid_handshake_step} + + @doc """ + Verifies if the following handshake step matches the previous one. + + C1 should have the same value as S2 and C2 be the same as S1. + """ + @spec verify_next_step(t() | nil, t()) :: + :ok | {:error, {:invalid_handshake_step, handshake_type_t()}} + def verify_next_step(previous_step, next_step) + + def verify_next_step(nil, %__MODULE__{type: :c0_c1}), do: :ok + + def verify_next_step(%__MODULE__{type: :c0_c1, data: c1}, %__MODULE__{ + type: :s0_s1_s2, + data: s1_s2 + }) do + <<_s1::binary-size(@handshake_size), s2::binary-size(@handshake_size)>> = s1_s2 + + if s2 == c1 do + :ok + else + invalid_step_error(:s0_s1_s2) + end + end + + def verify_next_step(%__MODULE__{type: :s0_s1_s2, data: s1_s2}, %__MODULE__{type: :c2, data: c2}) do + <> = s1_s2 + + if c2 == s1 do + :ok + else + invalid_step_error(:c2) + end + end + + @doc """ + Returns epoch timestamp of the connection. + """ + @spec epoch(t()) :: non_neg_integer() + def epoch(%__MODULE__{data: <>}), do: epoch +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/header.ex b/lib/membrane_rtmp_plugin/rtmp/source/header.ex new file mode 100644 index 00000000..0571b12b --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/header.ex @@ -0,0 +1,149 @@ +defmodule Membrane.RTMP.Header do + @moduledoc false + + @enforce_keys [:chunk_stream_id, :type_id] + defstruct @enforce_keys ++ [body_size: 0, timestamp: 0, stream_id: 0] + + @typedoc """ + RTMP header structure. + + Fields: + * `chunk_stream_id` - chunk stream identifier that the following packet body belongs to + * `timestmap` - chunk timestamp, equals 0 when the header is a part of non-media message + * `body_size` - the size in bytes of the following body payload + * `type_id` - the type of the body payload, for more details please refer to the RTMP docs + * `stream_id` - stream identifier that the message belongs to + """ + @type t :: %__MODULE__{ + chunk_stream_id: integer(), + timestamp: integer(), + body_size: integer(), + type_id: integer(), + stream_id: integer() + } + + defmacro type(:set_chunk_size), do: 0x01 + defmacro type(:user_control_message), do: 0x04 + defmacro type(:window_acknowledgement_size), do: 0x05 + defmacro type(:set_peer_bandwidth), do: 0x06 + defmacro type(:audio_message), do: 0x08 + defmacro type(:video_message), do: 0x09 + defmacro type(:amf_data), do: 0x12 + defmacro type(:amf_command), do: 0x14 + + @header_type_0 <<0x0::2>> + @header_type_1 <<0x1::2>> + @header_type_2 <<0x2::2>> + @header_type_3 <<0x3::2>> + + @header_type_0_size 11 + @header_type_1_size 7 + @header_type_2_size 3 + + @spec new(Keyword.t()) :: t() + def new(opts) do + struct!(__MODULE__, opts) + end + + @doc """ + Deserializes given binary into an RTMP header structure. + + RTMP headers can be self contained or may depend on preceding headers. + It depends on the first 2 bits of the header: + * `0b00` - current header is self contained and contains all the header information, see `t:t/0` + * `0b01` - current header derives the `stream_id` from the previous header + * `0b10` - same as above plus derives `type_id` and `body_size` + * `0b11` - all values are derived from the previous header with the same `chunk_stream_id` + """ + @spec deserialize(binary(), t() | nil) :: {t(), rest :: binary()} | {:error, :need_more_data} + def deserialize(binary, previous_headers \\ nil) + + # only the deserialization of the 0b00 type can have `nil` previous header + def deserialize( + <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + stream_id::32, rest::binary>>, + _previous_headers + ) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } + + {header, rest} + end + + def deserialize( + <<@header_type_1::bitstring, chunk_stream_id::6, timestamp_delta::24, body_size::24, + type_id::8, rest::binary>>, + previous_headers + ) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, + body_size: body_size, + type_id: type_id, + stream_id: previous_headers[chunk_stream_id].stream_id + } + + {header, rest} + end + + def deserialize( + <<@header_type_2::bitstring, chunk_stream_id::6, timestamp_delta::24, rest::binary>>, + previous_headers + ) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_headers[chunk_stream_id].timestamp + timestamp_delta, + body_size: previous_headers[chunk_stream_id].body_size, + type_id: previous_headers[chunk_stream_id].type_id, + stream_id: previous_headers[chunk_stream_id].stream_id + } + + {header, rest} + end + + def deserialize( + <<@header_type_3::bitstring, chunk_stream_id::6, rest::binary>>, + previous_headers + ) do + header = %__MODULE__{ + chunk_stream_id: chunk_stream_id, + timestamp: previous_headers[chunk_stream_id].timestamp, + body_size: previous_headers[chunk_stream_id].body_size, + type_id: previous_headers[chunk_stream_id].type_id, + stream_id: previous_headers[chunk_stream_id].stream_id + } + + {header, rest} + end + + def deserialize(<<@header_type_0::bitstring, _chunk_stream_id::6, rest::binary>>, _prev_header) + when byte_size(rest) < @header_type_0_size, + do: {:error, :need_more_data} + + def deserialize(<<@header_type_1::bitstring, _chunk_stream_id::6, rest::binary>>, _prev_header) + when byte_size(rest) < @header_type_1_size, + do: {:error, :need_more_data} + + def deserialize(<<@header_type_2::bitstring, _chunk_stream_id::6, rest::binary>>, _prev_header) + when byte_size(rest) < @header_type_2_size, + do: {:error, :need_more_data} + + @spec serialize(t()) :: binary() + def serialize(%__MODULE__{} = header) do + %{ + chunk_stream_id: chunk_stream_id, + timestamp: timestamp, + body_size: body_size, + type_id: type_id, + stream_id: stream_id + } = header + + <<@header_type_0::bitstring, chunk_stream_id::6, timestamp::24, body_size::24, type_id::8, + stream_id::32>> + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message.ex b/lib/membrane_rtmp_plugin/rtmp/source/message.ex new file mode 100644 index 00000000..a0c16f7d --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/message.ex @@ -0,0 +1,100 @@ +defmodule Membrane.RTMP.Message do + @moduledoc false + + require Membrane.RTMP.Header + + alias Membrane.RTMP.{Header, Messages} + + @type message_data_t :: map() | number() | String.t() | :null + + @type t :: struct() + + @doc """ + Deserializes message binary to a proper struct. + """ + @callback deserialize(value :: binary()) :: t() + + @doc """ + Create message from arguments list. When the message is a AMF command then + the first argument is a command name and the second a sequence number. + """ + @callback from_data([message_data_t()]) :: t() + + @optional_callbacks deserialize: 1, from_data: 1 + + @amf_command_to_module %{ + "connect" => Messages.Connect, + "releaseStream" => Messages.ReleaseStream, + "FCPublish" => Messages.FCPublish, + "createStream" => Messages.CreateStream, + "publish" => Messages.Publish, + "@setDataFrame" => Messages.SetDataFrame + } + + @amf_data_to_module %{ + "@setDataFrame" => Messages.SetDataFrame + } + + @spec deserialize_message(type_id :: integer(), binary()) :: struct() + def deserialize_message(Header.type(:set_chunk_size), payload), + do: Messages.SetChunkSize.deserialize(payload) + + def deserialize_message(Header.type(:user_control_message), payload), + do: Messages.UserControl.deserialize(payload) + + def deserialize_message(Header.type(:window_acknowledgement_size), payload), + do: Messages.WindowAcknowledgement.deserialize(payload) + + def deserialize_message(Header.type(:set_peer_bandwidth), payload), + do: Messages.SetPeerBandwidth.deserialize(payload) + + def deserialize_message(Header.type(:amf_data), payload), + do: message_from_modules(payload, @amf_data_to_module, true) + + def deserialize_message(Header.type(:amf_command), payload), + do: message_from_modules(payload, @amf_command_to_module) + + def deserialize_message(Header.type(:audio_message), payload), + do: Messages.Audio.deserialize(payload) + + def deserialize_message(Header.type(:video_message), payload), + do: Messages.Video.deserialize(payload) + + @spec chunk_payload(binary(), non_neg_integer(), non_neg_integer()) :: iodata() + def chunk_payload(paylaod, chunk_stream_id, chunk_size) + + def chunk_payload(payload, _chunk_stream_id, chunk_size) + when byte_size(payload) <= chunk_size do + payload + end + + def chunk_payload(payload, chunk_stream_id, chunk_size), + do: do_chunk_payload(payload, chunk_stream_id, chunk_size, []) + + defp do_chunk_payload(payload, chunk_stream_id, chunk_size, acc) + when byte_size(payload) > chunk_size do + <> = payload + + acc = [<<0b11::2, chunk_stream_id::6>>, chunk | acc] + + do_chunk_payload(rest, chunk_stream_id, chunk_size, acc) + end + + defp do_chunk_payload(payload, _chunk_stream_id, _chunk_size, acc) do + [payload | acc] + |> Enum.reverse() + end + + defp message_from_modules(payload, mapping, required? \\ false) do + payload + |> Membrane.RTMP.AMF.Parser.parse() + |> then(fn [command | _rest] = arguments -> + if required? do + Map.fetch!(mapping, command) + else + Map.get(mapping, command, Messages.Anonymous) + end + |> apply(:from_data, [arguments]) + end) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex new file mode 100644 index 00000000..83d2f738 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_handler.ex @@ -0,0 +1,292 @@ +defmodule Membrane.RTMP.MessageHandler do + @moduledoc false + + # Module responsible for processing the RTMP messages + # Appropriate responses are sent to the messages received during the initialization phase + # The data received in video and audio is forwarded to the outputs + + require Membrane.Logger + + alias Membrane.{Buffer, Logger} + + alias Membrane.RTMP.{ + Handshake, + Header, + Message, + MessageParser, + Messages, + Responses + } + + alias Membrane.RTMP.Messages.Serializer + + @windows_acknowledgment_size 2_500_000 + @peer_bandwidth_size 2_500_000 + + @spec handle_client_messages(list(), map()) :: map() + def handle_client_messages([], state) do + request_packet(state.socket) + state + end + + def handle_client_messages(messages, state) do + messages + |> Enum.reduce_while(state, fn {header, message}, acc -> + do_handle_client_message(message, header, acc) + end) + |> case do + {:error, _msg} = error -> + :gen_tcp.shutdown(state.socket, :read_write) + validation_action(state, error) + + state -> + request_packet(state.socket) + %{state | actions: Enum.reverse(state.actions)} + end + end + + # Expected flow of messages: + # 1. [in] c0_c1 handshake -> [out] s0_s1_s2 handshake + # 2. [in] c2 handshake -> [out] empty + # 3. [in] set chunk size -> [out] empty + # 4. [in] connect -> [out] window acknowledgement, set peer bandwidth, set chunk size, connect success, on bw done + # 5. [in] release stream -> [out] default _result + # 6. [in] FC publish, create stream, _checkbw -> [out] onFCPublish, default _result, default _result + # 7. [in] release stream -> [out] _result response + # 8. [in] publish -> [out] user control with stream id, publish success + # 9. CONNECTED + + defp do_handle_client_message(%module{data: data}, header, state) + when module in [Messages.Audio, Messages.Video] do + state = get_media_actions(header, data, state) + {:cont, state} + end + + defp do_handle_client_message(%Handshake.Step{type: :s0_s1_s2} = step, _header, state) do + :gen_tcp.send(state.socket, Handshake.Step.serialize(step)) + + connection_epoch = Handshake.Step.epoch(step) + {:cont, %{state | epoch: connection_epoch}} + end + + defp do_handle_client_message(%Messages.SetChunkSize{chunk_size: chunk_size}, _header, state) do + parser = %{state.message_parser | chunk_size: chunk_size} + {:cont, %{state | message_parser: parser}} + end + + defp do_handle_client_message(%Messages.Connect{}, _header, state) do + chunk_size = state.message_parser.chunk_size + + [ + %Messages.WindowAcknowledgement{size: @windows_acknowledgment_size}, + %Messages.SetPeerBandwidth{size: @peer_bandwidth_size}, + # stream begin type + %Messages.UserControl{event_type: 0x00, data: <<0, 0, 0, 0>>}, + # by default the ffmpeg server uses 128 chunk size + %Messages.SetChunkSize{chunk_size: chunk_size} + ] + |> Enum.each(&send_rtmp_payload(&1, state.socket, chunk_size)) + + {[tx_id], message_parser} = MessageParser.generate_tx_ids(state.message_parser, 1) + + tx_id + |> Responses.connection_success() + |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3) + + Responses.on_bw_done() + |> send_rtmp_payload(state.socket, chunk_size, chunk_stream_id: 3) + + {:cont, %{state | message_parser: message_parser}} + end + + # According to ffmpeg's documentation, this command should make the server release channel for a media stream + # We are simply acknowleding the message + defp do_handle_client_message( + %Messages.ReleaseStream{tx_id: tx_id} = msg, + _header, + state + ) do + case state.validator.validate_release_stream(msg) do + {:ok, _msg} = result -> + tx_id + |> Responses.default_result() + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + {:cont, validation_action(state, result)} + + {:error, _reason} = error -> + {:halt, error} + end + end + + defp do_handle_client_message( + %Messages.Publish{stream_key: stream_key} = msg, + _header, + state + ) do + case state.validator.validate_publish(msg) do + {:ok, _msg} = result -> + %Messages.UserControl{event_type: 0, data: <<0, 0, 0, 1>>} + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + Responses.publish_success(stream_key) + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + {:cont, validation_action(state, result)} + + {:error, _reason} = error -> + {:halt, error} + end + end + + # A message containing stream metadata + defp do_handle_client_message(%Messages.SetDataFrame{} = msg, _header, state) do + case state.validator.validate_set_data_frame(msg) do + {:ok, _msg} = result -> + {:cont, validation_action(state, result)} + + {:error, _reason} = error -> + {:halt, error} + end + end + + # According to ffmpeg's documentation, this command should prepare the server to receive media streams + # We are simply acknowleding the message + defp do_handle_client_message(%Messages.FCPublish{}, _header, state) do + %Messages.Anonymous{name: "onFCPublish", properties: []} + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + {:cont, state} + end + + defp do_handle_client_message(%Messages.CreateStream{tx_id: tx_id}, _header, state) do + stream_id = [1.0] + + tx_id + |> Responses.default_result(stream_id) + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + {:cont, state} + end + + # Check bandwidth message + defp do_handle_client_message( + %Messages.Anonymous{name: "_checkbw", tx_id: tx_id}, + _header, + state + ) do + tx_id + |> send_rtmp_payload(state.socket, state.message_parser.chunk_size, chunk_stream_id: 3) + + {:cont, state} + end + + defp do_handle_client_message(%Messages.Anonymous{name: "deleteStream"}, _header, state) do + # We could send `:end_of_stream` here, however more reliable method is to wait for `:tcp_closed` message on the socket. + {:cont, state} + end + + defp do_handle_client_message(%Messages.Anonymous{} = message, _header, state) do + Logger.debug("Unknown message: #{inspect(message)}") + + {:cont, state} + end + + defp request_packet(socket) do + :ok = :inet.setopts(socket, active: :once) + end + + defp get_media_actions(rtmp_header, data, state) do + payload = + get_flv_tag(rtmp_header, data) + |> (&if(state.header_sent?, do: &1, else: get_flv_header() <> &1)).() + + actions = [{:buffer, {:output, %Buffer{payload: payload}}} | state.actions] + %{state | header_sent?: true, actions: actions} + end + + defp get_flv_header() do + alias Membrane.FLV + + {header, 0} = + FLV.Serializer.serialize(%FLV.Header{audio_present?: true, video_present?: true}, 0) + + # Add PreviousTagSize, which is 0 for the first tag + header <> <<0::32>> + end + + defp get_flv_tag( + %Membrane.RTMP.Header{ + timestamp: timestamp, + body_size: data_size, + type_id: type_id, + stream_id: stream_id + }, + payload + ) do + tag_size = data_size + 11 + + <> + end + + defp send_rtmp_payload(message, socket, chunk_size, opts \\ []) do + type = Serializer.type(message) + body = Serializer.serialize(message) + + chunk_stream_id = Keyword.get(opts, :chunk_stream_id, 2) + + header = + [chunk_stream_id: chunk_stream_id, type_id: type, body_size: byte_size(body)] + |> Keyword.merge(opts) + |> Header.new() + |> Header.serialize() + + payload = Message.chunk_payload(body, chunk_stream_id, chunk_size) + + :gen_tcp.send(socket, [header | payload]) + end + + defp validation_action(state, result) do + notification = + case result do + {:ok, msg} -> {:notify, {:stream_validation_success, msg}} + {:error, reason} -> {:notify, {:stream_validation_error, reason}} + end + + Map.update!(state, :actions, &[notification | &1]) + end + + # The RTMP connection is based on TCP therefore we are operating on a continuous stream of bytes. + # In such case packets received on TCP sockets may contain a partial RTMP packet or several full packets. + # + # `MessageParser` is already able to request more data if packet is incomplete but it is not aware + # if its current buffer contains more than one message, therefore we need to call the `&MessageParser.handle_packet/2` + # as long as we decide to receive more messages (before starting to relay media packets). + # + # Once we hit `:need_more_data` the function returns the list of parsed messages and the message_parser then is ready + # to receive more data to continue with emitting new messages. + @spec parse_packet_messages(packet :: binary(), message_parser :: struct(), [{any(), any()}]) :: + {[Message.t()], message_parser :: struct()} + def parse_packet_messages(packet, message_parser, messages \\ []) + + def parse_packet_messages(<<>>, %{buffer: <<>>} = message_parser, messages) do + {Enum.reverse(messages), message_parser} + end + + def parse_packet_messages(packet, message_parser, messages) do + case MessageParser.handle_packet(packet, message_parser) do + {header, message, message_parser} -> + parse_packet_messages(<<>>, message_parser, [{header, message} | messages]) + + {:need_more_data, message_parser} -> + {Enum.reverse(messages), message_parser} + + {:handshake_done, message_parser} -> + parse_packet_messages(<<>>, message_parser, messages) + + {%Handshake.Step{} = step, message_parser} -> + parse_packet_messages(<<>>, message_parser, [{nil, step} | messages]) + end + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex new file mode 100644 index 00000000..ac71fefc --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_parser.ex @@ -0,0 +1,231 @@ +defmodule Membrane.RTMP.MessageParser do + @moduledoc false + + require Membrane.Logger + + alias Membrane.RTMP.{Handshake, Header, Message, Messages} + + @enforce_keys [:state_machine, :buffer, :chunk_size, :handshake] + defstruct @enforce_keys ++ [previous_headers: %{}, current_tx_id: 1] + + @type state_machine_t :: + :handshake | :connecting | :connected + + @type packet_t :: binary() + + @type t :: %__MODULE__{ + state_machine: state_machine_t(), + buffer: binary(), + previous_headers: map(), + # the chunk size of incoming messages (the other side of connection) + chunk_size: non_neg_integer(), + current_tx_id: non_neg_integer(), + handshake: Handshake.State.t() + } + + @doc """ + Initializes the RTMP MessageParser. + + The MessageParser starts in a handshake process which is dictated by the passed + handshake state. + """ + @spec init(Handshake.State.t(), Keyword.t()) :: t() + def init(handshake, opts \\ []) do + chunk_size = Keyword.get(opts, :chunk_size, 128) + + %__MODULE__{ + state_machine: :handshake, + buffer: <<>>, + # previous header for each of the stream chunks + previous_headers: %{}, + chunk_size: chunk_size, + handshake: handshake + } + end + + @doc """ + Generates a list of the following transaction tx_ids. + + Updates the internal transaction id counter so that + the MessageParser can be further used for generating the next ones. + """ + @spec generate_tx_ids(t(), n :: non_neg_integer()) :: {list(non_neg_integer()), t()} + def generate_tx_ids(%__MODULE__{current_tx_id: tx_id} = message_parser, n) when n > 0 do + tx_ids = Enum.to_list(tx_id..(tx_id + n - 1)) + + {tx_ids, %{message_parser | current_tx_id: tx_id + n}} + end + + @spec handle_packet(packet_t(), t()) :: + {Handshake.Step.t() | :need_more_data | :handshake_done | binary(), t()} + | {Header.t(), Message.t(), t()} + def handle_packet(packet, state) + + def handle_packet( + packet, + %{state_machine: :connected, buffer: buffer, chunk_size: chunk_size} = state + ) do + payload = buffer <> packet + + case read_frame(payload, state.previous_headers, chunk_size) do + {:error, :need_more_data} -> + {:need_more_data, %__MODULE__{state | buffer: payload}} + + {header, message, rest} -> + state = update_state_with_message(state, header, message, rest) + + {header, message, state} + end + end + + def handle_packet( + packet, + %{state_machine: :handshake, buffer: buffer, handshake: handshake} = state + ) do + payload = buffer <> packet + + step_size = Handshake.expects_bytes(handshake) + + if byte_size(payload) >= step_size do + <> = payload + + case Handshake.handle_step(step_data, handshake) do + {:continue_handshake, step, handshake} -> + # continue with the handshake + {step, %__MODULE__{state | buffer: rest, handshake: handshake}} + + # the handshake is done but with last step to return + {:handshake_finished, step, _handshake} -> + {step, + %__MODULE__{ + state + | buffer: rest, + handshake: nil, + state_machine: fsm_transition(:handshake) + }} + + # the handshake is done without further steps + {:handshake_finished, _handshake} -> + {:handshake_done, + %__MODULE__{ + state + | buffer: rest, + handshake: nil, + state_machine: fsm_transition(:handshake) + }} + + {:error, {:invalid_handshake_step, step_type}} -> + raise "Invalid handshake step: #{step_type}" + end + else + {:need_more_data, %__MODULE__{state | buffer: payload}} + end + end + + def handle_packet( + packet, + %{state_machine: :connecting, buffer: buffer, chunk_size: chunk_size} = state + ) do + payload = buffer <> packet + + case read_frame(payload, state.previous_headers, chunk_size) do + {:error, :need_more_data} -> + {:need_more_data, %__MODULE__{state | buffer: payload}} + + {header, message, rest} -> + state = update_state_with_message(state, header, message, rest) + + {header, message, state} + end + end + + defp read_frame(packet, previous_headers, chunk_size) do + case Header.deserialize(packet, previous_headers) do + {%Header{body_size: body_size} = header, rest} -> + chunked_body_size = + if body_size > chunk_size do + # if a message's body is greater than the chunk size then + # after every chunk_size's bytes there is a 0x03 one byte header that + # needs to be stripped and is not counted into the body_size + headers_to_strip = div(body_size - 1, chunk_size) + + body_size + headers_to_strip + else + body_size + end + + if chunked_body_size <= byte_size(rest) do + <> = rest + + combined_body = combine_body_chunks(body, chunk_size) + + message = Message.deserialize_message(header.type_id, combined_body) + + {header, message, rest} + else + {:error, :need_more_data} + end + + {:error, :need_more_data} = error -> + error + end + end + + # message's size can exceed the defined chunk size + # in this case the message gets divided into + # a sequence of smaller packets separated by the a header type 3 byte + # (the first 2 bits has to be 0b11) + defp combine_body_chunks(body, chunk_size) do + if byte_size(body) <= chunk_size do + body + else + do_combine_body_chunks(body, chunk_size, <<>>) + end + end + + defp do_combine_body_chunks(body, chunk_size, acc) when byte_size(body) <= chunk_size do + acc <> body + end + + defp do_combine_body_chunks(body, chunk_size, acc) do + # cut out the header byte (staring with 0b11) + <> = body + + do_combine_body_chunks(rest, chunk_size, acc <> body) + end + + # in case of client interception the Publish message indicates successful connection + # (unless proxy temrinates the connection) and medai can be relayed + defp message_fsm_transition(%Messages.Publish{}), do: :connected + + # when receiving audio or video messages, we are remaining in connected state + defp message_fsm_transition(%Messages.Audio{}), do: :connected + defp message_fsm_transition(%Messages.Video{}), do: :connected + + # in case of server interception the `NetStream.Publish.Start` indicates + # that the connection has been successful and media can be relayed + defp message_fsm_transition(%Messages.Anonymous{ + name: "onStatus", + properties: [:null, %{"code" => "NetStream.Publish.Start"}] + }), + do: :connected + + defp message_fsm_transition(_message), do: :connecting + + defp fsm_transition(:handshake), do: :connecting + + defp update_state_with_message(state, header, message, rest) do + updated_headers = Map.put(state.previous_headers, header.chunk_stream_id, header) + + %__MODULE__{ + state + | chunk_size: maybe_update_chunk_size(message, state), + previous_headers: updated_headers, + buffer: rest, + state_machine: message_fsm_transition(message) + } + end + + defp maybe_update_chunk_size(%Messages.SetChunkSize{chunk_size: size}, _state), do: size + defp maybe_update_chunk_size(_size, %{chunk_size: size}), do: size +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex b/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex new file mode 100644 index 00000000..79b1137a --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/message_validator.ex @@ -0,0 +1,44 @@ +defmodule Membrane.RTMP.MessageValidator do + @moduledoc """ + Behaviour module for implementing RTMP Message validators. + + Allows for verifying some of the RTMP messages. To create a custom validator module `use MessageValidator` + and override the specific callbacks. By default all other messages will be allowed. + """ + alias Membrane.RTMP.Messages + + @type validation_result_t :: {:ok, message :: any()} | {:error, reason :: any()} + + defmacro __using__(_) do + quote do + @behaviour Membrane.RTMP.MessageValidator + + alias Membrane.RTMP.Messages + + @impl true + def validate_release_stream(%Messages.ReleaseStream{}), do: {:ok, "release stream success"} + + @impl true + def validate_publish(%Messages.Publish{}), do: {:ok, "publish success"} + + @impl true + def validate_set_data_frame(%Messages.SetDataFrame{}), do: {:ok, "set data frame success"} + defoverridable Membrane.RTMP.MessageValidator + end + end + + @doc """ + Validates the `t:Membrane.RTMP.Messages.ReleaseStream.t/0` message. + """ + @callback validate_release_stream(Messages.ReleaseStream.t()) :: validation_result_t() + + @doc """ + Validates the `t:Membrane.RTMP.Messages.Publish.t/0` message. + """ + @callback validate_publish(Messages.Publish.t()) :: validation_result_t() + + @doc """ + Validates the `t:Membrane.RTMP.Messages.SetDataFrame.t/0` message. + """ + @callback validate_set_data_frame(Messages.SetDataFrame.t()) :: validation_result_t() +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex new file mode 100644 index 00000000..1ee5d54c --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/anonymous.ex @@ -0,0 +1,41 @@ +defmodule Membrane.RTMP.Messages.Anonymous do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:name, :properties] + defstruct [tx_id: nil] ++ @enforce_keys + + @type t :: %__MODULE__{ + name: String.t(), + properties: any(), + tx_id: non_neg_integer() | nil + } + + @impl true + def from_data([name, tx_id | properties]) when is_binary(name) do + %__MODULE__{name: name, tx_id: tx_id, properties: properties} + end + + def from_data([name]) when is_binary(name) do + %__MODULE__{name: name, tx_id: nil, properties: []} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + alias Membrane.RTMP.AMF.Encoder + + @impl true + def serialize(%@for{name: name, tx_id: nil, properties: properties}) do + Encoder.encode([name | properties]) + end + + def serialize(%@for{name: name, tx_id: tx_id, properties: properties}) do + Encoder.encode([name, tx_id | properties]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex new file mode 100644 index 00000000..33b96d73 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/audio.ex @@ -0,0 +1,29 @@ +defmodule Membrane.RTMP.Messages.Audio do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:data] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + data: binary() + } + + @impl true + def deserialize(<>) do + %__MODULE__{data: data} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{data: data}) do + data + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:audio_message) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex new file mode 100644 index 00000000..98d852e1 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/connect.ex @@ -0,0 +1,64 @@ +defmodule Membrane.RTMP.Messages.Connect do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @enforce_keys [:app, :type, :supports_go_away, :flash_version, :swf_url, :tc_url] + defstruct @enforce_keys ++ [tx_id: 0] + + @type t :: %__MODULE__{ + app: String.t(), + type: String.t(), + supports_go_away: boolean(), + flash_version: String.t(), + tc_url: String.t(), + tx_id: non_neg_integer() + } + + @name "connect" + + @impl true + def from_data([@name, tx_id, properties]) do + %{ + "app" => app, + "type" => type, + "flashVer" => flash_version, + "tcUrl" => tc_url + } = properties + + %__MODULE__{ + app: app, + type: type, + supports_go_away: Map.get(properties, "supportsGoAway", false), + flash_version: flash_version, + swf_url: Map.get(properties, "swfUrl", tc_url), + tc_url: tc_url, + tx_id: tx_id + } + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{} = msg) do + Encoder.encode([ + "connect", + msg.tx_id, + %{ + "app" => msg.app, + "type" => msg.type, + "supportsGoAway" => msg.supports_go_away, + "flashVer" => msg.flash_version, + "swfUrl" => msg.swf_url, + "tcUrl" => msg.tc_url + } + ]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex new file mode 100644 index 00000000..7cd5498f --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/create_stream.ex @@ -0,0 +1,32 @@ +defmodule Membrane.RTMP.Messages.CreateStream do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + defstruct tx_id: 0 + + @type t :: %__MODULE__{ + tx_id: non_neg_integer() + } + + @name "createStream" + + @impl true + def from_data([@name, tx_id, :null]) do + %__MODULE__{tx_id: tx_id} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{tx_id: tx_id}) do + Encoder.encode(["createStream", tx_id, :null]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex new file mode 100644 index 00000000..5a39c48a --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/fc_publish.ex @@ -0,0 +1,34 @@ +defmodule Membrane.RTMP.Messages.FCPublish do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @enforce_keys [:stream_key] + defstruct [tx_id: 0] ++ @enforce_keys + + @type t :: %__MODULE__{ + stream_key: String.t(), + tx_id: non_neg_integer() + } + + @name "FCPublish" + + @impl true + def from_data([@name, tx_id, :null, stream_key]) do + %__MODULE__{tx_id: tx_id, stream_key: stream_key} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{tx_id: tx_id, stream_key: stream_key}) do + Encoder.encode(["FCPublish", tx_id, :null, stream_key]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex new file mode 100644 index 00000000..44627b7d --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/publish.ex @@ -0,0 +1,37 @@ +defmodule Membrane.RTMP.Messages.Publish do + @moduledoc """ + Defines the RTMP `publish` command. + """ + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @enforce_keys [:stream_key, :publish_type] + defstruct [tx_id: 0] ++ @enforce_keys + + @type t :: %__MODULE__{ + stream_key: String.t(), + publish_type: String.t(), + tx_id: non_neg_integer() + } + + @name "publish" + + @impl true + def from_data([@name, tx_id, :null, stream_key, publish_type]) do + %__MODULE__{tx_id: tx_id, stream_key: stream_key, publish_type: publish_type} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{tx_id: tx_id, stream_key: stream_key, publish_type: publish_type}) do + Encoder.encode(["publish", tx_id, :null, stream_key, publish_type]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex new file mode 100644 index 00000000..d0816cf2 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/command/release_stream.ex @@ -0,0 +1,36 @@ +defmodule Membrane.RTMP.Messages.ReleaseStream do + @moduledoc """ + Defines the RTMP `releaseStream` command. + """ + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @enforce_keys [:stream_key] + defstruct [tx_id: 0] ++ @enforce_keys + + @type t :: %__MODULE__{ + stream_key: String.t(), + tx_id: non_neg_integer() + } + + @name "releaseStream" + + @impl true + def from_data([@name, tx_id, :null, stream_key]) do + %__MODULE__{tx_id: tx_id, stream_key: stream_key} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{tx_id: tx_id, stream_key: stream_key}) do + Encoder.encode(["releaseStream", tx_id, :null, stream_key]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_command) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex new file mode 100644 index 00000000..5c6c68b0 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/serializer.ex @@ -0,0 +1,15 @@ +defprotocol Membrane.RTMP.Messages.Serializer do + @moduledoc false + + @doc """ + Serializes an RTMP message (without header) into RTMP body binary format. + """ + @spec serialize(struct()) :: binary() + def serialize(message) + + @doc """ + Returns the message's type required by the RTMP header. + """ + @spec type(struct()) :: non_neg_integer() + def type(message) +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex new file mode 100644 index 00000000..8d9a72a6 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_chunk_size.ex @@ -0,0 +1,29 @@ +defmodule Membrane.RTMP.Messages.SetChunkSize do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:chunk_size] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + chunk_size: String.t() + } + + @impl true + def deserialize(<<0::1, chunk_size::31>>) do + %__MODULE__{chunk_size: chunk_size} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{chunk_size: chunk_size}) do + <<0::1, chunk_size::31>> + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:set_chunk_size) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex new file mode 100644 index 00000000..20f07ec8 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_data_frame.ex @@ -0,0 +1,90 @@ +defmodule Membrane.RTMP.Messages.SetDataFrame do + @moduledoc """ + Defines the RTMP `setDataFrame` command. + """ + + @behaviour Membrane.RTMP.Message + + alias Membrane.RTMP.AMF.Encoder + + @enforce_keys [:duration, :file_size, :encoder] + defstruct @enforce_keys ++ + ~w(width height video_codec_id video_data_rate framerate audio_codec_id + audio_data_rate audio_sample_rate audio_sample_size stereo)a + + @attributes_to_keys %{ + "duration" => :duration, + "fileSize" => :file_size, + "filesize" => :file_size, + "width" => :width, + "height" => :height, + "videocodecid" => :video_codec_id, + "videodatarate" => :video_data_rate, + "framerate" => :framerate, + "audiocodecid" => :audio_codec_id, + "audiodatarate" => :audio_data_rate, + "audiosamplerate" => :audio_sample_rate, + "audiosamplesize" => :audio_sample_size, + "stereo" => :stereo, + "encoder" => :encoder + } + + @keys_to_attributes Map.new(@attributes_to_keys, fn {key, value} -> {value, key} end) + + @type t :: %__MODULE__{ + duration: number(), + file_size: number(), + # video related + width: number(), + height: number(), + video_codec_id: number(), + video_data_rate: number(), + framerate: number(), + # audio related + audio_codec_id: number(), + audio_data_rate: number(), + audio_sample_rate: number(), + audio_sample_size: number(), + stereo: boolean() + } + + @impl true + def from_data(["@setDataFrame", "onMetaData", properties]) do + new(properties) + end + + @spec new([{String.t(), any()}]) :: t() + def new(options) do + params = + options + |> Map.new() + |> Map.take(Map.keys(@attributes_to_keys)) + |> Enum.map(fn {key, value} -> + {Map.fetch!(@attributes_to_keys, key), value} + end) + + struct!(__MODULE__, params) + end + + # helper for message serialization + @doc false + @spec to_map(t()) :: map() + def to_map(%__MODULE__{} = message) do + message + |> Map.from_struct() + |> Enum.map(fn {key, value} -> {Map.fetch!(@keys_to_attributes, key), value} end) + |> Map.new() + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{} = message) do + Encoder.encode([@for.to_map(message)]) + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:amf_data) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex new file mode 100644 index 00000000..b64c9d89 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/set_peer_bandwidth.ex @@ -0,0 +1,31 @@ +defmodule Membrane.RTMP.Messages.SetPeerBandwidth do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:size] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + size: non_neg_integer() + } + + @limit_type 0x02 + + @impl true + def deserialize(<>) do + %__MODULE__{size: size} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{size: size}) do + <> + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:set_peer_bandwidth) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex new file mode 100644 index 00000000..f2a8d325 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/user_control.ex @@ -0,0 +1,30 @@ +defmodule Membrane.RTMP.Messages.UserControl do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:event_type, :data] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + event_type: non_neg_integer(), + data: binary() + } + + @impl true + def deserialize(<>) do + %__MODULE__{event_type: event_type, data: data} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{event_type: event_type, data: data}) do + <> + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:user_control_message) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex new file mode 100644 index 00000000..9fc6df48 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/video.ex @@ -0,0 +1,29 @@ +defmodule Membrane.RTMP.Messages.Video do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:data] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + data: binary() + } + + @impl true + def deserialize(<>) do + %__MODULE__{data: data} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{data: data}) do + data + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:video_message) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex b/lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex new file mode 100644 index 00000000..9a148dd8 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/messages/window_acknowledgement.ex @@ -0,0 +1,29 @@ +defmodule Membrane.RTMP.Messages.WindowAcknowledgement do + @moduledoc false + + @behaviour Membrane.RTMP.Message + + @enforce_keys [:size] + defstruct @enforce_keys + + @type t :: %__MODULE__{ + size: non_neg_integer() + } + + @impl true + def deserialize(<>) do + %__MODULE__{size: size} + end + + defimpl Membrane.RTMP.Messages.Serializer do + require Membrane.RTMP.Header + + @impl true + def serialize(%@for{size: size}) do + <> + end + + @impl true + def type(%@for{}), do: Membrane.RTMP.Header.type(:window_acknowledgement_size) + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/native.ex b/lib/membrane_rtmp_plugin/rtmp/source/native.ex deleted file mode 100644 index 4ef638b0..00000000 --- a/lib/membrane_rtmp_plugin/rtmp/source/native.ex +++ /dev/null @@ -1,81 +0,0 @@ -defmodule Membrane.RTMP.Source.Native do - @moduledoc false - use Unifex.Loader - - require Logger - - alias Membrane.Time - - @one_second Time.second() - - @spec start_link(url :: String.t(), timeout :: integer()) :: pid() - def start_link(url, timeout) do - {:ok, native_ref} = create() - caller_pid = self() - - spawn(fn -> - ref = Process.monitor(caller_pid) - - receive do - {:DOWN, ^ref, :process, ^caller_pid, _reason} -> set_terminate(native_ref) - end - end) - - timeout = get_int_timeout(timeout) - - spawn_link(fn -> - Process.monitor(caller_pid) - send(self(), {:await_connection, url, timeout}) - receive_loop(native_ref, caller_pid) - end) - end - - defp get_int_timeout(:infinity), do: 0 - - defp get_int_timeout(time) when rem(time, @one_second) != 0 do - raise ArgumentError, - "Timeout must be a multiply of one second. #{Time.pretty_duration(time)} is not" - end - - defp get_int_timeout(time) do - time |> Time.as_seconds() |> Ratio.trunc() - end - - defp receive_loop(native_ref, target) do - receive do - {:await_connection, url, timeout} -> - await_connection(native_ref, target, url, timeout) - - :get_frame -> - result = read_frame(native_ref) - send(target, {__MODULE__, :read_frame, result}) - if result == :end_of_stream, do: :stop, else: :continue - - {:DOWN, _ref, :process, _pid, _reason} -> - :stop - - :terminate -> - :stop - end - |> case do - :continue -> receive_loop(native_ref, target) - :stop -> :ok - end - end - - defp await_connection(native, target, url, timeout) do - case await_open(native, url, timeout) do - {:ok, native_ref} -> - Logger.debug("Connection established @ #{url}") - send(self(), :get_frame) - send(target, {__MODULE__, :format_info_ready, native_ref}) - :continue - - {:error, :interrupted} -> - :stop - - {:error, reason} -> - raise "Failed to open input from #{url}. Reason: `#{reason}`" - end - end -end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/responses.ex b/lib/membrane_rtmp_plugin/rtmp/source/responses.ex new file mode 100644 index 00000000..780dbd62 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/responses.ex @@ -0,0 +1,79 @@ +defmodule Membrane.RTMP.Responses do + @moduledoc false + + alias Membrane.RTMP.Messages + + @type transaction_id_t :: float() | non_neg_integer() + + @doc """ + Returns a default success response on connect request. + """ + @spec connection_success(transaction_id_t) :: struct() + def connection_success(tx_id) do + %Messages.Anonymous{ + name: "_result", + tx_id: tx_id, + properties: [ + %{ + "fmsVer" => "FMS/3,0,1,123", + "capabilities" => 31.0 + }, + %{ + "level" => "status", + "code" => "NetConnection.Connect.Success", + "description" => "Connection succeeded.", + "objectEncoding" => 0.0 + } + ] + } + end + + @doc """ + Returns a publishment success message. + """ + @spec publish_success(String.t()) :: struct() + def publish_success(stream_key) do + %Messages.Anonymous{ + name: "onStatus", + tx_id: 0, + properties: [ + :null, + %{ + "level" => "status", + "code" => "NetStream.Publish.Start", + "description" => "#{stream_key} is now published", + "details" => stream_key + } + ] + } + end + + @doc """ + Returns a bandwidth measurement done message. + """ + @spec on_bw_done() :: struct() + def on_bw_done() do + %Messages.Anonymous{ + name: "onStatus", + tx_id: 0, + properties: [ + :null, + 8192.0 + ] + } + end + + @doc """ + Returns a default `_result` response with arbitrary body. + + The body can be set by specifying the properties list. + """ + @spec default_result(transaction_id_t(), [any()]) :: struct() + def default_result(tx_id, properties \\ []) do + %Messages.Anonymous{ + name: "_result", + tx_id: tx_id, + properties: properties + } + end +end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/source.ex b/lib/membrane_rtmp_plugin/rtmp/source/source.ex index ba90e36e..5fa56acb 100644 --- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex +++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex @@ -1,150 +1,171 @@ defmodule Membrane.RTMP.Source do @moduledoc """ - Membrane Element for receiving RTMP streams. Acts as a RTMP Server. - This implementation is limited to only AAC and H264 streams. + Membrane Element for receiving an RTMP stream. Acts as a RTMP Server. + + When initializing, the source sends `t:socket_control_needed_t/0` notification, + upon which it should be granted the control over the `socket` via `:gen_tcp.controlling_process/2`. + + The Source allows for providing custom validator module, that verifies some of the RTMP messages. + The module has to implement the `Membrane.RTMP.MessageValidator` behaviour. + If the validation fails, a `t:stream_validation_failed_t/0` notification is sent. - Implementation based on FFmpeg + This implementation is limited to only AAC and H264 streams. """ use Membrane.Source require Membrane.Logger - alias __MODULE__.Native - alias Membrane.{Buffer, Time} - - alias Membrane.RTMP.AVC.Utils + alias Membrane.RTMP.{Handshake, MessageHandler, MessageParser} - def_output_pad :audio, + def_output_pad :output, availability: :always, - caps: Membrane.AAC.RemoteStream, + caps: Membrane.RemoteStream, mode: :pull - def_output_pad :video, - availability: :always, - caps: Membrane.H264.RemoteStream, - mode: :pull - - def_options url: [ - spec: binary(), + def_options socket: [ + spec: :gen_tcp.socket(), description: """ - URL on which the FFmpeg instance will be created + Socket on which the source will be receiving the RTMP stream. The socket must be already connected to the RTMP client and be in non-active mode (`active` set to `false`). """ ], - timeout: [ - spec: Time.t() | :infinity, - default: :infinity, + validator: [ + spec: Membrane.RTMP.MessageValidator, description: """ - Time the server will wait for a connection from the client - - Duration given must be a multiply of one second or atom `:infinity`. + A Module implementing `Membrane.RTMP.MessageValidator` behaviour, used for validating the stream. """ ] + @typedoc """ + Notification sent when the RTMP Source element is initialized and it should be granted control over the socket using `:gen_tcp.controlling_process/2`. + """ + @type socket_control_needed_t() :: {:socket_control_needed, :gen_tcp.socket(), pid()} + @typedoc """ + Notification sent when the validator denies incoming RTMP stream. + """ + @type stream_validation_failed_t() :: {:stream_validation_failed, String.t()} + @impl true def handle_init(%__MODULE__{} = opts) do - {:ok, + {{:ok, [notify: {:socket_control_needed, opts.socket, self()}]}, Map.from_struct(opts) - |> Map.merge(%{provider: nil, stale_frame: nil})} + |> Map.merge(%{ + actions: [], + header_sent?: false, + message_parser: MessageParser.init(Handshake.init_server()), + receiver_pid: nil, + socket_ready?: false, + # how many times the Source tries to get control of the socket + socket_retries: 3, + # epoch required for performing a handshake with the pipeline + epoch: 0 + })} end @impl true def handle_prepared_to_playing(_ctx, state) do - pid = Native.start_link(state.url, state.timeout) - {:ok, %{state | provider: pid}} + target_pid = self() + + {:ok, receiver_process} = + Task.start_link(fn -> + receive_loop(state.socket, target_pid) + end) + + send(self(), :start_receiving) + + actions = [ + caps: {:output, %Membrane.RemoteStream{content_format: Membrane.FLV, type: :bytestream}} + ] + + {{:ok, actions}, %{state | receiver_pid: receiver_process}} end - @impl true - def handle_demand(type, _size, _unit, _ctx, %{stale_frame: {type, buffer}} = state) do - # There is stale frame, which indicates that that the source was blocked waiting for demand from one of the outputs - # It now arrived, so we request next frame and output the one that blocked us - send(state.provider, :get_frame) - {{:ok, buffer: {type, buffer}}, %{state | stale_frame: nil}} + defp receive_loop(socket, target) do + receive do + {:tcp, _port, packet} -> + send(target, {:tcp, socket, packet}) + + {:tcp_closed, _port} -> + send(target, {:tcp_closed, socket}) + + :terminate -> + exit(:normal) + + _message -> + :noop + end + + receive_loop(socket, target) end @impl true - def handle_demand(_type, _size, _unit, _ctx, state) do + def handle_demand(_pad, _size, _unit, _ctx, state) when state.socket_ready? do + :ok = :inet.setopts(state.socket, active: :once) {:ok, state} end @impl true - def handle_other({Native, :format_info_ready, native_ref}, _ctx, state) do - actions = get_format_info_actions(native_ref) - {{:ok, actions}, state} + def handle_demand(_pad, _size, _unit, _ctx, state) do + {:ok, state} end @impl true - def handle_other({Native, :read_frame, {:ok, type, pts, dts, frame}}, ctx, state) - when ctx.playback_state == :playing do - pts = Time.milliseconds(pts) - dts = Time.milliseconds(dts) - - buffer = %Buffer{ - pts: pts, - dts: dts, - payload: prepare_payload(type, frame) - } - - if get_in(ctx.pads, [type, :demand]) > 0 do - send(state.provider, :get_frame) - {{:ok, buffer: {type, buffer}}, state} - else - # if there is no demand for element of this type so we wait until it appears - # effectively, it results in source adapting to the slower of the two outputs - {:ok, %{state | stale_frame: {type, buffer}}} - end + def handle_playing_to_prepared(_ctx, state) do + send(state.receiver_pid, :terminate) + {:ok, %{state | receiver_pid: nil}} end @impl true - def handle_other({Native, :read_frame, :end_of_stream}, _ctx, state) do - Membrane.Logger.debug("Received end of stream") - {{:ok, end_of_stream: :audio, end_of_stream: :video}, state} + def handle_other(:start_receiving, _ctx, %{socket_retries: 0} = state) do + Membrane.Logger.warn("Failed to take control of the socket") + {:ok, state} end - @impl true - def handle_other({Native, :read_frame, {:error, reason}}, _ctx, _state) do - raise "Fetching of the frame failed. Reason: #{inspect(reason)}" + def handle_other(:start_receiving, _ctx, %{socket_retries: retries} = state) do + case :gen_tcp.controlling_process(state.socket, state.receiver_pid) do + :ok -> + :ok = :inet.setopts(state.socket, active: :once) + {:ok, %{state | socket_ready?: true}} + + {:error, :not_owner} -> + Process.send_after(self(), :start_receiving, 200) + {:ok, %{state | socket_retries: retries - 1}} + end end @impl true - def handle_playing_to_prepared(_ctx, state) do - send(state.provider, :terminate) - Process.unlink(state.provider) - {:ok, %{state | provider: nil}} - end + def handle_other({:tcp, socket, packet}, _ctx, state) do + state = %{state | socket: socket} - defp prepare_payload(:video, payload), do: Utils.to_annex_b(payload) - defp prepare_payload(:audio, payload), do: payload + {messages, message_parser} = + MessageHandler.parse_packet_messages(packet, state.message_parser) - defp get_format_info_actions(native) do - [ - get_audio_params(native), - get_video_params(native) - ] - |> Enum.concat() + state = MessageHandler.handle_client_messages(messages, state) + {actions, state} = get_actions(state) + + {{:ok, actions}, %{state | message_parser: message_parser}} end - defp get_audio_params(native) do - with {:ok, asc} <- Native.get_audio_params(native) do - caps = %Membrane.AAC.RemoteStream{ - audio_specific_config: asc - } + @impl true + def handle_other({:tcp_closed, _port}, _ctx, state) do + {{:ok, end_of_stream: :output}, state} + end - [caps: {:audio, caps}] - else - {:error, _reason} -> [] - end + @impl true + def handle_other(_message, _ctx, state) do + {:ok, state} end - defp get_video_params(native) do - with {:ok, config} <- Native.get_video_params(native) do - caps = %Membrane.H264.RemoteStream{ - decoder_configuration_record: config, - stream_format: :byte_stream - } + defp get_actions(state) do + case state do + %{validation_error: notification} -> + state = %{state | actions: [], socket_ready?: false} |> Map.delete(:validation_error) + {[notification], state} + + %{actions: [_buf | _rest] = actions} -> + {actions, %{state | actions: []}} - [caps: {:video, caps}] - else - {:error, _reason} -> [] + _state -> + {[], state} end end end diff --git a/lib/membrane_rtmp_plugin/rtmp/source/tcp_server.ex b/lib/membrane_rtmp_plugin/rtmp/source/tcp_server.ex new file mode 100644 index 00000000..9b3e6366 --- /dev/null +++ b/lib/membrane_rtmp_plugin/rtmp/source/tcp_server.ex @@ -0,0 +1,60 @@ +defmodule Membrane.RTMP.Source.TcpServer do + @moduledoc """ + A simple tcp server, which handles each new incoming connection. + + The `socket_handler` function passed inside the options should take the socket returned by `:gen_tcp.accept/1` + and return `{:ok, pid}`, where the `pid` describes a process, which will be interacting with the socket. + `Membrane.RTMP.Source.TcpServer` will grant that process control over the socket via `:gen_tcp.controlling_process/2`. + """ + + use Task + + @enforce_keys [:port, :listen_options, :socket_handler] + + defstruct @enforce_keys ++ [:parent] + + @typedoc """ + Defines options for the TCP server. + The `listen_options` are passed to the `:gen_tcp.listen/2` function. + The `socket_handler` is a function that takes socket returned by `:gen_tcp.accept/1` and returns the pid of a process, + which will be interacting with the socket. TcpServer will grant that process control over the socket via `:gen_tcp.controlling_process/2`. + """ + @type t :: %__MODULE__{ + port: :inet.port_number(), + listen_options: [:inet.inet_backend() | :gen_tcp.listen_option()], + socket_handler: (:gen_tcp.socket() -> {:ok, pid} | {:error, reason :: any()}), + parent: pid() + } + + @spec start_link(t()) :: {:ok, pid} + def start_link(options) do + Task.start_link(__MODULE__, :run, [options]) + end + + @spec run(t()) :: nil + def run(options) do + {:ok, socket} = :gen_tcp.listen(options.port, options.listen_options) + if options.parent, do: send(options.parent, {:tcp_server_started, socket}) + + accept_loop(socket, options.socket_handler) + end + + defp accept_loop(socket, socket_handler) do + {:ok, client} = :gen_tcp.accept(socket) + {:ok, handler_task} = Task.start(fn -> serve(client, socket_handler) end) + + :ok = :gen_tcp.controlling_process(client, handler_task) + send(handler_task, :control_granted) + + accept_loop(socket, socket_handler) + end + + defp serve(socket, socket_handler) do + {:ok, pid} = socket_handler.(socket) + + receive do + :control_granted -> + :ok = :gen_tcp.controlling_process(socket, pid) + end + end +end diff --git a/mix.exs b/mix.exs index 7f80594c..3ce97f37 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.RTMP.Mixfile do use Mix.Project - @version "0.8.1" + @version "0.9.0" @github_url "https://github.com/membraneframework/membrane_rtmp_plugin" def project do @@ -38,16 +38,17 @@ defmodule Membrane.RTMP.Mixfile do defp deps do [ - {:membrane_core, "~> 0.10.0"}, + {:membrane_core, "~> 0.10"}, {:unifex, "~> 1.0"}, {:membrane_h264_ffmpeg_plugin, "~> 0.22.0"}, {:membrane_aac_plugin, "~> 0.12.1"}, {:membrane_mp4_plugin, "~> 0.16.0"}, + {:membrane_flv_plugin, "~> 0.3.1"}, {:membrane_file_plugin, "~> 0.12.0"}, # testing - {:membrane_hackney_plugin, "~> 0.8.0", only: :test}, - {:ffmpex, "~> 0.7", only: :test}, - {:membrane_stream_plugin, "~> 0.1", only: :test}, + {:membrane_hackney_plugin, "~> 0.8.2", only: :test}, + {:ffmpex, "~> 0.10.0", only: :test}, + {:membrane_stream_plugin, "~> 0.1.0", only: :test}, # development {:ex_doc, "~> 0.28", only: :dev, runtime: false}, {:dialyxir, "~> 1.1", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index af1f012c..83c15ae0 100644 --- a/mix.lock +++ b/mix.lock @@ -12,7 +12,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.28", "0bf6546eb7cd6185ae086cbc5d20cd6dbb4b428aad14c02c49f7b554484b4586", [:mix], [], "hexpm", "501cef12286a3231dc80c81352a9453decf9586977f917a96e619293132743fb"}, "elixir_make": {:hex, :elixir_make, "0.6.3", "bc07d53221216838d79e03a8019d0839786703129599e9619f4ab74c8c096eac", [:mix], [], "hexpm", "f5cbd651c5678bcaabdbb7857658ee106b12509cd976c2c2fca99688e1daf716"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"}, + "ex_doc": {:hex, :ex_doc, "0.28.6", "2bbd7a143d3014fc26de9056793e97600ae8978af2ced82c2575f130b7c0d7d7", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bca1441614654710ba37a0e173079273d619f9160cbcc8cd04e6bd59f1ad0e29"}, "ffmpex": {:hex, :ffmpex, "0.10.0", "ce29281eac60bf109c05acb4342eecf813a3cd3f08c1bce350423caad86128af", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:rambo, "~> 0.3.0", [hex: :rambo, repo: "hexpm", optional: false]}], "hexpm", "de8d81f8c51cc258dcee9a3e0b1568b0659c97be004557d9af47795206cff53b"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, @@ -27,6 +27,7 @@ "membrane_common_c": {:hex, :membrane_common_c, "0.13.0", "c314623f93209eb2fa092379954c686f6e50ac89baa48360f836d24f4d53f5ee", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "90181fbbe481ccd0a4a76daf0300f8ad1b5b0bf0ebd8b42c133904f8839663ca"}, "membrane_core": {:hex, :membrane_core, "0.10.2", "d2d17039f6df746e4a3c47da32f51867fbafe528272cdd9b226d16b1032bc337", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6a4f290f919ada66c772807d64d5830be2962b7c13a2f2bc9ace416a1cd19ee1"}, "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.12.0", "eb940e7a2f2abf30e048bd0b7c2bef9c17c18aa58875b9a833c0bc7e7b1fd709", [:mix], [{:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "281b9bf9467beead3f973adce55b9844bc4206bb3f3f60f0db8320a4af4fc5ca"}, + "membrane_flv_plugin": {:hex, :membrane_flv_plugin, "0.3.1", "3d831ca93b4c28775f532148ed1b1f6f8bf41b37f2fd137ab3940b61dd53112d", [:mix], [{:bimap, "~> 1.2", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.7.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.7.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}], "hexpm", "5918f4a1bef78c8a3bc7ecd899de5a7945eacb1e6da06cc7f9e83135b3b8347d"}, "membrane_h264_ffmpeg_plugin": {:hex, :membrane_h264_ffmpeg_plugin, "0.22.0", "978ece9ad7c7cc211f73f9fa9d286f9ab3ebb64a05e09671c7c118a0d25cc9bc", [:mix], [{:bunch, "~> 1.3.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.13.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.3.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.2.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:ratio, "~> 2.4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "aac73ba384a3e959c7957d1cac5dfb82f074fc16e5b64924238327decc3b2b2d"}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.3.0", "84426aac86c3f4d3e8110438c3514ad94aa528e7002650d40e3b3862e2af5e3e", [:mix], [], "hexpm", "8254e52cea3c5d7c078c960a32f1ba338eeae9e301515302fd293f1683fa8dd9"}, "membrane_hackney_plugin": {:hex, :membrane_hackney_plugin, "0.8.2", "6b83628cc2019aa0b143c09e77f2dd9199a05528599d93c289dcab2e947369fa", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:membrane_core, "~> 0.10.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "42906166b3692ba2270deb61721225ca7edadd1dbde6a44435664234a93597e2"}, diff --git a/test/fixtures/audio.aac b/test/fixtures/audio.aac new file mode 100644 index 00000000..a8607974 Binary files /dev/null and b/test/fixtures/audio.aac differ diff --git a/test/fixtures/testsrc.flv b/test/fixtures/testsrc.flv index 10cc3350..9b9901a1 100644 Binary files a/test/fixtures/testsrc.flv and b/test/fixtures/testsrc.flv differ diff --git a/test/fixtures/testvideo.h264 b/test/fixtures/video.h264 similarity index 100% rename from test/fixtures/testvideo.h264 rename to test/fixtures/video.h264 diff --git a/test/membrane_rtmp_plugin/rtmp_sink_test.exs b/test/membrane_rtmp_plugin/rtmp_sink_test.exs index 5a1e7300..4072453a 100644 --- a/test/membrane_rtmp_plugin/rtmp_sink_test.exs +++ b/test/membrane_rtmp_plugin/rtmp_sink_test.exs @@ -1,4 +1,4 @@ -defmodule Membrane.RTMP.Sink.Test do +defmodule Membrane.RTMP.SinkTest do use ExUnit.Case import Membrane.Testing.Assertions diff --git a/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs new file mode 100644 index 00000000..e7e8b6ef --- /dev/null +++ b/test/membrane_rtmp_plugin/rtmp_source_bin_test.exs @@ -0,0 +1,188 @@ +defmodule Membrane.RTMP.SourceBin.IntegrationTest do + use ExUnit.Case, async: true + + import Membrane.Testing.Assertions + + require Logger + + alias Membrane.RTMP.Source.TcpServer + alias Membrane.Testing + + @input_file "test/fixtures/testsrc.flv" + @local_ip "127.0.0.1" + @stream_key "ala2137" + + @stream_length_ms 3000 + @video_frame_duration_ms 42 + @audio_frame_duration_ms 23 + + test "SourceBin outputs the correct number of audio and video buffers" do + {:ok, port} = start_tcp_server() + + ffmpeg_task = + Task.async(fn -> + get_stream_url(port) |> start_ffmpeg() + end) + + pipeline = await_pipeline_started() + + assert_pipeline_playback_changed(pipeline, :prepared, :playing) + + assert_buffers(%{ + pipeline: pipeline, + sink: :video_sink, + stream_length: @stream_length_ms, + buffers_expected: div(@stream_length_ms, @video_frame_duration_ms) + }) + + assert_buffers(%{ + pipeline: pipeline, + sink: :audio_sink, + stream_length: @stream_length_ms, + buffers_expected: div(@stream_length_ms, @audio_frame_duration_ms) + }) + + assert_end_of_stream(pipeline, :audio_sink, :input) + assert_end_of_stream(pipeline, :video_sink, :input) + + # Cleanup + Testing.Pipeline.terminate(pipeline, blocking?: true) + assert :ok = Task.await(ffmpeg_task) + end + + test "Correct Stream ID is correctly verified" do + {:ok, port} = start_tcp_server(Membrane.RTMP.Source.TestVerifier) + + ffmpeg_task = + Task.async(fn -> + get_stream_url(port, @stream_key) |> start_ffmpeg() + end) + + pipeline = await_pipeline_started() + assert_pipeline_playback_changed(pipeline, :prepared, :playing) + + assert_pipeline_notified( + pipeline, + :src, + {:stream_validation_success, "correct stream key"} + ) + + assert_end_of_stream(pipeline, :audio_sink, :input, @stream_length_ms + 500) + assert_end_of_stream(pipeline, :video_sink, :input) + + # Cleanup + Testing.Pipeline.terminate(pipeline, blocking?: true) + assert :ok = Task.await(ffmpeg_task) + end + + test "Wrong Stream ID is denied" do + {:ok, port} = start_tcp_server(Membrane.RTMP.Source.TestVerifier) + + ffmpeg_task = + Task.async(fn -> + get_stream_url(port, @stream_key <> "wrong") |> start_ffmpeg() + end) + + pipeline = await_pipeline_started() + assert_pipeline_playback_changed(pipeline, :prepared, :playing) + + assert_pipeline_notified( + pipeline, + :src, + {:stream_validation_error, "wrong stream key"} + ) + + # Cleanup + Testing.Pipeline.terminate(pipeline, blocking?: true) + assert :error = Task.await(ffmpeg_task) + end + + defp start_tcp_server(verifier \\ Membrane.RTMP.DefaultMessageValidator) do + test_process = self() + + options = %TcpServer{ + port: 0, + listen_options: [ + :binary, + packet: :raw, + active: false, + ip: @local_ip |> String.to_charlist() |> :inet.parse_address() |> elem(1) + ], + socket_handler: fn socket -> + options = [ + module: Membrane.RTMP.Source.TestPipeline, + custom_args: [socket: socket, test_process: test_process, verifier: verifier], + test_process: test_process + ] + + Testing.Pipeline.start_link(options) + end, + parent: test_process + } + + {:ok, _tcp_server} = TcpServer.start_link(options) + + receive do + {:tcp_server_started, socket} -> + :inet.port(socket) + end + end + + defp get_stream_url(port, key \\ nil) do + "rtmp://#{@local_ip}:#{port}" <> + if key, do: "/app/" <> key, else: "" + end + + defp start_ffmpeg(stream_url) do + import FFmpex + use FFmpex.Options + Logger.debug("Starting ffmpeg") + + command = + FFmpex.new_command() + |> add_global_option(option_y()) + |> add_input_file(@input_file) + |> add_file_option(option_re()) + |> add_output_file(stream_url) + |> add_file_option(option_f("flv")) + |> add_file_option(option_vcodec("copy")) + |> add_file_option(option_acodec("copy")) + + case FFmpex.execute(command) do + {:ok, ""} -> + :ok + + error -> + Logger.error(inspect(error)) + :error + end + end + + defp await_pipeline_started() do + receive do + {:pipeline_started, pid} -> pid + end + end + + defp assert_buffers(%{last_dts: _dts} = state) do + assert_sink_buffer(state.pipeline, state.sink, %Membrane.Buffer{dts: dts}) + assert dts >= state.last_dts + + buffers = state.buffers + 1 + state = %{state | last_dts: dts, buffers: buffers} + + if dts < state.stream_length do + assert_buffers(state) + else + assert state.buffers >= state.buffers_expected + end + end + + defp assert_buffers(state) do + stream_length = Membrane.Time.milliseconds(state.stream_length) + + state + |> Map.merge(%{stream_length: stream_length, last_dts: -1, buffers: 0}) + |> assert_buffers() + end +end diff --git a/test/membrane_rtmp_plugin/rtmp_source_test.exs b/test/membrane_rtmp_plugin/rtmp_source_test.exs deleted file mode 100644 index 5595d38d..00000000 --- a/test/membrane_rtmp_plugin/rtmp_source_test.exs +++ /dev/null @@ -1,102 +0,0 @@ -defmodule Membrane.RTMP.Source.Test do - use ExUnit.Case - import Membrane.Testing.Assertions - - require Logger - - alias Membrane.Testing - alias Membrane.Testing.Pipeline - - @input_file "test/fixtures/testsrc.flv" - @port 9009 - @rtmp_stream_url "rtmp://127.0.0.1:#{@port}/" - - test "Check if the stream started and that it ends" do - assert {:ok, pipeline} = get_testing_pipeline() - assert_pipeline_playback_changed(pipeline, :prepared, :playing) - - ffmpeg_task = Task.async(&start_ffmpeg/0) - - assert_sink_buffer(pipeline, :video_sink, %Membrane.Buffer{}) - assert_sink_buffer(pipeline, :audio_sink, %Membrane.Buffer{}) - assert_end_of_stream(pipeline, :audio_sink, :input, 11_000) - assert_end_of_stream(pipeline, :video_sink, :input) - - # Cleanup - Pipeline.terminate(pipeline, blocking?: true) - assert :ok = Task.await(ffmpeg_task) - end - - test "blocking calls are cancelled properly" do - alias Membrane.RTMP.Source.Native - - test_pid = self() - - {pid, ref} = - spawn_monitor(fn -> - native_pid = Native.start_link(@rtmp_stream_url, :infinity) - send(test_pid, {:native_pid, native_pid}) - # Ensure avformat_open_input gets stuck at listening - Process.sleep(100) - Process.exit(self(), :shutdown) - end) - - assert_receive {:native_pid, native_pid} - native_monitor_ref = Process.monitor(native_pid) - - # First, the spawned process will exit - assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown}, 200 - - # Now, after interrupt_callback fires, the native process should exit as well - assert_receive {:DOWN, ^native_monitor_ref, :process, ^native_pid, :shutdown}, 500 - Process.sleep(100) - - # Ensure the ffmpeg does not listen on this port anymore - assert :gen_tcp.connect('127.0.0.1', @port, [:binary]) == {:error, :econnrefused} - end - - defp get_testing_pipeline() do - import Membrane.ParentSpec - timeout = Membrane.Time.seconds(10) - - options = [ - children: [ - src: %Membrane.RTMP.SourceBin{port: @port, timeout: timeout}, - audio_sink: Testing.Sink, - video_sink: Testing.Sink - ], - links: [ - link(:src) |> via_out(:audio) |> to(:audio_sink), - link(:src) |> via_out(:video) |> to(:video_sink) - ], - test_process: self() - ] - - Pipeline.start_link(options) - end - - defp start_ffmpeg() do - import FFmpex - use FFmpex.Options - Logger.debug("Starting ffmpeg") - - command = - FFmpex.new_command() - |> add_global_option(option_y()) - |> add_input_file(@input_file) - |> add_file_option(option_re()) - |> add_output_file(@rtmp_stream_url) - |> add_file_option(option_f("flv")) - |> add_file_option(option_vcodec("copy")) - |> add_file_option(option_acodec("copy")) - - case FFmpex.execute(command) do - {:ok, ""} -> - :ok - - error -> - Logger.error(inspect(error)) - :error - end - end -end diff --git a/test/membrane_rtmp_plugin/tcp_server_test.exs b/test/membrane_rtmp_plugin/tcp_server_test.exs new file mode 100644 index 00000000..bbb01563 --- /dev/null +++ b/test/membrane_rtmp_plugin/tcp_server_test.exs @@ -0,0 +1,51 @@ +defmodule Membrane.RTMP.Source.TcpServerTest do + use ExUnit.Case + + alias Membrane.RTMP.Source.TcpServer + + @port 9000 + @local_ip "127.0.0.1" |> String.to_charlist() |> :inet.parse_address() |> elem(1) + @sample_data "Hello World" + + test "TcpServer transfers the control to the process " do + testing_process = self() + + server_options = %TcpServer{ + port: @port, + listen_options: [ + :binary, + packet: :raw, + active: false, + reuseaddr: true, + ip: @local_ip + ], + socket_handler: fn socket -> + {:ok, receive_task} = + Task.start(fn -> + testing_process |> Process.link() + :inet.setopts(socket, active: true) + + data = @sample_data + assert_receive({:tcp, ^socket, ^data}, 1000) + end) + + {:ok, receive_task} + end + } + + TcpServer.start_link(server_options) + + Process.sleep(500) + + {:ok, socket} = + :gen_tcp.connect( + @local_ip, + @port, + [], + :infinity + ) + + :gen_tcp.send(socket, @sample_data) + assert_receive({:tcp_closed, ^socket}, 1000) + end +end diff --git a/test/support/rtmp_source_test_pipeline.ex b/test/support/rtmp_source_test_pipeline.ex new file mode 100644 index 00000000..d26d7bf8 --- /dev/null +++ b/test/support/rtmp_source_test_pipeline.ex @@ -0,0 +1,60 @@ +defmodule Membrane.RTMP.Source.TestPipeline do + @moduledoc false + use Membrane.Pipeline + + import Membrane.ParentSpec + + alias Membrane.RTMP.SourceBin + alias Membrane.Testing + + @impl true + def handle_init(socket: socket, test_process: test_process, verifier: verifier) do + spec = %Membrane.ParentSpec{ + children: [ + src: %SourceBin{ + socket: socket, + validator: verifier + }, + audio_sink: Testing.Sink, + video_sink: Testing.Sink + ], + links: [ + link(:src) |> via_out(:audio) |> to(:audio_sink), + link(:src) |> via_out(:video) |> to(:video_sink) + ] + } + + send(test_process, {:pipeline_started, self()}) + + {{:ok, [spec: spec, playback: :playing]}, %{socket: socket}} + end + + @impl true + def handle_notification( + {:socket_control_needed, _socket, _source} = notification, + :src, + _ctx, + state + ) do + send(self(), notification) + + {:ok, state} + end + + def handle_notification(_notification, _child, _ctx, state) do + {:ok, state} + end + + @impl true + def handle_other({:socket_control_needed, socket, source} = notification, _ctx, state) do + case SourceBin.pass_control(socket, source) do + :ok -> + :ok + + {:error, :not_owner} -> + Process.send_after(self(), notification, 200) + end + + {:ok, state} + end +end diff --git a/test/support/test_verifier.ex b/test/support/test_verifier.ex new file mode 100644 index 00000000..da00ef46 --- /dev/null +++ b/test/support/test_verifier.ex @@ -0,0 +1,17 @@ +defmodule Membrane.RTMP.Source.TestVerifier do + @moduledoc false + use Membrane.RTMP.MessageValidator + + alias Membrane.RTMP.Messages + + @stream_key "ala2137" + + @impl true + def validate_publish(%Messages.Publish{stream_key: stream_key}) do + if stream_key == @stream_key do + {:ok, "correct stream key"} + else + {:error, "wrong stream key"} + end + end +end