From fe8aa35ec6fc930e73c8da86a2a42ffeb488dd2e Mon Sep 17 00:00:00 2001 From: amir Date: Mon, 27 Jan 2020 16:38:07 +0200 Subject: [PATCH] add current_song_controller, to do reporting logic once, and send final string to servicecs --- CMakeLists.txt | 1 + src/current_song_controller.cc | 70 ++++++++++++++++++++++++++++++++++ src/current_song_controller.h | 47 +++++++++++++++++++++++ src/mqtt_api.cc | 37 +++--------------- src/mqtt_api.h | 12 ++---- src/wavplayeralsa.cpp | 10 +++-- src/web_sockets_api.cc | 60 +++++------------------------ src/web_sockets_api.h | 23 +++-------- 8 files changed, 148 insertions(+), 112 deletions(-) create mode 100644 src/current_song_controller.cc create mode 100644 src/current_song_controller.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8222592..e354655 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ set (SOURCES src/http_api.cc src/mqtt_api.cc src/audio_files_manager.cc + src/current_song_controller.cc ) set(CMAKE_BUILD_TYPE Debug) diff --git a/src/current_song_controller.cc b/src/current_song_controller.cc new file mode 100644 index 0000000..c48749f --- /dev/null +++ b/src/current_song_controller.cc @@ -0,0 +1,70 @@ +#include + +#include +#include +#include "nlohmann/json.hpp" + +using json = nlohmann::json; + +namespace wavplayeralsa +{ + + CurrentSongController::CurrentSongController(boost::asio::io_service &io_service, MqttApi *mqtt_service, WebSocketsApi *ws_service) + : throttle_timer_(io_service) + { + mqtt_service_ = mqtt_service; + ws_service_ = ws_service; + + json j; + j["song_is_playing"] = false; + UpdateLastStatusMsg(j); + } + + void CurrentSongController::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed) + { + json j; + j["song_is_playing"] = true; + j["file_id"] = file_id; + j["start_time_millis_since_epoch"] = start_time_millis_since_epoch; + j["speed"] = speed; + UpdateLastStatusMsg(j); + } + + void CurrentSongController::NoSongPlayingStatus(const std::string &file_id) + { + json j; + j["song_is_playing"] = false; + j["stopped_file_id"] = file_id; + UpdateLastStatusMsg(j); + } + + void CurrentSongController::UpdateLastStatusMsg(const json &msgJson) + { + const std::string msg_json_str = msgJson.dump(); + + if(msg_json_str == last_status_msg_) { + return; + } + + last_status_msg_ = msg_json_str; + + if(!throttle_timer_set_) { + throttle_timer_.expires_from_now(boost::posix_time::milliseconds(THROTTLE_WAIT_TIME_MS)); + throttle_timer_.async_wait(boost::bind(&CurrentSongController::ReportCurrentSongToServices, this, _1)); + throttle_timer_set_ = true; + } + } + + void CurrentSongController::ReportCurrentSongToServices(const boost::system::error_code& error) + { + throttle_timer_set_ = false; + if(error) + return; + + mqtt_service_->ReportCurrentSong(last_status_msg_); + ws_service_->ReportCurrentSong(last_status_msg_); + } + +} + + diff --git a/src/current_song_controller.h b/src/current_song_controller.h new file mode 100644 index 0000000..fc1049f --- /dev/null +++ b/src/current_song_controller.h @@ -0,0 +1,47 @@ +#ifndef CURRENT_SONG_CONTROLLER_H_ +#define CURRENT_SONG_CONTROLLER_H_ + +#include +#include +#include "nlohmann/json_fwd.hpp" + +#include + +#include +#include + +using json = nlohmann::json; + +namespace wavplayeralsa { + + class CurrentSongController : public PlayerEventsIfc + { + + public: + CurrentSongController(boost::asio::io_service &io_service, MqttApi *mqtt_service, WebSocketsApi *ws_service); + + public: + void NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed); + void NoSongPlayingStatus(const std::string &file_id); + + private: + void UpdateLastStatusMsg(const json &msgJson); + void ReportCurrentSongToServices(const boost::system::error_code& error); + + private: + MqttApi *mqtt_service_; + WebSocketsApi *ws_service_; + + private: + std::string last_status_msg_; + + private: + // throttling issues: + const int THROTTLE_WAIT_TIME_MS = 50; + boost::asio::deadline_timer throttle_timer_; + bool throttle_timer_set_ = false; + + }; +} + +#endif // CURRENT_SONG_CONTROLLER_H_ diff --git a/src/mqtt_api.cc b/src/mqtt_api.cc index 4396ade..c68c943 100644 --- a/src/mqtt_api.cc +++ b/src/mqtt_api.cc @@ -2,9 +2,6 @@ #include #include -#include "nlohmann/json.hpp" - -using json = nlohmann::json; namespace wavplayeralsa { @@ -21,10 +18,6 @@ namespace wavplayeralsa { player_action_callback_ = player_action_callback; logger_ = logger; - json j; - j["song_is_playing"] = false; - last_status_msg_ = j.dump(); - const char *mqtt_client_id = "wavplayeralsa"; logger_->info("creating mqtt connection to host {} on port {} with client id {}", mqtt_host, mqtt_port, mqtt_client_id); logger_->info("will publish current song updates on topic {}", CURRENT_SONG_TOPIC); @@ -65,34 +58,14 @@ namespace wavplayeralsa { } - void MqttApi::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed) + void MqttApi::ReportCurrentSong(const std::string &json_str) { - json j; - j["song_is_playing"] = true; - j["file_id"] = file_id; - j["start_time_millis_since_epoch"] = start_time_millis_since_epoch; - j["speed"] = speed; - UpdateLastStatusMsg(j); - } + last_status_msg_ = json_str; - void MqttApi::NoSongPlayingStatus(const std::string &file_id) - { - json j; - j["song_is_playing"] = false; - j["stopped_file_id"] = file_id; - UpdateLastStatusMsg(j); - } - - void MqttApi::UpdateLastStatusMsg(const json &msgJson) - { - const std::string msg_json_str = msgJson.dump(); - - if(msg_json_str == last_status_msg_) { - return; + if(this->mqtt_client_) + { + this->mqtt_client_->publish_exactly_once(CURRENT_SONG_TOPIC, last_status_msg_, true); } - - last_status_msg_ = msg_json_str; - this->mqtt_client_->publish_exactly_once(CURRENT_SONG_TOPIC, last_status_msg_, true); } } \ No newline at end of file diff --git a/src/mqtt_api.h b/src/mqtt_api.h index 9e0789e..7281cc5 100644 --- a/src/mqtt_api.h +++ b/src/mqtt_api.h @@ -10,10 +10,8 @@ #include "spdlog/spdlog.h" #define MQTT_NO_TLS #include "mqtt_cpp/mqtt_client_cpp.hpp" -#include "nlohmann/json_fwd.hpp" #include "player_actions_ifc.h" -#include "player_events_ifc.h" #ifndef CURRENT_SONG_TOPIC #define CURRENT_SONG_TOPIC "current-song" @@ -21,18 +19,14 @@ namespace wavplayeralsa { - class MqttApi : public PlayerEventsIfc { + class MqttApi { public: MqttApi(boost::asio::io_service &io_service); void Initialize(std::shared_ptr logger, PlayerActionsIfc *player_action_callback, const std::string &mqtt_host, uint16_t mqtt_port); public: - void NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed); - void NoSongPlayingStatus(const std::string &file_id); - - private: - void UpdateLastStatusMsg(const nlohmann::json &msgJson); + void ReportCurrentSong(const std::string &json_str); private: const int reconnect_wait_ms = 2000; @@ -44,7 +38,7 @@ namespace wavplayeralsa { boost::asio::io_service &io_service_; private: - std::shared_ptr>> mqtt_client_; + std::shared_ptr>> mqtt_client_ = nullptr; boost::asio::deadline_timer reconnect_timer_; std::string last_status_msg_; diff --git a/src/wavplayeralsa.cpp b/src/wavplayeralsa.cpp index 41e1181..291300c 100644 --- a/src/wavplayeralsa.cpp +++ b/src/wavplayeralsa.cpp @@ -18,6 +18,7 @@ #include "mqtt_api.h" #include "alsa_frames_transfer.h" #include "audio_files_manager.h" +#include "current_song_controller.h" /* @@ -28,8 +29,10 @@ class WavPlayerAlsa { public: WavPlayerAlsa() : + web_sockets_api_(), io_service_work_(io_service_), - mqtt_api_(io_service_) + mqtt_api_(io_service_), + current_song_controller_(io_service_, &mqtt_api_, &web_sockets_api_) { } @@ -135,10 +138,9 @@ class WavPlayerAlsa { if(UseMqtt()) { mqtt_api_.Initialize(mqtt_api_logger_, &audio_files_manager, mqtt_host_, mqtt_port_); - audio_files_manager.RegisterPlayerEventsHandler(&mqtt_api_); } - audio_files_manager.RegisterPlayerEventsHandler(&web_sockets_api_); + audio_files_manager.RegisterPlayerEventsHandler(¤t_song_controller_); } catch(const std::exception &e) { root_logger_->critical("failed initialization, unable to start player. {}", e.what()); @@ -243,6 +245,8 @@ class WavPlayerAlsa { wavplayeralsa::MqttApi mqtt_api_; wavplayeralsa::AudioFilesManager audio_files_manager; + wavplayeralsa::CurrentSongController current_song_controller_; + }; diff --git a/src/web_sockets_api.cc b/src/web_sockets_api.cc index c712a16..2a5dd48 100644 --- a/src/web_sockets_api.cc +++ b/src/web_sockets_api.cc @@ -3,28 +3,18 @@ #include #include -#include "nlohmann/json.hpp" - - using websocketpp::connection_hdl; -using json = nlohmann::json; - - namespace wavplayeralsa { - WebSocketsApi::~WebSocketsApi() { - if(msg_throttle_timer_ != nullptr) { - delete msg_throttle_timer_; - msg_throttle_timer_ = nullptr; - } + WebSocketsApi::WebSocketsApi() + { + } void WebSocketsApi::Initialize(std::shared_ptr logger, boost::asio::io_service *io_service, uint16_t ws_listen_port) { logger_ = logger; - io_service_ = io_service; - msg_throttle_timer_ = new boost::asio::deadline_timer(*io_service); server_.clear_error_channels(websocketpp::log::alevel::all); server_.clear_access_channels(websocketpp::log::alevel::all); @@ -42,55 +32,25 @@ namespace wavplayeralsa { server_.start_accept(); logger_->info("web sockets server started on port {}", ws_listen_port); - } - - void WebSocketsApi::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed) { - json j; - j["song_is_playing"] = true; - j["file_id"] = file_id; - j["start_time_millis_since_epoch"] = start_time_millis_since_epoch; - j["speed"] = speed; - UpdateLastStatusMsg(j); - } - void WebSocketsApi::NoSongPlayingStatus(const std::string &file_id) { - json j; - j["song_is_playing"] = false; - j["stopped_file_id"] = file_id; - UpdateLastStatusMsg(j); + initialized = true; } - void WebSocketsApi::UpdateLastStatusMsg(const json &msgJson) { - std::string msg_json_str = msgJson.dump(); + void WebSocketsApi::ReportCurrentSong(const std::string &json_str) + { + last_status_msg_ = json_str; - // Test for msg duplication. - // this is just optimization, and may not cover all cases (json keys are not ordered), - // but its cheap and easy to test. - if(msg_json_str == last_status_msg_) { + if(!initialized) return; - } - - last_status_msg_ = msg_json_str; - if(!has_active_timer_) { - msg_throttle_timer_->expires_from_now(boost::posix_time::milliseconds(THROTTLE_WAIT_TIME_MS)); - msg_throttle_timer_->async_wait(boost::bind(&WebSocketsApi::SendToAllConnectedClients, this, _1)); - has_active_timer_ = true; - } - } - - void WebSocketsApi::SendToAllConnectedClients(const boost::system::error_code &e) { - if(e) { - return; - } logger_->info("new status message: {}. will send to all {} connected clients", last_status_msg_, connections_.size()); BOOST_FOREACH(const connection_hdl &hdl, connections_) { server_.send(hdl, last_status_msg_, websocketpp::frame::opcode::text); } - has_active_timer_ = false; } - void WebSocketsApi::OnOpen(connection_hdl hdl) { + void WebSocketsApi::OnOpen(connection_hdl hdl) + { connections_.insert(hdl); const auto con = server_.get_con_from_hdl(hdl); diff --git a/src/web_sockets_api.h b/src/web_sockets_api.h index 8bde84d..03b7288 100644 --- a/src/web_sockets_api.h +++ b/src/web_sockets_api.h @@ -8,29 +8,21 @@ #include "websocketpp/config/asio_no_tls.hpp" #include "websocketpp/server.hpp" #include "spdlog/spdlog.h" -#include "nlohmann/json_fwd.hpp" #include "player_events_ifc.h" namespace wavplayeralsa { - class WebSocketsApi : public PlayerEventsIfc { + class WebSocketsApi { public: - ~WebSocketsApi(); + WebSocketsApi(); public: void Initialize(std::shared_ptr logger, boost::asio::io_service *io_service, uint16_t ws_listen_port); public: - // PlayerEventsIfc - void NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed); - void NoSongPlayingStatus(const std::string &file_id); - - private: - // handler functions - void UpdateLastStatusMsg(const nlohmann::json &msgJson); - void SendToAllConnectedClients(const boost::system::error_code &e); + void ReportCurrentSong(const std::string &json_str); private: // web sockets callbacks @@ -43,16 +35,11 @@ namespace wavplayeralsa { std::shared_ptr logger_; boost::asio::io_service *io_service_; - // throttle - in case we have burst of messages, we don't want to flood the clients. - // we will wait few ms after the first msg, and before sending it to clients. - // if more messages are recived in that time, we will only send the last on once. - static const int THROTTLE_WAIT_TIME_MS = 50; - boost::asio::deadline_timer *msg_throttle_timer_ = nullptr; - bool has_active_timer_ = false; - typedef std::set> ConList; ConList connections_; + bool initialized = false; + std::string last_status_msg_; };