Skip to content

Commit

Permalink
Update demo
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Aug 21, 2024
1 parent 9f86562 commit e71b3e5
Showing 1 changed file with 67 additions and 30 deletions.
97 changes: 67 additions & 30 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,21 @@ defmodule SaveToFile.PeerHandler do
end

@impl true
def handle_info({:jitter_buffer, buffer, msg}, state) do
cond do
buffer == state.video_buffer -> handle_video_buffer_msg(msg, state)
buffer == state.audio_buffer -> handle_audio_buffer_msg(msg, state)
true -> {:ok, state}
def handle_info({:jitter_buffer_timer, kind}, state) do
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end
|> JitterBuffer.handle_timer()
|> handle_jitter_buffer_result(kind, state)
end

@impl true
def terminate(reason, state) do
Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}")

state = flush_jitter_buffers(state)

if state.video_writer, do: IVF.Writer.close(state.video_writer)
if state.audio_writer, do: Ogg.Writer.close(state.audio_writer)
end
Expand Down Expand Up @@ -155,8 +158,7 @@ defmodule SaveToFile.PeerHandler do
)

{:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new()
{:ok, video_buffer} = JitterBuffer.start_link(latency: @jitter_buffer_latency_ms)
JitterBuffer.start_timer(video_buffer)
video_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
Expand All @@ -173,8 +175,7 @@ defmodule SaveToFile.PeerHandler do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)
{:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new()
{:ok, audio_buffer} = JitterBuffer.start_link(latency: @jitter_buffer_latency_ms)
JitterBuffer.start_timer(audio_buffer)
audio_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms)

state = %{
state
Expand All @@ -194,42 +195,78 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
JitterBuffer.place_packet(state.video_buffer, packet)
{:ok, state}
state.video_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:video, state)
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
JitterBuffer.place_packet(state.audio_buffer, packet)
{:ok, state}
state.audio_buffer
|> JitterBuffer.place_packet(packet)
|> handle_jitter_buffer_result(:audio, state)
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}

defp handle_video_buffer_msg({:packet, packet}, state) do
defp handle_jitter_buffer_result({buffer, packets, timer}, kind, state) do
state =
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
case kind do
:video -> %{state | video_buffer: buffer}
:audio -> %{state | audio_buffer: buffer}
end

state =
Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, kind, state) end)

unless is_nil(timer), do: Process.send_after(self(), {:jitter_buffer_timer, kind}, timer)

{:ok, state}
end

defp handle_audio_buffer_msg({:packet, packet}, state) do
defp handle_packet(packet, :video, state) do
case Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

%{
state
| video_depayloader: video_depayloader,
video_writer: video_writer,
frames_cnt: state.frames_cnt + 1
}
end
end

defp handle_packet(packet, :audio, state) do
{opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
%{state | audio_depayloader: depayloader, audio_writer: audio_writer}
end

defp flush_jitter_buffers(state),
do: state |> flush_jitter_buffer(:video) |> flush_jitter_buffer(:audio)

defp flush_jitter_buffer(state, kind) do
buffer =
case kind do
:video -> state.video_buffer
:audio -> state.audio_buffer
end

if is_nil(buffer) do
state
else
{:ok, state} =
buffer
|> JitterBuffer.flush()
|> handle_jitter_buffer_result(kind, state)

state
end
end
end

0 comments on commit e71b3e5

Please sign in to comment.