diff --git a/examples/save_to_file/lib/save_to_file/peer_handler.ex b/examples/save_to_file/lib/save_to_file/peer_handler.ex index 11a9f73..e0e72ab 100644 --- a/examples/save_to_file/lib/save_to_file/peer_handler.ex +++ b/examples/save_to_file/lib/save_to_file/peer_handler.ex @@ -78,18 +78,21 @@ defmodule SaveToFile.PeerHandler do end @impl true - def handle_info({:jitter_buffer, buffer, msg}, state) do - cond do - buffer == state.video_buffer -> handle_video_buffer_msg(msg, state) - buffer == state.audio_buffer -> handle_audio_buffer_msg(msg, state) - true -> {:ok, state} + def handle_info({:jitter_buffer_timer, kind}, state) do + case kind do + :video -> state.video_buffer + :audio -> state.audio_buffer end + |> JitterBuffer.handle_timer() + |> handle_jitter_buffer_result(kind, state) end @impl true def terminate(reason, state) do Logger.warning("WebSocket connection was terminated, reason: #{inspect(reason)}") + state = flush_jitter_buffers(state) + if state.video_writer, do: IVF.Writer.close(state.video_writer) if state.audio_writer, do: Ogg.Writer.close(state.audio_writer) end @@ -155,8 +158,7 @@ defmodule SaveToFile.PeerHandler do ) {:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new() - {:ok, video_buffer} = JitterBuffer.start_link(latency: @jitter_buffer_latency_ms) - JitterBuffer.start_timer(video_buffer) + video_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms) state = %{ state @@ -173,8 +175,7 @@ defmodule SaveToFile.PeerHandler do # by default uses 1 mono channel and 48k clock rate {:ok, audio_writer} = Ogg.Writer.open(@audio_file) {:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new() - {:ok, audio_buffer} = JitterBuffer.start_link(latency: @jitter_buffer_latency_ms) - JitterBuffer.start_timer(audio_buffer) + audio_buffer = JitterBuffer.new(latency: @jitter_buffer_latency_ms) state = %{ state @@ -194,42 +195,78 @@ defmodule SaveToFile.PeerHandler do end defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do - JitterBuffer.place_packet(state.video_buffer, packet) - {:ok, state} + state.video_buffer + |> JitterBuffer.place_packet(packet) + |> handle_jitter_buffer_result(:video, state) end defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do - JitterBuffer.place_packet(state.audio_buffer, packet) - {:ok, state} + state.audio_buffer + |> JitterBuffer.place_packet(packet) + |> handle_jitter_buffer_result(:audio, state) end defp handle_webrtc_msg(_msg, state), do: {:ok, state} - defp handle_video_buffer_msg({:packet, packet}, state) do + defp handle_jitter_buffer_result({buffer, packets, timer}, kind, state) do state = - case Depayloader.depayload(state.video_depayloader, packet) do - {nil, video_depayloader} -> - %{state | video_depayloader: video_depayloader} - - {vp8_frame, video_depayloader} -> - frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame} - {:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame) - - %{ - state - | video_depayloader: video_depayloader, - video_writer: video_writer, - frames_cnt: state.frames_cnt + 1 - } + case kind do + :video -> %{state | video_buffer: buffer} + :audio -> %{state | audio_buffer: buffer} end + state = + Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, kind, state) end) + + unless is_nil(timer), do: Process.send_after(self(), {:jitter_buffer_timer, kind}, timer) + {:ok, state} end - defp handle_audio_buffer_msg({:packet, packet}, state) do + defp handle_packet(packet, :video, state) do + case Depayloader.depayload(state.video_depayloader, packet) do + {nil, video_depayloader} -> + %{state | video_depayloader: video_depayloader} + + {vp8_frame, video_depayloader} -> + frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame} + {:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame) + + %{ + state + | video_depayloader: video_depayloader, + video_writer: video_writer, + frames_cnt: state.frames_cnt + 1 + } + end + end + + defp handle_packet(packet, :audio, state) do {opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet) {:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet) - {:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}} + %{state | audio_depayloader: depayloader, audio_writer: audio_writer} + end + + defp flush_jitter_buffers(state), + do: state |> flush_jitter_buffer(:video) |> flush_jitter_buffer(:audio) + + defp flush_jitter_buffer(state, kind) do + buffer = + case kind do + :video -> state.video_buffer + :audio -> state.audio_buffer + end + + if is_nil(buffer) do + state + else + {:ok, state} = + buffer + |> JitterBuffer.flush() + |> handle_jitter_buffer_result(kind, state) + + state + end end end