Skip to content

Commit

Permalink
Ex webrtc 0.4 (#7)
Browse files Browse the repository at this point in the history
* bump ex_webrtc to 0.4

* Keyframe interval (#6)

* Add a keyframe requester

* Fix

* Update last keyframe request timestamp

* Request keyframes on a timer

* Satisfy credo

* Bump version

* Start timers on first received packet

* Fix not updating state

* Apply reviewers suggestion

* send keyframe request events on relevant pads, add tests

* fixes for CR

---------

Co-authored-by: Jakub Pryc <[email protected]>
  • Loading branch information
mat-hek and Noarkhh authored Sep 17, 2024
1 parent 68fb6db commit 7dadcc2
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_webrtc_plugin` to your list of
```elixir
def deps do
[
{:membrane_webrtc_plugin, "~> 0.21.0"}
{:membrane_webrtc_plugin, "~> 0.22.0"}
]
end
```
Expand Down
56 changes: 27 additions & 29 deletions lib/membrane_webrtc/ex_webrtc/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do

@max_rtp_timestamp 2 ** 32 - 1
@max_rtp_seq_num 2 ** 16 - 1
@keyframe_request_throttle_time Membrane.Time.milliseconds(500)

@impl true
def handle_init(_ctx, opts) do
Expand All @@ -40,8 +41,7 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
audio_params: ExWebRTCUtils.codec_params(:opus),
video_params: ExWebRTCUtils.codec_params(opts.video_codec),
video_codec: opts.video_codec,
ice_servers: opts.ice_servers,
last_keyframe_request_ts: nil
ice_servers: opts.ice_servers
}}
end

Expand Down Expand Up @@ -85,12 +85,14 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
negotiated_tracks = List.delete(negotiated_tracks, track)

params = %{
kind: track.kind,
clock_rate:
case track.kind do
:audio -> ExWebRTCUtils.codec_clock_rate(:opus)
:video -> ExWebRTCUtils.codec_clock_rate(state.video_codec)
end,
seq_num: Enum.random(0..@max_rtp_seq_num)
seq_num: Enum.random(0..@max_rtp_seq_num),
last_keyframe_request_ts: Membrane.Time.monotonic_time() - @keyframe_request_throttle_time
}

input_tracks = Map.put(input_tracks, pad, {track.id, params})
Expand Down Expand Up @@ -137,39 +139,35 @@ defmodule Membrane.WebRTC.ExWebRTCSink do
end

@impl true
def handle_info({:ex_webrtc, _from, {:rtcp, rtcp_packets}}, ctx, state) do
pli? =
def handle_info({:ex_webrtc, _from, {:rtcp, rtcp_packets}}, _ctx, state) do
time = Membrane.Time.monotonic_time()

{keyframe_requests, input_tracks} =
rtcp_packets
|> Enum.reduce(false, fn
%PLI{} = packet, _pli? ->
|> Enum.filter(fn
{_track_id, %PLI{} = packet} ->
Membrane.Logger.debug("Keyframe request received: #{inspect(packet)}")
true

packet, pli? ->
packet ->
Membrane.Logger.debug_verbose("Ignoring RTCP packet: #{inspect(packet)}")
pli?
false
end)
|> Enum.flat_map_reduce(state.input_tracks, fn {track_id, _pli}, input_tracks ->
{pad, {_id, props}} =
Enum.find(input_tracks, fn {_pad, {id, _props}} -> track_id == id end)

if props.kind == :video and
time - props.last_keyframe_request_ts > @keyframe_request_throttle_time do
event = [event: {pad, %Membrane.KeyframeRequestEvent{}}]
props = %{props | last_keyframe_request_ts: time}
{event, %{input_tracks | pad => {track_id, props}}}
else
{[], input_tracks}
end
end)

now = System.os_time(:millisecond) |> Membrane.Time.milliseconds()
then = state.last_keyframe_request_ts

request_keyframe? = pli? and (then == nil or now - then >= Membrane.Time.second())
state = if request_keyframe?, do: %{state | last_keyframe_request_ts: now}, else: state

actions =
if request_keyframe? do
ctx.pads
|> Enum.flat_map(fn {pad_ref, pad_data} ->
case pad_data.options.kind do
:video -> [event: {pad_ref, %Membrane.KeyframeRequestEvent{}}]
:audio -> []
end
end)
else
[]
end

{actions, state}
{keyframe_requests, %{state | input_tracks: input_tracks}}
end

@impl true
Expand Down
113 changes: 86 additions & 27 deletions lib/membrane_webrtc/ex_webrtc/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,57 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
alias ExWebRTC.{ICECandidate, PeerConnection, SessionDescription}
alias Membrane.WebRTC.{ExWebRTCUtils, SignalingChannel, SimpleWebSocketServer}

def_options signaling: [], video_codec: [], ice_servers: []
def_options signaling: [], video_codec: [], ice_servers: [], keyframe_interval: []

def_output_pad :output,
accepted_format: Membrane.RTP,
availability: :on_request,
flow_control: :push,
options: [kind: [default: nil]]

defmodule State do
@moduledoc false

@type output_track :: %{
status: :awaiting | :connected,
pad: Membrane.Pad.ref() | nil,
track: ExWebRTC.MediaStreamTrack.t(),
first_packet_received: boolean()
}

@type t :: %__MODULE__{
pc: pid() | nil,
output_tracks: %{(pad_id :: term()) => output_track()},
awaiting_outputs: [{:video | :audio, Membrane.Pad.ref()}],
awaiting_candidates: [ExWebRTC.ICECandidate.t()],
signaling: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()},
status: :init | :connecting | :connected | :closed,
audio_params: [ExWebRTC.RTPCodecParameters.t()],
video_params: [ExWebRTC.RTPCodecParameters.t()],
ice_servers: [ExWebRTC.PeerConnection.Configuration.ice_server()],
keyframe_interval: Membrane.Time.t() | nil
}

@enforce_keys [:signaling, :audio_params, :video_params, :ice_servers, :keyframe_interval]
defstruct @enforce_keys ++
[
pc: nil,
output_tracks: %{},
awaiting_outputs: [],
awaiting_candidates: [],
status: :init
]
end

@impl true
def handle_init(_ctx, opts) do
{[],
%{
pc: nil,
output_tracks: %{},
awaiting_outputs: [],
awaiting_candidates: [],
%State{
signaling: opts.signaling,
status: :init,
audio_params: ExWebRTCUtils.codec_params(:opus),
video_params: ExWebRTCUtils.codec_params(opts.video_codec),
ice_servers: opts.ice_servers
ice_servers: opts.ice_servers,
keyframe_interval: opts.keyframe_interval
}}
end

Expand Down Expand Up @@ -69,7 +99,9 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
def handle_pad_added(Pad.ref(:output, pad_id) = pad, _ctx, state) do
state =
state
|> update_in([:output_tracks, pad_id], fn {:awaiting, _track} -> {:connected, pad} end)
|> Bunch.Struct.update_in([:output_tracks, pad_id], fn output_track ->
%{output_track | status: :connected, pad: pad}
end)
|> maybe_answer()

{[stream_format: {pad, %Membrane.RTP{}}], state}
Expand All @@ -80,7 +112,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
track_id =
state.output_tracks
|> Enum.find_value(fn
{track_id, {:connected, ^pad}} -> track_id
{track_id, %{status: :connected, pad: ^pad}} -> track_id
_other -> false
end)

Expand Down Expand Up @@ -110,19 +142,32 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
metadata: %{rtp: packet |> Map.from_struct() |> Map.delete(:payload)}
}

%{output_tracks: output_tracks} = state

case output_tracks[id] do
{:connected, pad} ->
{[buffer: {pad, buffer}], state}

{:awaiting, track} ->
case state.output_tracks[id] do
%{status: :awaiting, track: track} ->
Membrane.Logger.warning("""
Dropping packet of track #{inspect(id)}, kind #{inspect(track.kind)} \
that arrived before the SDP answer was sent.
""")

{[], state}

%{status: :connected, pad: pad, track: %{kind: kind}, first_packet_received: false} ->
timer_action =
if kind == :video and state.keyframe_interval != nil do
[start_timer: {{:request_keyframe, id}, state.keyframe_interval}]
else
[]
end

state =
Bunch.Struct.update_in(state, [:output_tracks, id], fn output_track ->
%{output_track | first_packet_received: true}
end)

{[buffer: {pad, buffer}] ++ timer_action, state}

%{status: :connected, pad: pad} ->
{[buffer: {pad, buffer}], state}
end
end

Expand Down Expand Up @@ -152,8 +197,15 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
receive_new_tracks()
|> Enum.map_reduce(state.awaiting_outputs, fn track, awaiting_outputs ->
case List.keytake(awaiting_outputs, track.kind, 0) do
nil -> {{track.id, {:awaiting, track}}, awaiting_outputs}
{{_kind, pad}, awaiting_outputs} -> {{track.id, {:connected, pad}}, awaiting_outputs}
nil ->
{{track.id,
%{status: :awaiting, track: track, pad: nil, first_packet_received: false}},
awaiting_outputs}

{{_kind, pad}, awaiting_outputs} ->
{{track.id,
%{status: :connected, track: track, pad: pad, first_packet_received: false}},
awaiting_outputs}
end
end)

Expand All @@ -165,8 +217,8 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

tracks_notification =
Enum.flat_map(new_tracks, fn
{_id, {:awaiting, track}} -> [track]
_other -> []
{_id, %{status: :awaiting, track: track}} -> [track]
_connected_track -> []
end)
|> case do
[] -> []
Expand All @@ -175,8 +227,11 @@ defmodule Membrane.WebRTC.ExWebRTCSource do

stream_formats =
Enum.flat_map(new_tracks, fn
{_id, {:connected, pad}} -> [stream_format: {pad, %Membrane.RTP{}}]
_other -> []
{_id, %{status: :connected, pad: pad}} ->
[stream_format: {pad, %Membrane.RTP{}}]

_other ->
[]
end)

{tracks_notification ++ stream_formats, state}
Expand Down Expand Up @@ -212,11 +267,14 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
handle_close(ctx, state)
end

@impl true
def handle_tick({:request_keyframe, track_id}, _ctx, state) do
:ok = PeerConnection.send_pli(state.pc, track_id)
{[], state}
end

defp maybe_answer(state) do
if Enum.all?(state.output_tracks, fn
{_id, {:connected, _pad}} -> true
_track -> false
end) do
if Enum.all?(state.output_tracks, fn {_id, %{status: status}} -> status == :connected end) do
%{pc: pc} = state
{:ok, answer} = PeerConnection.create_answer(pc)
:ok = PeerConnection.set_local_description(pc, answer)
Expand All @@ -232,6 +290,7 @@ defmodule Membrane.WebRTC.ExWebRTCSource do
end
end

@spec receive_new_tracks() :: [ExWebRTC.MediaStreamTrack.t()]
defp receive_new_tracks(), do: do_receive_new_tracks([])

defp do_receive_new_tracks(acc) do
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane_webrtc/ex_webrtc/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ defmodule Membrane.WebRTC.ExWebRTCUtils do
%RTPCodecParameters{
payload_type: 96,
mime_type: "video/H264",
clock_rate: codec_clock_rate(:h264)
clock_rate: codec_clock_rate(:h264),
sdp_fmtp_line: %ExSDP.Attribute.FMTP{
pt: 96,
level_asymmetry_allowed: 1,
packetization_mode: 1,
profile_level_id: 0x42E01F
}
}
]
end
Expand Down
31 changes: 20 additions & 11 deletions lib/membrane_webrtc/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ defmodule Membrane.WebRTC.Source do
spec: :vp8 | :h264,
default: :vp8
],
keyframe_interval: [
spec: Membrane.Time.t() | nil,
default: nil,
description: """
If set, a keyframe will be requested as often as specified on each video
track.
"""
],
ice_servers: [
spec: [ExWebRTC.PeerConnection.Configuration.ice_server()],
default: [%{urls: "stun:stun.l.google.com:19302"}]
Expand Down Expand Up @@ -68,7 +76,8 @@ defmodule Membrane.WebRTC.Source do
child(:webrtc, %ExWebRTCSource{
signaling: signaling,
video_codec: opts.video_codec,
ice_servers: opts.ice_servers
ice_servers: opts.ice_servers,
keyframe_interval: opts.keyframe_interval
})

state = %{tracks: %{}} |> Map.merge(opts)
Expand All @@ -93,7 +102,7 @@ defmodule Membrane.WebRTC.Source do
spec =
get_child(:webrtc)
|> via_out(pad_ref, options: [kind: kind])
|> then(if state.depayload_rtp, do: &child(&1, get_depayloader(kind, state)), else: & &1)
|> then(if state.depayload_rtp, do: &get_depayloader(&1, kind, state), else: & &1)
|> bin_output(pad_ref)

{[spec: spec], state}
Expand All @@ -106,24 +115,24 @@ defmodule Membrane.WebRTC.Source do
{[notify_parent: {:new_tracks, tracks}], state}
end

defp get_depayloader(:audio, _state) do
%Membrane.RTP.DepayloaderBin{
defp get_depayloader(builder, :audio, _state) do
child(builder, %Membrane.RTP.DepayloaderBin{
depayloader: Membrane.RTP.Opus.Depayloader,
clock_rate: ExWebRTCUtils.codec_clock_rate(:opus)
}
})
end

defp get_depayloader(:video, %{video_codec: :vp8}) do
%Membrane.RTP.DepayloaderBin{
defp get_depayloader(builder, :video, %{video_codec: :vp8}) do
child(builder, %Membrane.RTP.DepayloaderBin{
depayloader: Membrane.RTP.VP8.Depayloader,
clock_rate: ExWebRTCUtils.codec_clock_rate(:vp8)
}
})
end

defp get_depayloader(:video, %{video_codec: :h264}) do
%Membrane.RTP.DepayloaderBin{
defp get_depayloader(builder, :video, %{video_codec: :h264}) do
child(builder, %Membrane.RTP.DepayloaderBin{
depayloader: Membrane.RTP.H264.Depayloader,
clock_rate: ExWebRTCUtils.codec_clock_rate(:h264)
}
})
end
end
Loading

0 comments on commit 7dadcc2

Please sign in to comment.