Skip to content

Commit

Permalink
add current_song_controller, to do reporting logic once, and send fin…
Browse files Browse the repository at this point in the history
…al string to servicecs
  • Loading branch information
blumamir committed Jan 27, 2020
1 parent 4e707b7 commit fe8aa35
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 112 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ set (SOURCES
src/http_api.cc
src/mqtt_api.cc
src/audio_files_manager.cc
src/current_song_controller.cc
)

set(CMAKE_BUILD_TYPE Debug)
Expand Down
70 changes: 70 additions & 0 deletions src/current_song_controller.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <current_song_controller.h>

#include <iostream>
#include <boost/bind.hpp>
#include "nlohmann/json.hpp"

using json = nlohmann::json;

namespace wavplayeralsa
{

CurrentSongController::CurrentSongController(boost::asio::io_service &io_service, MqttApi *mqtt_service, WebSocketsApi *ws_service)
: throttle_timer_(io_service)
{
mqtt_service_ = mqtt_service;
ws_service_ = ws_service;

json j;
j["song_is_playing"] = false;
UpdateLastStatusMsg(j);
}

void CurrentSongController::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed)
{
json j;
j["song_is_playing"] = true;
j["file_id"] = file_id;
j["start_time_millis_since_epoch"] = start_time_millis_since_epoch;
j["speed"] = speed;
UpdateLastStatusMsg(j);
}

void CurrentSongController::NoSongPlayingStatus(const std::string &file_id)
{
json j;
j["song_is_playing"] = false;
j["stopped_file_id"] = file_id;
UpdateLastStatusMsg(j);
}

void CurrentSongController::UpdateLastStatusMsg(const json &msgJson)
{
const std::string msg_json_str = msgJson.dump();

if(msg_json_str == last_status_msg_) {
return;
}

last_status_msg_ = msg_json_str;

if(!throttle_timer_set_) {
throttle_timer_.expires_from_now(boost::posix_time::milliseconds(THROTTLE_WAIT_TIME_MS));
throttle_timer_.async_wait(boost::bind(&CurrentSongController::ReportCurrentSongToServices, this, _1));
throttle_timer_set_ = true;
}
}

void CurrentSongController::ReportCurrentSongToServices(const boost::system::error_code& error)
{
throttle_timer_set_ = false;
if(error)
return;

mqtt_service_->ReportCurrentSong(last_status_msg_);
ws_service_->ReportCurrentSong(last_status_msg_);
}

}


47 changes: 47 additions & 0 deletions src/current_song_controller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef CURRENT_SONG_CONTROLLER_H_
#define CURRENT_SONG_CONTROLLER_H_

#include <string>
#include <boost/asio/deadline_timer.hpp>
#include "nlohmann/json_fwd.hpp"

#include <player_events_ifc.h>

#include <mqtt_api.h>
#include <web_sockets_api.h>

using json = nlohmann::json;

namespace wavplayeralsa {

class CurrentSongController : public PlayerEventsIfc
{

public:
CurrentSongController(boost::asio::io_service &io_service, MqttApi *mqtt_service, WebSocketsApi *ws_service);

public:
void NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed);
void NoSongPlayingStatus(const std::string &file_id);

private:
void UpdateLastStatusMsg(const json &msgJson);
void ReportCurrentSongToServices(const boost::system::error_code& error);

private:
MqttApi *mqtt_service_;
WebSocketsApi *ws_service_;

private:
std::string last_status_msg_;

private:
// throttling issues:
const int THROTTLE_WAIT_TIME_MS = 50;
boost::asio::deadline_timer throttle_timer_;
bool throttle_timer_set_ = false;

};
}

#endif // CURRENT_SONG_CONTROLLER_H_
37 changes: 5 additions & 32 deletions src/mqtt_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

#include <iostream>
#include <boost/date_time/time_duration.hpp>
#include "nlohmann/json.hpp"

using json = nlohmann::json;

namespace wavplayeralsa {

Expand All @@ -21,10 +18,6 @@ namespace wavplayeralsa {
player_action_callback_ = player_action_callback;
logger_ = logger;

json j;
j["song_is_playing"] = false;
last_status_msg_ = j.dump();

const char *mqtt_client_id = "wavplayeralsa";
logger_->info("creating mqtt connection to host {} on port {} with client id {}", mqtt_host, mqtt_port, mqtt_client_id);
logger_->info("will publish current song updates on topic {}", CURRENT_SONG_TOPIC);
Expand Down Expand Up @@ -65,34 +58,14 @@ namespace wavplayeralsa {

}

void MqttApi::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed)
void MqttApi::ReportCurrentSong(const std::string &json_str)
{
json j;
j["song_is_playing"] = true;
j["file_id"] = file_id;
j["start_time_millis_since_epoch"] = start_time_millis_since_epoch;
j["speed"] = speed;
UpdateLastStatusMsg(j);
}
last_status_msg_ = json_str;

void MqttApi::NoSongPlayingStatus(const std::string &file_id)
{
json j;
j["song_is_playing"] = false;
j["stopped_file_id"] = file_id;
UpdateLastStatusMsg(j);
}

void MqttApi::UpdateLastStatusMsg(const json &msgJson)
{
const std::string msg_json_str = msgJson.dump();

if(msg_json_str == last_status_msg_) {
return;
if(this->mqtt_client_)
{
this->mqtt_client_->publish_exactly_once(CURRENT_SONG_TOPIC, last_status_msg_, true);
}

last_status_msg_ = msg_json_str;
this->mqtt_client_->publish_exactly_once(CURRENT_SONG_TOPIC, last_status_msg_, true);
}

}
12 changes: 3 additions & 9 deletions src/mqtt_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,23 @@
#include "spdlog/spdlog.h"
#define MQTT_NO_TLS
#include "mqtt_cpp/mqtt_client_cpp.hpp"
#include "nlohmann/json_fwd.hpp"

#include "player_actions_ifc.h"
#include "player_events_ifc.h"

#ifndef CURRENT_SONG_TOPIC
#define CURRENT_SONG_TOPIC "current-song"
#endif // CURRENT_SONG_TOPIC

namespace wavplayeralsa {

class MqttApi : public PlayerEventsIfc {
class MqttApi {

public:
MqttApi(boost::asio::io_service &io_service);
void Initialize(std::shared_ptr<spdlog::logger> logger, PlayerActionsIfc *player_action_callback, const std::string &mqtt_host, uint16_t mqtt_port);

public:
void NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed);
void NoSongPlayingStatus(const std::string &file_id);

private:
void UpdateLastStatusMsg(const nlohmann::json &msgJson);
void ReportCurrentSong(const std::string &json_str);

private:
const int reconnect_wait_ms = 2000;
Expand All @@ -44,7 +38,7 @@ namespace wavplayeralsa {
boost::asio::io_service &io_service_;

private:
std::shared_ptr<mqtt::sync_client<mqtt::tcp_endpoint<mqtt::as::ip::tcp::socket, mqtt::as::io_service::strand>>> mqtt_client_;
std::shared_ptr<mqtt::sync_client<mqtt::tcp_endpoint<mqtt::as::ip::tcp::socket, mqtt::as::io_service::strand>>> mqtt_client_ = nullptr;
boost::asio::deadline_timer reconnect_timer_;

std::string last_status_msg_;
Expand Down
10 changes: 7 additions & 3 deletions src/wavplayeralsa.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "mqtt_api.h"
#include "alsa_frames_transfer.h"
#include "audio_files_manager.h"
#include "current_song_controller.h"


/*
Expand All @@ -28,8 +29,10 @@ class WavPlayerAlsa {
public:

WavPlayerAlsa() :
web_sockets_api_(),
io_service_work_(io_service_),
mqtt_api_(io_service_)
mqtt_api_(io_service_),
current_song_controller_(io_service_, &mqtt_api_, &web_sockets_api_)
{

}
Expand Down Expand Up @@ -135,10 +138,9 @@ class WavPlayerAlsa {

if(UseMqtt()) {
mqtt_api_.Initialize(mqtt_api_logger_, &audio_files_manager, mqtt_host_, mqtt_port_);
audio_files_manager.RegisterPlayerEventsHandler(&mqtt_api_);
}

audio_files_manager.RegisterPlayerEventsHandler(&web_sockets_api_);
audio_files_manager.RegisterPlayerEventsHandler(&current_song_controller_);
}
catch(const std::exception &e) {
root_logger_->critical("failed initialization, unable to start player. {}", e.what());
Expand Down Expand Up @@ -243,6 +245,8 @@ class WavPlayerAlsa {
wavplayeralsa::MqttApi mqtt_api_;
wavplayeralsa::AudioFilesManager audio_files_manager;

wavplayeralsa::CurrentSongController current_song_controller_;

};


Expand Down
60 changes: 10 additions & 50 deletions src/web_sockets_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,18 @@
#include <boost/foreach.hpp>
#include <boost/bind.hpp>

#include "nlohmann/json.hpp"


using websocketpp::connection_hdl;

using json = nlohmann::json;


namespace wavplayeralsa {

WebSocketsApi::~WebSocketsApi() {
if(msg_throttle_timer_ != nullptr) {
delete msg_throttle_timer_;
msg_throttle_timer_ = nullptr;
}
WebSocketsApi::WebSocketsApi()
{

}

void WebSocketsApi::Initialize(std::shared_ptr<spdlog::logger> logger, boost::asio::io_service *io_service, uint16_t ws_listen_port) {

logger_ = logger;
io_service_ = io_service;
msg_throttle_timer_ = new boost::asio::deadline_timer(*io_service);

server_.clear_error_channels(websocketpp::log::alevel::all);
server_.clear_access_channels(websocketpp::log::alevel::all);
Expand All @@ -42,55 +32,25 @@ namespace wavplayeralsa {
server_.start_accept();

logger_->info("web sockets server started on port {}", ws_listen_port);
}

void WebSocketsApi::NewSongStatus(const std::string &file_id, uint64_t start_time_millis_since_epoch, double speed) {
json j;
j["song_is_playing"] = true;
j["file_id"] = file_id;
j["start_time_millis_since_epoch"] = start_time_millis_since_epoch;
j["speed"] = speed;
UpdateLastStatusMsg(j);
}

void WebSocketsApi::NoSongPlayingStatus(const std::string &file_id) {
json j;
j["song_is_playing"] = false;
j["stopped_file_id"] = file_id;
UpdateLastStatusMsg(j);
initialized = true;
}

void WebSocketsApi::UpdateLastStatusMsg(const json &msgJson) {
std::string msg_json_str = msgJson.dump();
void WebSocketsApi::ReportCurrentSong(const std::string &json_str)
{
last_status_msg_ = json_str;

// Test for msg duplication.
// this is just optimization, and may not cover all cases (json keys are not ordered),
// but its cheap and easy to test.
if(msg_json_str == last_status_msg_) {
if(!initialized)
return;
}

last_status_msg_ = msg_json_str;
if(!has_active_timer_) {
msg_throttle_timer_->expires_from_now(boost::posix_time::milliseconds(THROTTLE_WAIT_TIME_MS));
msg_throttle_timer_->async_wait(boost::bind(&WebSocketsApi::SendToAllConnectedClients, this, _1));
has_active_timer_ = true;
}
}

void WebSocketsApi::SendToAllConnectedClients(const boost::system::error_code &e) {
if(e) {
return;
}

logger_->info("new status message: {}. will send to all {} connected clients", last_status_msg_, connections_.size());
BOOST_FOREACH(const connection_hdl &hdl, connections_) {
server_.send(hdl, last_status_msg_, websocketpp::frame::opcode::text);
}
has_active_timer_ = false;
}

void WebSocketsApi::OnOpen(connection_hdl hdl) {
void WebSocketsApi::OnOpen(connection_hdl hdl)
{
connections_.insert(hdl);

const auto con = server_.get_con_from_hdl(hdl);
Expand Down
Loading

0 comments on commit fe8aa35

Please sign in to comment.