Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTP Muxer #173

Open
wants to merge 17 commits into
base: rtp-demuxer
Choose a base branch
from
46 changes: 28 additions & 18 deletions lib/membrane/rtp/muxer.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
defmodule Membrane.RTP.Muxer do
@moduledoc false
@moduledoc """
Element that combines multiple streams into a single RTP stream. Each new input stream is assigned a unique SSRC, that the packets
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first line is used as a short description

Suggested change
Element that combines multiple streams into a single RTP stream. Each new input stream is assigned a unique SSRC, that the packets
Element that combines multiple streams into a single RTP stream.
Each new input stream is assigned a unique SSRC, that the packets

transporting this stream will have. When a new pad is conneted, it's required to pass it options sufficient for it to be able to
resolve what `payload_type` and `clock_rate` should be assumed. Timestamps are calculated based on assumed `clock_rate`.
"""
use Membrane.Filter

require Membrane.Pad
Expand Down Expand Up @@ -48,18 +52,26 @@ defmodule Membrane.RTP.Muxer do

defmodule State do
@moduledoc false
defmodule StreamState do
@moduledoc false
alias Membrane.RTP

@type t :: %__MODULE__{
ssrc: RTP.ssrc(),
sequence_number: ExRTP.Packet.uint16(),
initial_timestamp: ExRTP.Packet.uint32(),
clock_rate: RTP.clock_rate(),
payload_type: RTP.payload_type(),
end_of_stream: boolean()
}

@enforce_keys [:ssrc, :sequence_number, :initial_timestamp, :clock_rate, :payload_type]

defstruct @enforce_keys ++ [end_of_stream: false]
end

@type t :: %__MODULE__{
stream_states: %{
Pad.ref() => %{
ssrc: RTP.ssrc(),
sequence_number: ExRTP.Packet.uint16(),
initial_timestamp: ExRTP.Packet.uint32(),
clock_rate: RTP.clock_rate(),
payload_type: RTP.payload_type(),
end_of_stream: boolean()
}
}
stream_states: %{Pad.ref() => StreamState.t()}
}

@enforce_keys []
Expand Down Expand Up @@ -87,16 +99,15 @@ defmodule Membrane.RTP.Muxer do
if payload_type == nil, do: raise("Could not resolve payload type")
if clock_rate == nil, do: raise("Could not resolve clock rate")

new_stream_state = %{
new_stream_state = %State.StreamState{
ssrc: ssrc,
sequence_number: Enum.random(0..@max_sequence_number),
initial_timestamp: Enum.random(0..@max_timestamp),
clock_rate: clock_rate,
payload_type: payload_type,
end_of_stream: false
payload_type: payload_type
}

state = Bunch.Struct.put_in(state, [:stream_states, pad_ref], new_stream_state)
state = put_in(state.stream_states[pad_ref], new_stream_state)

{[], state}
end
Expand Down Expand Up @@ -125,8 +136,7 @@ defmodule Membrane.RTP.Muxer do
timestamp = rem(stream_state.initial_timestamp + rtp_offset, @max_timestamp + 1)
sequence_number = rem(stream_state.sequence_number + 1, @max_sequence_number + 1)

state =
Bunch.Struct.put_in(state, [:stream_states, pad_ref, :sequence_number], sequence_number)
state = put_in(state.stream_states[pad_ref].sequence_number, sequence_number)

packet =
ExRTP.Packet.new(buffer.payload,
Expand All @@ -151,7 +161,7 @@ defmodule Membrane.RTP.Muxer do

@impl true
def handle_end_of_stream(Pad.ref(:input, _ref) = pad_ref, _ctx, state) do
state = Bunch.Struct.put_in(state, [:stream_states, pad_ref, :end_of_stream], true)
state = put_in(state.stream_states[pad_ref].end_of_stream, true)

if Enum.all?(Enum.map(state.stream_states, fn {_pad_ref, %{end_of_stream: eos}} -> eos end)) do
{[end_of_stream: :output], state}
Expand Down
Binary file not shown.
8 changes: 2 additions & 6 deletions test/membrane/rtp/demuxer_muxer_integration_test.exs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test where we demux and mux packets two times and check if we get the same results? That would need to happen on a stream that's already past the jitter buffer.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Membrane.RTP.DemuxerMuxerTest do

@impl true
def handle_child_notification(
{:new_rtp_stream, ssrc, pt, _extensions},
{:new_rtp_stream, %{ssrc: ssrc, payload_type: pt}},
:rtp_demuxer,
_ctx,
state
Expand All @@ -52,11 +52,7 @@ defmodule Membrane.RTP.DemuxerMuxerTest do
get_child(:rtp_demuxer)
|> via_out(:output, options: [stream_id: {:ssrc, ssrc}])
|> child({:jitter_buffer, ssrc}, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate})
|> via_in(:input,
options: [
encoding: encoding_name
]
)
|> via_in(:input, options: [encoding: encoding_name])
|> get_child(:rtp_muxer)

{[spec: spec], state}
Expand Down
54 changes: 54 additions & 0 deletions test/membrane/rtp/muxer_demuxer_integration_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Membrane.RTP.MuxerDemuxerTest do
@moduledoc false
use ExUnit.Case
import Membrane.Testing.Assertions
alias Membrane.RTP
alias Membrane.Testing

@input_path "test/fixtures/rtp/h264/bun.h264"

defmodule MuxerDemuxerPipeline do
use Membrane.Pipeline

@impl true
def handle_init(_ctx, opts) do
%{clock_rate: clock_rate} = RTP.PayloadFormat.resolve(encoding_name: :H264)

spec = [
child(:source, %Membrane.File.Source{location: opts.input_path})
|> child(:h264_parser, %Membrane.H264.Parser{
output_alignment: :nalu,
generate_best_effort_timestamps: %{framerate: {30, 1}}
})
|> child(:rtp_h264_payloader, Membrane.RTP.H264.Payloader)
|> via_in(:input, options: [encoding: :H264])
|> child(:rtp_muxer, Membrane.RTP.Muxer)
|> child(:rtp_demuxer, Membrane.RTP.Demuxer)
|> via_out(:output, options: [stream_id: {:encoding_name, :H264}])
|> child(:jitter_buffer, %Membrane.RTP.JitterBuffer{clock_rate: clock_rate})
|> child(:rtp_h264_depayloader, Membrane.RTP.H264.Depayloader)
|> child(:sink, %Membrane.File.Sink{location: opts.output_path})
]

{[spec: spec], %{}}
end
end

@tag :tmp_dir
test "Muxed and demuxed stream is the same as unchanged one", %{tmp_dir: tmp_dir} do
output_path = Path.join(tmp_dir, "output.h264")

pipeline =
Testing.Pipeline.start_supervised!(
module: MuxerDemuxerPipeline,
custom_args: %{input_path: @input_path, output_path: output_path}
)

assert_start_of_stream(pipeline, :sink)
assert_end_of_stream(pipeline, :sink)

assert File.read!(@input_path) == File.read!(output_path)

Testing.Pipeline.terminate(pipeline)
end
end