From 68648b3b2b81ddc5aa2c3a598cec9f9e3f20444f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 14:02:16 +0200 Subject: [PATCH 01/34] Move sample local user observer to the common directory. Add send_keyframe signal. Use it in onIntraRequestReceived callback. Add dependency do membrane_h264_ffmpeg_plugin where Membrane keyframe request event is. --- .../{source => }/sample_local_user_observer.cpp | 9 +++++++++ .../{source => }/sample_local_user_observer.h | 5 +++-- c_src/membrane_agora_plugin/sink.cpp | 3 +++ c_src/membrane_agora_plugin/sink.spec.exs | 1 + mix.exs | 2 ++ 5 files changed, 18 insertions(+), 2 deletions(-) rename c_src/membrane_agora_plugin/{source => }/sample_local_user_observer.cpp (94%) rename c_src/membrane_agora_plugin/{source => }/sample_local_user_observer.h (96%) diff --git a/c_src/membrane_agora_plugin/source/sample_local_user_observer.cpp b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp similarity index 94% rename from c_src/membrane_agora_plugin/source/sample_local_user_observer.cpp rename to c_src/membrane_agora_plugin/sample_local_user_observer.cpp index 5441815..745fbae 100644 --- a/c_src/membrane_agora_plugin/source/sample_local_user_observer.cpp +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp @@ -71,3 +71,12 @@ void SampleLocalUserObserver::onVideoTrackUnpublished(agora_refptr videoTrack) {} void SampleLocalUserObserver::onAudioTrackUnpublished(agora_refptr audioTrack) {} void SampleLocalUserObserver::onAudioTrackPublishStart(agora_refptr audioTrack) {} +void onIntraRequestReceived() { + printf("Keyframe request received \n"); + + if (_destination.has_value()) { + UnifexEnv *env = unifex_alloc_env(NULL); + send_keyframe_request(env, _destination.value(), UNIFEX_SEND_THREADED); + unifex_free_env(env); + } +} diff --git a/c_src/membrane_agora_plugin/source/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h similarity index 96% rename from c_src/membrane_agora_plugin/source/sample_local_user_observer.h rename to c_src/membrane_agora_plugin/sample_local_user_observer.h index 6fe6243..d302501 100644 --- a/c_src/membrane_agora_plugin/source/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -1,5 +1,6 @@ #pragma once +#include #include "AgoraBase.h" #include "NGIAgoraLocalUser.h" @@ -8,8 +9,8 @@ using namespace agora::rtc; class SampleLocalUserObserver : public ILocalUserObserver { public: - SampleLocalUserObserver() {} - ~SampleLocalUserObserver() {} + SampleLocalUserObserver(): _destination(std::nullopt){} + ~SampleLocalUserObserver(UnifexPid destination): _destination(destination) {} void onAudioTrackPublishSuccess( agora_refptr audioTrack) override; void onLocalAudioTrackStatistics(const LocalAudioStats &stats) override; diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index b420414..b6a53bd 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -44,6 +44,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, auto connObserver = std::make_shared(state->connection); state->connection->registerObserver(connObserver.get()); + auto localUserObserver = std::make_shared(connection->getLocalUser()); + int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { AG_LOG(ERROR, "Failed to connect to Agora channel!"); @@ -102,6 +104,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, // cleaning up state->connection->unregisterObserver(connObserver.get()); + localUserObserver.reset(); connObserver.reset(); UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); diff --git a/c_src/membrane_agora_plugin/sink.spec.exs b/c_src/membrane_agora_plugin/sink.spec.exs index fb9ac9a..86ba215 100644 --- a/c_src/membrane_agora_plugin/sink.spec.exs +++ b/c_src/membrane_agora_plugin/sink.spec.exs @@ -38,5 +38,6 @@ spec( sends {:user_joined :: label, id :: string} sends {:user_left :: label, id :: string} +sends {:keyframe_request :: label} dirty :cpu, [:create, :write_video_data, :update_video_stream_format, :write_audio_data, :update_audio_stream_format] diff --git a/mix.exs b/mix.exs index aa43614..5f457f1 100644 --- a/mix.exs +++ b/mix.exs @@ -47,6 +47,8 @@ defmodule Membrane.Agora.Mixfile do {:membrane_h26x_plugin, "~> 0.10.0", only: :test}, {:membrane_aac_plugin, "~> 0.18.1", only: :test}, {:membrane_realtimer_plugin, "~> 0.9.0", only: :test}, + {:membrane_h264_ffmpeg_plugin, + github: "membraneframework/membrane_h264_ffmpeg_plugin", branch: "force_keyframes"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false} From 97c61f78cd69097c8f1d7e38da40cb8e66e024d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:10:32 +0200 Subject: [PATCH 02/34] Send event in the membrane element --- c_src/membrane_agora_plugin/sink.cpp | 5 +++-- c_src/membrane_agora_plugin/sink.spec.exs | 3 ++- c_src/membrane_agora_plugin/source.cpp | 2 +- lib/agora/agora_sink.ex | 7 ++++++- mix.lock | 4 ++++ 5 files changed, 16 insertions(+), 5 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index b6a53bd..8fe7657 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -1,4 +1,5 @@ #include "sink.h" +#include "sample_local_user_observer.h" // Once SIGINT is delivered twice to the process that runs the elixir script // (e.g. with ctrl+c), the script terminates. Sometimes it results in a @@ -7,7 +8,7 @@ // SIGTERM. UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, - char *userId) { + char *userId, UnifexPid destination) { // sink's native state initialization SinkState *state = unifex_alloc_state(env); @@ -44,7 +45,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, auto connObserver = std::make_shared(state->connection); state->connection->registerObserver(connObserver.get()); - auto localUserObserver = std::make_shared(connection->getLocalUser()); + auto localUserObserver = std::make_shared(destination); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { diff --git a/c_src/membrane_agora_plugin/sink.spec.exs b/c_src/membrane_agora_plugin/sink.spec.exs index 86ba215..5bf6bb8 100644 --- a/c_src/membrane_agora_plugin/sink.spec.exs +++ b/c_src/membrane_agora_plugin/sink.spec.exs @@ -7,7 +7,8 @@ spec( app_id :: string, token :: string, channel_id :: string, - user_id :: string + user_id :: string, + destination :: pid ) :: {:ok :: label, state} | {:error :: label, reason :: atom} ) diff --git a/c_src/membrane_agora_plugin/source.cpp b/c_src/membrane_agora_plugin/source.cpp index 1a6d385..e7bfb94 100644 --- a/c_src/membrane_agora_plugin/source.cpp +++ b/c_src/membrane_agora_plugin/source.cpp @@ -1,7 +1,7 @@ #include "source.h" #include "source/sample_audio_frame_observer.h" -#include "source/sample_local_user_observer.h" #include "source/sample_video_encoded_frame_observer.h" +#include "sample_local_user_observer.h" UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, char *userId, UnifexPid destination) { diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index c66bb87..c849e9f 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -64,7 +64,7 @@ defmodule Membrane.Agora.Sink do def handle_setup(_ctx, state) do {:ok, native_state} = try do - Native.create(state.app_id, state.token, state.channel_name, state.user_id) + Native.create(state.app_id, state.token, state.channel_name, state.user_id, self()) rescue _e in UndefinedFunctionError -> reraise( @@ -125,4 +125,9 @@ defmodule Membrane.Agora.Sink do :ok = Native.write_audio_data(buffer.payload, state.native_state) {[], state} end + + @impl true + def handle_info(:keyframe_request, _ctx, state) do + {[event: {:input, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} + end end diff --git a/mix.lock b/mix.lock index 56285f6..a004d90 100644 --- a/mix.lock +++ b/mix.lock @@ -21,12 +21,16 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, + "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, "membrane_core": {:hex, :membrane_core, "1.0.1", "08aa546c0d131c66f8b906b3dfb2b8f2749b56859f6fc52bd3ac846b944b3baa", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a35ed68561bdf0a2dbb2f994333be78cf4e1c4d734e4cd927d77d92049bb1273"}, "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.16.0", "7917f6682c22b9bcfc2ca20ed960eee0f7d03ad31fd5f59ed850f1fe3ddd545a", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b0727998f75a9b4dab8a2baefdfc13c3eac00a04e061ab1b0e61dc5566927acc"}, + "membrane_h264_ffmpeg_plugin": {:git, "https://github.com/membraneframework/membrane_h264_ffmpeg_plugin.git", "c6f8674babf92d7d2b9f78056114a79a6918fd6c", [branch: "force_keyframes"]}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, + "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, + "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, "membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"}, From 3f34572eca4282396ef8e3d00ab977b4ffdd1361 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:14:14 +0200 Subject: [PATCH 03/34] Add missing field and constructor --- c_src/membrane_agora_plugin/sample_local_user_observer.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h index d302501..b14ed62 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -10,7 +10,8 @@ using namespace agora::rtc; class SampleLocalUserObserver : public ILocalUserObserver { public: SampleLocalUserObserver(): _destination(std::nullopt){} - ~SampleLocalUserObserver(UnifexPid destination): _destination(destination) {} + SampleLocalUserObserver(UnifexPid destination): _destination(destination) {} + ~SampleLocalUserObserver() {} void onAudioTrackPublishSuccess( agora_refptr audioTrack) override; void onLocalAudioTrackStatistics(const LocalAudioStats &stats) override; @@ -83,4 +84,7 @@ class SampleLocalUserObserver : public ILocalUserObserver { void onVideoTrackPublishStart(agora_refptr videoTrack) override; void onAudioTrackUnpublished(agora_refptr audioTrack) override; void onAudioTrackPublishStart(agora_refptr audioTrack) override; + + private: + std::optional _destination; }; \ No newline at end of file From 53c90baaaa215809422d08e1200c4bb029b1d372 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:17:55 +0200 Subject: [PATCH 04/34] Update bundlex config --- bundlex.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bundlex.exs b/bundlex.exs index e4dc80c..b5935dc 100644 --- a/bundlex.exs +++ b/bundlex.exs @@ -12,7 +12,7 @@ defmodule Membrane.Agora.BundlexProject do [ sink: [ - sources: ["sink.cpp", "connection_observer.cpp"], + sources: ["sink.cpp", "connection_observer.cpp", "sample_local_user_observer.cpp"], includes: ["agora_sdk/include/"], libs: ["agora_rtc_sdk", "agora-ffmpeg"], lib_dirs: ["agora_sdk/"], @@ -27,7 +27,7 @@ defmodule Membrane.Agora.BundlexProject do "connection_observer.cpp", "source/sample_audio_frame_observer.cpp", "source/sample_video_encoded_frame_observer.cpp", - "source/sample_local_user_observer.cpp" + "sample_local_user_observer.cpp" ], includes: ["agora_sdk/include/"], libs: ["agora_rtc_sdk", "agora-ffmpeg"], From fd0fe4b794609a1fbbee0b80f1785808080f5ae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:20:51 +0200 Subject: [PATCH 05/34] Add missing include --- c_src/membrane_agora_plugin/sample_local_user_observer.h | 1 + 1 file changed, 1 insertion(+) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h index b14ed62..f15e7e6 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "AgoraBase.h" #include "NGIAgoraLocalUser.h" From bf54a4e407892eb8693777b3f99b176a5e1183cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:22:53 +0200 Subject: [PATCH 06/34] Change spec of the signal. --- c_src/membrane_agora_plugin/connection_observer.h | 1 + c_src/membrane_agora_plugin/sample_local_user_observer.cpp | 2 +- c_src/membrane_agora_plugin/sink.spec.exs | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/c_src/membrane_agora_plugin/connection_observer.h b/c_src/membrane_agora_plugin/connection_observer.h index 9941ed9..560d72c 100644 --- a/c_src/membrane_agora_plugin/connection_observer.h +++ b/c_src/membrane_agora_plugin/connection_observer.h @@ -63,6 +63,7 @@ class ConnectionObserver : public IRtcConnectionObserver { void onLastmileProbeResult(const LastmileProbeResult &result) override; void onChannelMediaRelayStateChanged(int state, int code) override; + void onIntraRequestReceived() override; private: std::atomic _is_connected; diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp index 745fbae..ea4b989 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp @@ -71,7 +71,7 @@ void SampleLocalUserObserver::onVideoTrackUnpublished(agora_refptr videoTrack) {} void SampleLocalUserObserver::onAudioTrackUnpublished(agora_refptr audioTrack) {} void SampleLocalUserObserver::onAudioTrackPublishStart(agora_refptr audioTrack) {} -void onIntraRequestReceived() { +void SampleLocalUserObserver::onIntraRequestReceived() { printf("Keyframe request received \n"); if (_destination.has_value()) { diff --git a/c_src/membrane_agora_plugin/sink.spec.exs b/c_src/membrane_agora_plugin/sink.spec.exs index 5bf6bb8..1b750e6 100644 --- a/c_src/membrane_agora_plugin/sink.spec.exs +++ b/c_src/membrane_agora_plugin/sink.spec.exs @@ -39,6 +39,6 @@ spec( sends {:user_joined :: label, id :: string} sends {:user_left :: label, id :: string} -sends {:keyframe_request :: label} +sends :keyframe_request :: label dirty :cpu, [:create, :write_video_data, :update_video_stream_format, :write_audio_data, :update_audio_stream_format] From 5566d9fb450ddcd63695b88b254e06278560913c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:26:28 +0200 Subject: [PATCH 07/34] Move onIntraRequestReceived declaration to sample local user observer --- c_src/membrane_agora_plugin/connection_observer.h | 1 - c_src/membrane_agora_plugin/sample_local_user_observer.h | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/connection_observer.h b/c_src/membrane_agora_plugin/connection_observer.h index 560d72c..9941ed9 100644 --- a/c_src/membrane_agora_plugin/connection_observer.h +++ b/c_src/membrane_agora_plugin/connection_observer.h @@ -63,7 +63,6 @@ class ConnectionObserver : public IRtcConnectionObserver { void onLastmileProbeResult(const LastmileProbeResult &result) override; void onChannelMediaRelayStateChanged(int state, int code) override; - void onIntraRequestReceived() override; private: std::atomic _is_connected; diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h index f15e7e6..8955359 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -85,6 +85,8 @@ class SampleLocalUserObserver : public ILocalUserObserver { void onVideoTrackPublishStart(agora_refptr videoTrack) override; void onAudioTrackUnpublished(agora_refptr audioTrack) override; void onAudioTrackPublishStart(agora_refptr audioTrack) override; + void onIntraRequestReceived() override; + private: std::optional _destination; From 0338a2fbb5bed764faf3913bbb0531d3d6c940cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 15:29:26 +0200 Subject: [PATCH 08/34] Add missing include --- c_src/membrane_agora_plugin/sample_local_user_observer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp index ea4b989..a1f5630 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp @@ -1,3 +1,4 @@ +#include "./_generated/sink.h" #include "sample_local_user_observer.h" void SampleLocalUserObserver::onAudioTrackPublishSuccess( From 5e3da34ebff50d94f2d35651253f10c04c91f0ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Thu, 4 Jul 2024 18:00:57 +0200 Subject: [PATCH 09/34] Make environment constraints less strict --- bundlex.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bundlex.exs b/bundlex.exs index b5935dc..ba7f3cb 100644 --- a/bundlex.exs +++ b/bundlex.exs @@ -7,7 +7,7 @@ defmodule Membrane.Agora.BundlexProject do ] end - defp natives(%{architecture: "x86_64", os: "linux"}) do + defp natives(%{os: "linux"}) do unless System.get_env("AGORA_SDK_PRESENT") == "true", do: System.shell("./install.sh") [ From b47b93118d4588ee9f810a5b275fdc94eaf59935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 10:50:16 +0200 Subject: [PATCH 10/34] Add a sends spec to the source as a workaround --- c_src/membrane_agora_plugin/source.spec.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/c_src/membrane_agora_plugin/source.spec.exs b/c_src/membrane_agora_plugin/source.spec.exs index 49657a2..35f9300 100644 --- a/c_src/membrane_agora_plugin/source.spec.exs +++ b/c_src/membrane_agora_plugin/source.spec.exs @@ -16,5 +16,6 @@ sends {:agora_audio_payload :: label, payload, id :: string} sends {:agora_video_payload :: label, payload, id :: int} sends {:user_joined :: label, id :: string} sends {:user_left :: label, id :: string} +sends :keyframe_request :: label dirty :cpu, [:create] From 55e2006b4fca615b227884643591522078c4ce87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 11:42:12 +0200 Subject: [PATCH 11/34] Add log --- c_src/membrane_agora_plugin/sample_local_user_observer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp index a1f5630..b565720 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp @@ -1,5 +1,6 @@ #include "./_generated/sink.h" #include "sample_local_user_observer.h" +#include "common.h" void SampleLocalUserObserver::onAudioTrackPublishSuccess( agora_refptr audioTrack) {} @@ -73,7 +74,7 @@ void SampleLocalUserObserver::onVideoTrackPublishStart(agora_refptr audioTrack) {} void SampleLocalUserObserver::onAudioTrackPublishStart(agora_refptr audioTrack) {} void SampleLocalUserObserver::onIntraRequestReceived() { - printf("Keyframe request received \n"); + AG_LOG(ERROR, "Keyframe request received"); if (_destination.has_value()) { UnifexEnv *env = unifex_alloc_env(NULL); From 81acea640f0ea00c5a7be2c32d5389cec9bdfb49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 11:49:52 +0200 Subject: [PATCH 12/34] Register local user observer. Add elixir log --- c_src/membrane_agora_plugin/sink.cpp | 1 + lib/agora/agora_sink.ex | 1 + 2 files changed, 2 insertions(+) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 8fe7657..4fc29e5 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -46,6 +46,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->registerObserver(connObserver.get()); auto localUserObserver = std::make_shared(destination); + state->connection_->getLocalUser()->registerLocalUserObserver(localUserObserver); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index c849e9f..71dbc53 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -128,6 +128,7 @@ defmodule Membrane.Agora.Sink do @impl true def handle_info(:keyframe_request, _ctx, state) do + IO.inspect("Requesting for keyframe") {[event: {:input, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} end end From 09b0b6c78c42605f163e93bb8e206da8c449c231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 11:51:22 +0200 Subject: [PATCH 13/34] Fix typo --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 4fc29e5..39e48a3 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -46,7 +46,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->registerObserver(connObserver.get()); auto localUserObserver = std::make_shared(destination); - state->connection_->getLocalUser()->registerLocalUserObserver(localUserObserver); + state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { From 0ccfc6823016ee276369d6bba218f6ea1082536c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 11:56:48 +0200 Subject: [PATCH 14/34] Derefer the shared pointer --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 39e48a3..215f6c9 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -46,7 +46,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->registerObserver(connObserver.get()); auto localUserObserver = std::make_shared(destination); - state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver); + state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver.get()); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { From 34c822674bae7af12084535c56549971c92ccc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 12:02:01 +0200 Subject: [PATCH 15/34] Make sure keyframe is requested only when playback is playing. Add proper handling of other events --- lib/agora/agora_sink.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index 71dbc53..fbfb342 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -127,8 +127,13 @@ defmodule Membrane.Agora.Sink do end @impl true - def handle_info(:keyframe_request, _ctx, state) do + def handle_info(:keyframe_request, %{playback: :playing}, state) do IO.inspect("Requesting for keyframe") {[event: {:input, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} end + + @impl true + def handle_info(_msg, _ctx, state) do + {[], state} + end end From 3a7f54f6cfb3b5c3d49e8c9447bb98c8c1bb0924 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 12:18:15 +0200 Subject: [PATCH 16/34] Add debug log --- lib/agora/agora_sink.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index fbfb342..b3b1e75 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -133,7 +133,8 @@ defmodule Membrane.Agora.Sink do end @impl true - def handle_info(_msg, _ctx, state) do + def handle_info(msg, _ctx, state) do + IO.inspect(msg, label: :unhandled_msg) {[], state} end end From 480f5e5cf576c4265956fdb2b9486cd287cef017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 12:35:18 +0200 Subject: [PATCH 17/34] Change the pad name while requesting for keyframe --- lib/agora/agora_sink.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index b3b1e75..7961a20 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -129,7 +129,7 @@ defmodule Membrane.Agora.Sink do @impl true def handle_info(:keyframe_request, %{playback: :playing}, state) do IO.inspect("Requesting for keyframe") - {[event: {:input, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} + {[event: {:video, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} end @impl true From 1cd55de31aaed63b5a621c9d9bb9cac1f25db00a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 13:20:05 +0200 Subject: [PATCH 18/34] Remove the resetting of the localUserObserver --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 215f6c9..2f4bef8 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -106,7 +106,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, // cleaning up state->connection->unregisterObserver(connObserver.get()); - localUserObserver.reset(); + //localUserObserver.reset(); connObserver.reset(); UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); From e4e184813083c5f677c07c144c84a506dbca0aff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 14:18:31 +0200 Subject: [PATCH 19/34] fix segfault with connection being empty in the source --- .../sample_local_user_observer.h | 1 - c_src/membrane_agora_plugin/sink.cpp | 2 +- c_src/membrane_agora_plugin/source.cpp | 27 +++++++++---------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h index 8955359..1d71431 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -87,7 +87,6 @@ class SampleLocalUserObserver : public ILocalUserObserver { void onAudioTrackPublishStart(agora_refptr audioTrack) override; void onIntraRequestReceived() override; - private: std::optional _destination; }; \ No newline at end of file diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 2f4bef8..215f6c9 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -106,7 +106,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, // cleaning up state->connection->unregisterObserver(connObserver.get()); - //localUserObserver.reset(); + localUserObserver.reset(); connObserver.reset(); UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); diff --git a/c_src/membrane_agora_plugin/source.cpp b/c_src/membrane_agora_plugin/source.cpp index e7bfb94..d49dbfc 100644 --- a/c_src/membrane_agora_plugin/source.cpp +++ b/c_src/membrane_agora_plugin/source.cpp @@ -77,22 +77,21 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, } void handle_destroy_state(UnifexEnv *env, SourceState *state) { - state->connection->unregisterObserver(state->connObserver.get()); - state->connObserver.reset(); - - state->connection->getLocalUser()->unregisterLocalUserObserver( - state->localUserObserver.get()); - state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver( - state->videoEncodedFrameObserver.get()); - state->connection->getLocalUser()->unregisterAudioFrameObserver( - state->audioFrameObserver.get()); - - state->localUserObserver.reset(); - state->videoEncodedFrameObserver.reset(); - state->audioFrameObserver.reset(); - UNUSED(env); if (state->connection) { + state->connection->unregisterObserver(state->connObserver.get()); + state->connObserver.reset(); + + state->connection->getLocalUser()->unregisterLocalUserObserver( + state->localUserObserver.get()); + state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver( + state->videoEncodedFrameObserver.get()); + state->connection->getLocalUser()->unregisterAudioFrameObserver( + state->audioFrameObserver.get()); + + state->localUserObserver.reset(); + state->videoEncodedFrameObserver.reset(); + state->audioFrameObserver.reset(); if (state->connection->disconnect()) { AG_LOG(ERROR, "Failed to disconnect from Agora channel!"); return; From c3456628199ceeeff4fdb765a62f0cbbedfbda0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 14:55:10 +0200 Subject: [PATCH 20/34] Disable local user observer --- c_src/membrane_agora_plugin/sink.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 215f6c9..b3b8e37 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -45,8 +45,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, auto connObserver = std::make_shared(state->connection); state->connection->registerObserver(connObserver.get()); - auto localUserObserver = std::make_shared(destination); - state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver.get()); + // auto localUserObserver = std::make_shared(destination); + // state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver.get()); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { @@ -106,7 +106,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, // cleaning up state->connection->unregisterObserver(connObserver.get()); - localUserObserver.reset(); + //localUserObserver.reset(); connObserver.reset(); UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); From d551273aebd3b40e4af36a81ba5acdc4dd955329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 15:35:35 +0200 Subject: [PATCH 21/34] Create user observer --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index b3b8e37..981bfcb 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -45,7 +45,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, auto connObserver = std::make_shared(state->connection); state->connection->registerObserver(connObserver.get()); - // auto localUserObserver = std::make_shared(destination); + auto localUserObserver = std::make_shared(destination); // state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver.get()); int connection_res = state->connection->connect(token, channelId, userId); From cd2ca09abfbf6e25175efb4efa55e024b9caf2ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 17:11:41 +0200 Subject: [PATCH 22/34] Make sure local user observer is unregistered --- c_src/membrane_agora_plugin/sample_local_user_observer.h | 9 +++++++-- c_src/membrane_agora_plugin/sink.cpp | 4 +--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.h b/c_src/membrane_agora_plugin/sample_local_user_observer.h index 1d71431..d6796da 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.h +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.h @@ -11,8 +11,12 @@ using namespace agora::rtc; class SampleLocalUserObserver : public ILocalUserObserver { public: SampleLocalUserObserver(): _destination(std::nullopt){} - SampleLocalUserObserver(UnifexPid destination): _destination(destination) {} - ~SampleLocalUserObserver() {} + SampleLocalUserObserver(ILocalUser *localUser, UnifexPid destination): _localUser(localUser), _destination(destination) { + _localUser->registerLocalUserObserver(this); + } + ~SampleLocalUserObserver() { + _localUser->unregisterLocalUserObserver(this); + } void onAudioTrackPublishSuccess( agora_refptr audioTrack) override; void onLocalAudioTrackStatistics(const LocalAudioStats &stats) override; @@ -89,4 +93,5 @@ class SampleLocalUserObserver : public ILocalUserObserver { private: std::optional _destination; + ILocalUser *_localUser; }; \ No newline at end of file diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 981bfcb..60ec695 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -45,9 +45,6 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, auto connObserver = std::make_shared(state->connection); state->connection->registerObserver(connObserver.get()); - auto localUserObserver = std::make_shared(destination); - // state->connection->getLocalUser()->registerLocalUserObserver(localUserObserver.get()); - int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { AG_LOG(ERROR, "Failed to connect to Agora channel!"); @@ -56,6 +53,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, } connObserver->waitUntilConnected(); + auto localUserObserver = std::make_shared(destination); // senders creation agora::agora_refptr factory = state->service->createMediaNodeFactory(); From 56b7657d0d6df7d569e7dfca3e44b1d8c01572ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 17:15:13 +0200 Subject: [PATCH 23/34] Pass localUser in the constructor --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 60ec695..1bcaa23 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -53,7 +53,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, } connObserver->waitUntilConnected(); - auto localUserObserver = std::make_shared(destination); + auto localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); // senders creation agora::agora_refptr factory = state->service->createMediaNodeFactory(); From 8e193c85541eb622abd576223809f77c7346f912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 17:40:29 +0200 Subject: [PATCH 24/34] Add localUserObserver to sink's state --- c_src/membrane_agora_plugin/sink.cpp | 3 +-- c_src/membrane_agora_plugin/sink.h | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 1bcaa23..5292c95 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -53,7 +53,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, } connObserver->waitUntilConnected(); - auto localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); + localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); // senders creation agora::agora_refptr factory = state->service->createMediaNodeFactory(); @@ -104,7 +104,6 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, // cleaning up state->connection->unregisterObserver(connObserver.get()); - //localUserObserver.reset(); connObserver.reset(); UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); diff --git a/c_src/membrane_agora_plugin/sink.h b/c_src/membrane_agora_plugin/sink.h index 18b3f9a..44a0ccb 100644 --- a/c_src/membrane_agora_plugin/sink.h +++ b/c_src/membrane_agora_plugin/sink.h @@ -14,6 +14,7 @@ #include "NGIAgoraMediaNodeFactory.h" #include "NGIAgoraRtcConnection.h" #include "NGIAgoraVideoTrack.h" +#include "NGIAgoraLocalUser.h" #include "common.h" #include "connection_observer.h" @@ -28,6 +29,7 @@ typedef struct { audioEncodedFrameSender; agora::agora_refptr customVideoTrack; agora::agora_refptr customAudioTrack; + std::shared_ptr localUserObserver; // video track parameters unsigned int width; From b369a253a39ce6b084ddcad84e253881f37e3955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Fri, 5 Jul 2024 17:44:45 +0200 Subject: [PATCH 25/34] Add localUserObserver to sink's state --- c_src/membrane_agora_plugin/sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 5292c95..0f28398 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -53,7 +53,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, } connObserver->waitUntilConnected(); - localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); + state->localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); // senders creation agora::agora_refptr factory = state->service->createMediaNodeFactory(); From 2ef6aa57bba8b9662cb148f99dda196e3a23c85a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 10:56:09 +0200 Subject: [PATCH 26/34] Don't use sample local user observer in source --- c_src/membrane_agora_plugin/sink.cpp | 1 + c_src/membrane_agora_plugin/source.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 0f28398..82402f3 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -172,6 +172,7 @@ UNIFEX_TERM write_audio_data(UnifexEnv *env, UnifexPayload *payload, void handle_destroy_state(UnifexEnv *env, SinkState *state) { UNUSED(env); if (state->connection) { + state->localUserObserver.reset(); if (state->customVideoTrack) { state->connection->getLocalUser()->unpublishVideo( state->customVideoTrack); diff --git a/c_src/membrane_agora_plugin/source.cpp b/c_src/membrane_agora_plugin/source.cpp index d49dbfc..0f36d98 100644 --- a/c_src/membrane_agora_plugin/source.cpp +++ b/c_src/membrane_agora_plugin/source.cpp @@ -52,14 +52,14 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->getLocalUser()->subscribeAllVideo(options); state->connection->getLocalUser()->subscribeAllAudio(); - state->localUserObserver = std::make_shared(); + //state->localUserObserver = std::make_shared(); state->videoEncodedFrameObserver = std::make_shared(destination); state->audioFrameObserver = std::make_shared(destination); - state->connection->getLocalUser()->registerLocalUserObserver( - state->localUserObserver.get()); + // state->connection->getLocalUser()->registerLocalUserObserver( + // state->localUserObserver.get()); state->connection->getLocalUser()->registerVideoEncodedFrameObserver( state->videoEncodedFrameObserver.get()); @@ -82,14 +82,14 @@ void handle_destroy_state(UnifexEnv *env, SourceState *state) { state->connection->unregisterObserver(state->connObserver.get()); state->connObserver.reset(); - state->connection->getLocalUser()->unregisterLocalUserObserver( - state->localUserObserver.get()); + //state->connection->getLocalUser()->unregisterLocalUserObserver( + // state->localUserObserver.get()); state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver( state->videoEncodedFrameObserver.get()); state->connection->getLocalUser()->unregisterAudioFrameObserver( state->audioFrameObserver.get()); - state->localUserObserver.reset(); + //state->localUserObserver.reset(); state->videoEncodedFrameObserver.reset(); state->audioFrameObserver.reset(); if (state->connection->disconnect()) { From 97d888da08df637d8ca58c603bb36c5dce464208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 11:11:13 +0200 Subject: [PATCH 27/34] Move sink setup to handle_playing --- lib/agora/agora_sink.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index 7961a20..6a5cab8 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -61,7 +61,7 @@ defmodule Membrane.Agora.Sink do end @impl true - def handle_setup(_ctx, state) do + def handle_playing(_ctx, state) do {:ok, native_state} = try do Native.create(state.app_id, state.token, state.channel_name, state.user_id, self()) From 5a44ae3eac9d3bd3925e836157c559982cadaaea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 11:17:21 +0200 Subject: [PATCH 28/34] Send event on proper pad --- lib/agora/agora_sink.ex | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index 6a5cab8..e1c00e2 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -127,9 +127,18 @@ defmodule Membrane.Agora.Sink do end @impl true - def handle_info(:keyframe_request, %{playback: :playing}, state) do + def handle_info(:keyframe_request, %{playback: :playing} = ctx, state) do + video_pad = + ctx.pads + |> Map.keys() + |> Enum.find(fn + Pad.ref(:video, _id) -> true + _other_pad -> false + end) + IO.inspect("Requesting for keyframe") - {[event: {:video, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} + + {[event: {video_pad, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} end @impl true From 6d1046407567bdebf94da9ac1e40bf45a3251e7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 14:43:01 +0200 Subject: [PATCH 29/34] Update dependency to file plugin --- mix.exs | 4 +++- mix.lock | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/mix.exs b/mix.exs index 5f457f1..23c2e90 100644 --- a/mix.exs +++ b/mix.exs @@ -43,7 +43,9 @@ defmodule Membrane.Agora.Mixfile do {:membrane_aac_format, "~> 0.8.0"}, {:membrane_raw_audio_format, "~> 0.12.0"}, {:unifex, "~> 1.1.0"}, - {:membrane_file_plugin, "~> 0.16.0", only: :test}, + # {:membrane_file_plugin, "~> 0.16.0", only: :test}, + {:membrane_file_plugin, + github: "membraneframework/membrane_file_plugin", branch: "handle_upstream_events"}, {:membrane_h26x_plugin, "~> 0.10.0", only: :test}, {:membrane_aac_plugin, "~> 0.18.1", only: :test}, {:membrane_realtimer_plugin, "~> 0.9.0", only: :test}, diff --git a/mix.lock b/mix.lock index a004d90..5e2d10b 100644 --- a/mix.lock +++ b/mix.lock @@ -16,14 +16,15 @@ "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "hpax": {:hex, :hpax, "0.2.0", "5a58219adcb75977b2edce5eb22051de9362f08236220c9e859a47111c194ff5", [:mix], [], "hexpm", "bea06558cdae85bed075e6c036993d43cd54d447f76d8190a8db0dc5893fa2f1"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, - "membrane_core": {:hex, :membrane_core, "1.0.1", "08aa546c0d131c66f8b906b3dfb2b8f2749b56859f6fc52bd3ac846b944b3baa", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a35ed68561bdf0a2dbb2f994333be78cf4e1c4d734e4cd927d77d92049bb1273"}, - "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.16.0", "7917f6682c22b9bcfc2ca20ed960eee0f7d03ad31fd5f59ed850f1fe3ddd545a", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b0727998f75a9b4dab8a2baefdfc13c3eac00a04e061ab1b0e61dc5566927acc"}, + "membrane_core": {:hex, :membrane_core, "1.1.0", "c3bbaa5af7c26a7c3748e573efe343c2104801e3463b9e491a607e82860334a4", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3209d7f7e86d736cb7caffbba16b075c571cebb9439ab939ed6119c50fb59a5"}, + "membrane_file_plugin": {:git, "https://github.com/membraneframework/membrane_file_plugin.git", "54c6ad9de34faeb9e966cc201c0fa26599952c5e", [branch: "handle_upstream_events"]}, "membrane_h264_ffmpeg_plugin": {:git, "https://github.com/membraneframework/membrane_h264_ffmpeg_plugin.git", "c6f8674babf92d7d2b9f78056114a79a6918fd6c", [branch: "force_keyframes"]}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, @@ -40,7 +41,7 @@ "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"}, - "ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"}, + "ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"}, "req": {:hex, :req, "0.4.14", "103de133a076a31044e5458e0f850d5681eef23dfabf3ea34af63212e3b902e2", [:mix], [{:aws_signature, "~> 0.3.2", [hex: :aws_signature, repo: "hexpm", optional: true]}, {:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:nimble_ownership, "~> 0.2.0 or ~> 0.3.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "2ddd3d33f9ab714ced8d3c15fd03db40c14dbf129003c4a3eb80fac2cc0b1b08"}, "shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, From d6066e05b86715c02ad06ee45665ae3bc70bc6d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 15:23:08 +0200 Subject: [PATCH 30/34] Store connObserver in state --- c_src/membrane_agora_plugin/sink.cpp | 13 +++++++------ c_src/membrane_agora_plugin/sink.h | 1 + 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index 82402f3..af51ac4 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -42,8 +42,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, s->setBool("che.video.has_intra_request", false); // connecting - auto connObserver = std::make_shared(state->connection); - state->connection->registerObserver(connObserver.get()); + state->connObserver = std::make_shared(state->connection); + state->connection->registerObserver(state->connObserver.get()); int connection_res = state->connection->connect(token, channelId, userId); if (connection_res) { @@ -51,7 +51,6 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, unifex_release_state(env, state); return create_result_error(env, "Failed to connect to Agora channel!"); } - connObserver->waitUntilConnected(); state->localUserObserver = std::make_shared(state->connection->getLocalUser(), destination); // senders creation @@ -102,9 +101,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->getLocalUser()->publishVideo(state->customVideoTrack); state->connection->getLocalUser()->publishAudio(state->customAudioTrack); - // cleaning up - state->connection->unregisterObserver(connObserver.get()); - connObserver.reset(); + state->connObserver->waitUntilConnected(); + UNIFEX_TERM res = create_result_ok(env, state); unifex_release_state(env, state); @@ -172,6 +170,9 @@ UNIFEX_TERM write_audio_data(UnifexEnv *env, UnifexPayload *payload, void handle_destroy_state(UnifexEnv *env, SinkState *state) { UNUSED(env); if (state->connection) { + // cleaning up + state->connection->unregisterObserver(state->connObserver.get()); + state->connObserver.reset(); state->localUserObserver.reset(); if (state->customVideoTrack) { state->connection->getLocalUser()->unpublishVideo( diff --git a/c_src/membrane_agora_plugin/sink.h b/c_src/membrane_agora_plugin/sink.h index 44a0ccb..45dda6b 100644 --- a/c_src/membrane_agora_plugin/sink.h +++ b/c_src/membrane_agora_plugin/sink.h @@ -29,6 +29,7 @@ typedef struct { audioEncodedFrameSender; agora::agora_refptr customVideoTrack; agora::agora_refptr customAudioTrack; + std::shared_ptr connObserver; std::shared_ptr localUserObserver; // video track parameters From ba4bcf0728914132fd1cf8d9191b7a13f25a8cc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Mon, 8 Jul 2024 15:25:36 +0200 Subject: [PATCH 31/34] Change connObserver type to ConnectionObserver --- c_src/membrane_agora_plugin/sink.cpp | 1 - c_src/membrane_agora_plugin/sink.h | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/c_src/membrane_agora_plugin/sink.cpp b/c_src/membrane_agora_plugin/sink.cpp index af51ac4..d5d80f6 100644 --- a/c_src/membrane_agora_plugin/sink.cpp +++ b/c_src/membrane_agora_plugin/sink.cpp @@ -100,7 +100,6 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->customAudioTrack->setEnabled(true); state->connection->getLocalUser()->publishVideo(state->customVideoTrack); state->connection->getLocalUser()->publishAudio(state->customAudioTrack); - state->connObserver->waitUntilConnected(); UNIFEX_TERM res = create_result_ok(env, state); diff --git a/c_src/membrane_agora_plugin/sink.h b/c_src/membrane_agora_plugin/sink.h index 45dda6b..78bbdd3 100644 --- a/c_src/membrane_agora_plugin/sink.h +++ b/c_src/membrane_agora_plugin/sink.h @@ -29,7 +29,7 @@ typedef struct { audioEncodedFrameSender; agora::agora_refptr customVideoTrack; agora::agora_refptr customAudioTrack; - std::shared_ptr connObserver; + std::shared_ptr connObserver; std::shared_ptr localUserObserver; // video track parameters From dd642c85d64a1b98899b5ae71ea422f1dde3b473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Tue, 9 Jul 2024 15:56:53 +0200 Subject: [PATCH 32/34] Remove unused code. Update deps --- c_src/membrane_agora_plugin/source.cpp | 8 -------- c_src/membrane_agora_plugin/source.h | 1 - lib/agora/agora_sink.ex | 5 +---- mix.exs | 5 ++--- mix.lock | 2 +- test/support/receiver_pipeline.ex | 5 ++--- 6 files changed, 6 insertions(+), 20 deletions(-) diff --git a/c_src/membrane_agora_plugin/source.cpp b/c_src/membrane_agora_plugin/source.cpp index 0f36d98..03b281b 100644 --- a/c_src/membrane_agora_plugin/source.cpp +++ b/c_src/membrane_agora_plugin/source.cpp @@ -52,15 +52,11 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId, state->connection->getLocalUser()->subscribeAllVideo(options); state->connection->getLocalUser()->subscribeAllAudio(); - //state->localUserObserver = std::make_shared(); state->videoEncodedFrameObserver = std::make_shared(destination); state->audioFrameObserver = std::make_shared(destination); - // state->connection->getLocalUser()->registerLocalUserObserver( - // state->localUserObserver.get()); - state->connection->getLocalUser()->registerVideoEncodedFrameObserver( state->videoEncodedFrameObserver.get()); @@ -81,15 +77,11 @@ void handle_destroy_state(UnifexEnv *env, SourceState *state) { if (state->connection) { state->connection->unregisterObserver(state->connObserver.get()); state->connObserver.reset(); - - //state->connection->getLocalUser()->unregisterLocalUserObserver( - // state->localUserObserver.get()); state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver( state->videoEncodedFrameObserver.get()); state->connection->getLocalUser()->unregisterAudioFrameObserver( state->audioFrameObserver.get()); - //state->localUserObserver.reset(); state->videoEncodedFrameObserver.reset(); state->audioFrameObserver.reset(); if (state->connection->disconnect()) { diff --git a/c_src/membrane_agora_plugin/source.h b/c_src/membrane_agora_plugin/source.h index cbd1d8e..e558ad3 100644 --- a/c_src/membrane_agora_plugin/source.h +++ b/c_src/membrane_agora_plugin/source.h @@ -18,7 +18,6 @@ typedef struct { agora::base::IAgoraService *service = NULL; agora::agora_refptr connection; std::shared_ptr connObserver; - std::shared_ptr localUserObserver; std::shared_ptr videoEncodedFrameObserver; std::shared_ptr audioFrameObserver; diff --git a/lib/agora/agora_sink.ex b/lib/agora/agora_sink.ex index e1c00e2..ea6532e 100644 --- a/lib/agora/agora_sink.ex +++ b/lib/agora/agora_sink.ex @@ -136,14 +136,11 @@ defmodule Membrane.Agora.Sink do _other_pad -> false end) - IO.inspect("Requesting for keyframe") - - {[event: {video_pad, %Membrane.H264.FFmpeg.KeyframeRequestEvent{}}], state} + {[event: {video_pad, %Membrane.KeyframeRequestEvent{}}], state} end @impl true def handle_info(msg, _ctx, state) do - IO.inspect(msg, label: :unhandled_msg) {[], state} end end diff --git a/mix.exs b/mix.exs index 23c2e90..bc8e6e8 100644 --- a/mix.exs +++ b/mix.exs @@ -43,12 +43,11 @@ defmodule Membrane.Agora.Mixfile do {:membrane_aac_format, "~> 0.8.0"}, {:membrane_raw_audio_format, "~> 0.12.0"}, {:unifex, "~> 1.1.0"}, - # {:membrane_file_plugin, "~> 0.16.0", only: :test}, - {:membrane_file_plugin, - github: "membraneframework/membrane_file_plugin", branch: "handle_upstream_events"}, + {:membrane_file_plugin, "~> 0.17.2", only: :test}, {:membrane_h26x_plugin, "~> 0.10.0", only: :test}, {:membrane_aac_plugin, "~> 0.18.1", only: :test}, {:membrane_realtimer_plugin, "~> 0.9.0", only: :test}, + # {:membrane_h264_ffmpeg_plugin, "~> 0.17.2"}, {:membrane_h264_ffmpeg_plugin, github: "membraneframework/membrane_h264_ffmpeg_plugin", branch: "force_keyframes"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, diff --git a/mix.lock b/mix.lock index 5e2d10b..d80da2f 100644 --- a/mix.lock +++ b/mix.lock @@ -24,7 +24,7 @@ "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, "membrane_core": {:hex, :membrane_core, "1.1.0", "c3bbaa5af7c26a7c3748e573efe343c2104801e3463b9e491a607e82860334a4", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3209d7f7e86d736cb7caffbba16b075c571cebb9439ab939ed6119c50fb59a5"}, - "membrane_file_plugin": {:git, "https://github.com/membraneframework/membrane_file_plugin.git", "54c6ad9de34faeb9e966cc201c0fa26599952c5e", [branch: "handle_upstream_events"]}, + "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, "membrane_h264_ffmpeg_plugin": {:git, "https://github.com/membraneframework/membrane_h264_ffmpeg_plugin.git", "c6f8674babf92d7d2b9f78056114a79a6918fd6c", [branch: "force_keyframes"]}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, diff --git a/test/support/receiver_pipeline.ex b/test/support/receiver_pipeline.ex index 0711c7e..88e707a 100644 --- a/test/support/receiver_pipeline.ex +++ b/test/support/receiver_pipeline.ex @@ -49,7 +49,6 @@ defmodule Membrane.Agora.Support.ReceiverPipeline do end end - @channel_name System.get_env("AGORA_CHANNEL_NAME", "") @app_id System.get_env("AGORA_APP_ID", "") @certificate System.get_env("AGORA_CERTIFICATE", "") @@ -59,8 +58,8 @@ defmodule Membrane.Agora.Support.ReceiverPipeline do spec = [ child(:source, %Membrane.Agora.Source{ - channel_name: @channel_name, - token: TokenGenerator.get_token(@certificate, @app_id, @channel_name, user_id), + channel_name: opts[:channel_name], + token: TokenGenerator.get_token(@certificate, @app_id, opts[:channel_name], user_id), app_id: @app_id, user_id: user_id }) From d7baa4a09879aefff45ec8b89d9421e5b33df7c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Tue, 9 Jul 2024 15:59:39 +0200 Subject: [PATCH 33/34] Remove unused deps --- mix.exs | 3 --- mix.lock | 4 ---- 2 files changed, 7 deletions(-) diff --git a/mix.exs b/mix.exs index bc8e6e8..4a9aa8b 100644 --- a/mix.exs +++ b/mix.exs @@ -47,9 +47,6 @@ defmodule Membrane.Agora.Mixfile do {:membrane_h26x_plugin, "~> 0.10.0", only: :test}, {:membrane_aac_plugin, "~> 0.18.1", only: :test}, {:membrane_realtimer_plugin, "~> 0.9.0", only: :test}, - # {:membrane_h264_ffmpeg_plugin, "~> 0.17.2"}, - {:membrane_h264_ffmpeg_plugin, - github: "membraneframework/membrane_h264_ffmpeg_plugin", branch: "force_keyframes"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, {:credo, ">= 0.0.0", only: :dev, runtime: false} diff --git a/mix.lock b/mix.lock index d80da2f..188f3df 100644 --- a/mix.lock +++ b/mix.lock @@ -22,16 +22,12 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"}, - "membrane_common_c": {:hex, :membrane_common_c, "0.16.0", "caf3f29d2f5a1d32d8c2c122866110775866db2726e4272be58e66dfdf4bce40", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c7e91de1ce1f8b23b9823188a5d13654d317235ea0ca781c05353ed3be9b1c"}, "membrane_core": {:hex, :membrane_core, "1.1.0", "c3bbaa5af7c26a7c3748e573efe343c2104801e3463b9e491a607e82860334a4", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3209d7f7e86d736cb7caffbba16b075c571cebb9439ab939ed6119c50fb59a5"}, "membrane_file_plugin": {:hex, :membrane_file_plugin, "0.17.2", "650e134c2345d946f930082fac8bac9f5aba785a7817d38a9a9da41ffc56fa92", [:mix], [{:logger_backends, "~> 1.0", [hex: :logger_backends, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "df50c6040004cd7b901cf057bd7e99c875bbbd6ae574efc93b2c753c96f43b9d"}, - "membrane_h264_ffmpeg_plugin": {:git, "https://github.com/membraneframework/membrane_h264_ffmpeg_plugin.git", "c6f8674babf92d7d2b9f78056114a79a6918fd6c", [branch: "force_keyframes"]}, "membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"}, - "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"}, "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, - "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, "membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mint": {:hex, :mint, "1.6.0", "88a4f91cd690508a04ff1c3e28952f322528934be541844d54e0ceb765f01d5e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "3c5ae85d90a5aca0a49c0d8b67360bbe407f3b54f1030a111047ff988e8fefaa"}, From 43596899d1130d0eb99353a7eb4cac42c223fd12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Kita?= Date: Tue, 9 Jul 2024 16:10:03 +0200 Subject: [PATCH 34/34] Use INFO log level to inform about keyframe request --- c_src/membrane_agora_plugin/sample_local_user_observer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp index b565720..7596e4d 100644 --- a/c_src/membrane_agora_plugin/sample_local_user_observer.cpp +++ b/c_src/membrane_agora_plugin/sample_local_user_observer.cpp @@ -74,7 +74,7 @@ void SampleLocalUserObserver::onVideoTrackPublishStart(agora_refptr audioTrack) {} void SampleLocalUserObserver::onAudioTrackPublishStart(agora_refptr audioTrack) {} void SampleLocalUserObserver::onIntraRequestReceived() { - AG_LOG(ERROR, "Keyframe request received"); + AG_LOG(INFO, "Keyframe request received"); if (_destination.has_value()) { UnifexEnv *env = unifex_alloc_env(NULL);