Merge pull request #37 from membraneframework/core-0.11
Update Core to `0.11`, bump package version
balins authored Jan 5, 2023
2 parents 7988185 + da360c7 commit 1b3d00c
Showing 20 changed files with 384 additions and 369 deletions.
25 changes: 22 additions & 3 deletions .circleci/config.yml
@@ -6,6 +6,25 @@ workflows:
version: 2
build:
jobs:
- elixir/build_test
- elixir/test
- elixir/lint
- elixir/build_test:
filters: &filters
tags:
only: /v.*/
- elixir/test:
filters:
<<: *filters
- elixir/lint:
filters:
<<: *filters
- elixir/hex_publish:
requires:
- elixir/build_test
- elixir/test
- elixir/lint
context:
- Deployment
filters:
branches:
ignore: /.*/
tags:
only: /v.*/
2 changes: 1 addition & 1 deletion .formatter.exs
@@ -1,6 +1,6 @@
[
inputs: [
"{lib,test,config}/**/*.{ex,exs}",
"{lib,test,config,examples}/**/*.{ex,exs}",
"c_src/**/*.spec.exs",
".formatter.exs",
"*.exs"
11 changes: 5 additions & 6 deletions README.md
@@ -5,7 +5,7 @@
[![CircleCI](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin.svg?style=svg)](https://circleci.com/gh/membraneframework/membrane_rtmp_plugin)

This package provides RTMP server which receives an RTMP stream from a client and an element for streaming to an RTMP server.
It is part of [Membrane Multimedia Framework](https://membraneframework.org).
It is a part of [Membrane Multimedia Framework](https://membraneframework.org).

## Installation

@@ -14,22 +14,21 @@ The package can be installed by adding `membrane_rtmp_plugin` to your list of de
```elixir
def deps do
[
{:membrane_rtmp_plugin, "~> 0.9.1"}
{:membrane_rtmp_plugin, "~> 0.10.0"}
]
end
```

## SourceBin

Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio.
Requires a socket, which has been connected to the client. It receives RTMP stream, demuxes it and outputs H264 video and AAC audio.
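
As a minimal, hypothetical sketch of wiring the bin into a pipeline under Membrane Core 0.11's child-spec syntax — the `socket` option, the `:video`/`:audio` pad names, and the `Membrane.File.Sink` children are assumptions made for illustration based on the description above, not confirmed API:

```elixir
# Hypothetical wiring of SourceBin into a pipeline spec; option and pad names
# below are assumptions inferred from the description above.
structure = [
  # `socket` is a TCP socket already connected to the RTMP client
  child(:source, %Membrane.RTMP.SourceBin{socket: socket})
  |> via_out(:video)
  |> child(:video_sink, %Membrane.File.Sink{location: "out.h264"}),
  get_child(:source)
  |> via_out(:audio)
  |> child(:audio_sink, %Membrane.File.Sink{location: "out.aac"})
]
```

Such a structure would then be returned from the pipeline's `handle_init/2` as `{[spec: structure], state}`, in the same way the updated `examples/sink.exs` below returns its spec.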

## Client
After establishing connection with server it waits to receive video and audio streams. Once both streams are received they are streamed to the server.
Currently only the following codecs are supported:
- H264 for video
- AAC for audio

## Tcp Server
## TCP Server
It's a simple implementation of tcp server. It opens a tcp port and listens for incoming connections. For each new connection, a user-provided function is executed.
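
The general pattern is roughly the sketch below, written against plain `:gen_tcp`; the module and function names are illustrative only and are not the plugin's actual TCP server API:

```elixir
# A rough sketch of the accept-loop pattern described above, using plain :gen_tcp:
# listen on a port, accept connections, and run a user-provided function per
# connection in its own process.
defmodule AcceptLoopSketch do
  @doc "Listens on `port` and runs `handler.(socket)` in a new process per connection."
  def serve(port, handler) when is_function(handler, 1) do
    {:ok, listen_socket} =
      :gen_tcp.listen(port, [:binary, packet: :raw, active: false, reuseaddr: true])

    accept_loop(listen_socket, handler)
  end

  defp accept_loop(listen_socket, handler) do
    {:ok, socket} = :gen_tcp.accept(listen_socket)

    # Run the user-provided function in its own process, e.g. one that starts a
    # pipeline built around Membrane.RTMP.SourceBin for this connection.
    {:ok, pid} = Task.start(fn -> handler.(socket) end)

    # Hand socket ownership over to that process.
    :ok = :gen_tcp.controlling_process(socket, pid)

    accept_loop(listen_socket, handler)
  end
end
```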

### Prerequisites
@@ -62,7 +61,7 @@ RTMP server that will receive this stream can be launched with ffmpeg by running

```bash
export RTMP_URL=rtmp://localhost:1935
ffmpeg -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv
ffmpeg -y -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv
```

It will receive stream and once streaming is completed dump it to .flv file. If you are using the command above, please remember to run it **before** the streaming script.
27 changes: 16 additions & 11 deletions c_src/membrane_rtmp_plugin/sink/rtmp_sink.c
@@ -51,7 +51,7 @@ UNIFEX_TERM init_video_stream(UnifexEnv *env, State *state, int width,
int height, UnifexPayload *avc_config) {
AVStream *video_stream;
if (state->video_stream_index != -1) {
return init_video_stream_result_error_caps_resent(env);
return init_video_stream_result_error_stream_format_resent(env);
}

video_stream = avformat_new_stream(state->output_ctx, NULL);
Expand Down Expand Up @@ -89,7 +89,7 @@ UNIFEX_TERM init_audio_stream(UnifexEnv *env, State *state, int channels,
int sample_rate, UnifexPayload *aac_config) {
AVStream *audio_stream;
if (state->audio_stream_index != -1) {
return init_audio_stream_result_error_caps_resent(env);
return init_audio_stream_result_error_stream_format_resent(env);
}

audio_stream = avformat_new_stream(state->output_ctx, NULL);
@@ -127,14 +127,15 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state,
int is_key_frame) {
if (state->video_stream_index == -1) {
return write_video_frame_result_error(
env, "Video stream is not initialized. Caps has not been received");
env,
"Video stream is not initialized. Stream format has not been received");
}

AVRational video_stream_time_base =
state->output_ctx->streams[state->video_stream_index]->time_base;
AVPacket *packet = av_packet_alloc();

uint8_t* data = (uint8_t *)av_malloc(frame->size);
uint8_t *data = (uint8_t *)av_malloc(frame->size);
memcpy(data, frame->data, frame->size);
av_packet_from_data(packet, data, frame->size);

@@ -162,9 +163,11 @@ UNIFEX_TERM write_video_frame(UnifexEnv *env, State *state,
packet->duration = dts_scaled - state->current_video_dts;
state->current_video_dts = dts_scaled;

if (av_write_frame(state->output_ctx, packet)) {
int result = av_write_frame(state->output_ctx, packet);

if (result) {
write_frame_result =
write_video_frame_result_error(env, "Failed writing video frame");
write_video_frame_result_error(env, av_err2str(result));
goto end;
}
write_frame_result = write_video_frame_result_ok(env, state);
@@ -178,15 +181,15 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state,
UnifexPayload *frame, int64_t pts) {
if (state->audio_stream_index == -1) {
return write_audio_frame_result_error(
env,
"Audio stream has not been initialized. Caps has not been received");
env, "Audio stream has not been initialized. Stream format has not "
"been received");
}

AVRational audio_stream_time_base =
state->output_ctx->streams[state->audio_stream_index]->time_base;
AVPacket *packet = av_packet_alloc();

uint8_t* data = (uint8_t *)av_malloc(frame->size);
uint8_t *data = (uint8_t *)av_malloc(frame->size);
memcpy(data, frame->data, frame->size);
av_packet_from_data(packet, data, frame->size);

@@ -209,9 +212,11 @@ UNIFEX_TERM write_audio_frame(UnifexEnv *env, State *state,
packet->duration = pts_scaled - state->current_audio_pts;
state->current_audio_pts = pts_scaled;

if (av_write_frame(state->output_ctx, packet)) {
int result = av_write_frame(state->output_ctx, packet);

if (result) {
write_frame_result =
write_audio_frame_result_error(env, "Failed writing audio frame");
write_audio_frame_result_error(env, av_err2str(result));
goto end;
}
write_frame_result = write_audio_frame_result_ok(env, state);
2 changes: 1 addition & 1 deletion c_src/membrane_rtmp_plugin/sink/rtmp_sink.h
@@ -19,4 +19,4 @@ struct State {
bool header_written;
};

#include "_generated/rtmp_sink.h"
#include "_generated/rtmp_sink.h"
4 changes: 2 additions & 2 deletions c_src/membrane_rtmp_plugin/sink/rtmp_sink.spec.exs
@@ -14,13 +14,13 @@ spec try_connect(state) ::
spec finalize_stream(state) :: :ok :: label

spec init_video_stream(state, width :: int, height :: int, avc_config :: payload) ::
{:ok :: label, ready :: bool, state} | {:error :: label, :caps_resent :: label}
{:ok :: label, ready :: bool, state} | {:error :: label, :stream_format_resent :: label}

spec write_video_frame(state, frame :: payload, dts :: int64, pts :: int64, is_key_frame :: bool) ::
{:ok :: label, state} | {:error :: label, reason :: string}

spec init_audio_stream(state, channels :: int, sample_rate :: int, aac_config :: payload) ::
{:ok :: label, ready :: bool, state} | {:error :: label, :caps_resent :: label}
{:ok :: label, ready :: bool, state} | {:error :: label, :stream_format_resent :: label}

spec write_audio_frame(state, frame :: payload, pts :: int64) ::
{:ok :: label, state} | {:error :: label, reason :: string}
81 changes: 36 additions & 45 deletions examples/sink.exs
@@ -1,85 +1,76 @@
# Before running this example, make sure that target RTMP server is live.
# If you are streaming to eg. Youtube, you don't need to worry about it.
# If you want to test it locally, you can run the FFmpeg server with:
# ffmpeg -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv
# ffmpeg -y -listen 1 -f flv -i rtmp://localhost:1935 -c copy dest.flv

Mix.install([
{:membrane_core, "~> 0.10.1"},
{:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()},
:membrane_realtimer_plugin,
:membrane_hackney_plugin,
:membrane_h264_ffmpeg_plugin,
:membrane_aac_plugin,
:membrane_mp4_plugin
{:membrane_rtmp_plugin, path: __DIR__ |> Path.join("..") |> Path.expand()}
])

defmodule Example do
use Membrane.Pipeline

@samples_url "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/"
@video_url @samples_url <> "bun33s_480x270.h264"
@audio_url @samples_url <> "bun33s.aac"

@impl true
def handle_init(_opts) do
children = [
video_source: %Membrane.Hackney.Source{
location: "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s_480x270.h264",
def handle_init(_ctx, destination: destination) do
structure = [
child(:rtmp_sink, %Membrane.RTMP.Sink{rtmp_url: destination}),
child(:video_source, %Membrane.Hackney.Source{
location: @video_url,
hackney_opts: [follow_redirect: true]
},
video_parser: %Membrane.H264.FFmpeg.Parser{
})
|> child(:video_parser, %Membrane.H264.FFmpeg.Parser{
framerate: {25, 1},
alignment: :au,
attach_nalus?: true,
skip_until_keyframe?: true
},
audio_parser: %Membrane.AAC.Parser{
})
|> child(:video_realtimer, Membrane.Realtimer)
|> child(:video_payloader, Membrane.MP4.Payloader.H264)
|> via_in(:video)
|> get_child(:rtmp_sink),
child(:audio_source, %Membrane.Hackney.Source{
location: @audio_url,
hackney_opts: [follow_redirect: true]
})
|> child(:audio_parser, %Membrane.AAC.Parser{
in_encapsulation: :ADTS,
out_encapsulation: :none
},
audio_source: %Membrane.Hackney.Source{
location: "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s.aac",
hackney_opts: [follow_redirect: true]
},
video_realtimer: Membrane.Realtimer,
audio_realtimer: Membrane.Realtimer,
video_payloader: Membrane.MP4.Payloader.H264,
rtmp_sink: %Membrane.RTMP.Sink{rtmp_url: System.get_env("RTMP_URL", "rtmp://localhost:1935")}
]

links = [
link(:video_source)
|> to(:video_parser)
|> to(:video_realtimer)
|> to(:video_payloader)
|> via_in(:video)
|> to(:rtmp_sink),
link(:audio_source)
|> to(:audio_parser)
|> to(:audio_realtimer)
})
|> child(:audio_realtimer, Membrane.Realtimer)
|> via_in(:audio)
|> to(:rtmp_sink)
|> get_child(:rtmp_sink)
]

{{:ok, spec: %ParentSpec{children: children, links: links}, playback: :playing}, %{finished_streams: []}}
{[spec: structure, playback: :playing], %{streams_to_end: 2}}
end

# The rest of the example module is only used for self-termination of the pipeline after processing finishes
@impl true
def handle_element_end_of_stream({:rtmp_sink, pad}, _ctx, state) when length(state.finished_streams) == 1 do
Membrane.Pipeline.terminate(self())
{:ok, Map.put(state, :finished_streams, &[pad | &1])}
def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, %{streams_to_end: 1} = state) do
{[terminate: :shutdown], %{state | streams_to_end: 0}}
end

@impl true
def handle_element_end_of_stream({:rtmp_sink, pad}, _ctx, state) do
{:ok, Map.put(state, :finished_streams, [pad])}
def handle_element_end_of_stream(:rtmp_sink, _pad, _ctx, state) do
{[], %{state | streams_to_end: 1}}
end

@impl true
def handle_element_end_of_stream(_element, _ctx, state) do
{:ok, state}
def handle_element_end_of_stream(_child, _pad, _ctx, state) do
{[], state}
end
end

destination = System.get_env("RTMP_URL", "rtmp://localhost:1935")

# Initialize the pipeline and start it
{:ok, pipeline} = Example.start_link()
{:ok, _supervisor, pipeline} = Example.start_link(destination: destination)

monitor_ref = Process.monitor(pipeline)
