Skip to content

Commit

Permalink
Fix #277 - Stack overflow in Bin Mon message handler (#278)
Browse files Browse the repository at this point in the history
Fix was to launch message handlers in new threads from the common `MonitoringMetadataReceiver::handleReceive` method for both Bin and LS mon plugin message handlers. Message handlers modified to use const refs to `SceneStore` to prevent accidental copies.
  • Loading branch information
firthm01 authored May 15, 2024
1 parent 77184d2 commit 2b312be
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BinauralMonitoringBackend {
bool isExporting() { return isExporting_; }

private:
void onSceneReceived(proto::SceneStore store);
void onSceneReceived(const proto::SceneStore& store);
void onConnection(communication::ConnectionId connectionId,
const std::string& streamEndpoint);
void onConnectionLost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class SceneStore;
namespace communication {
class MonitoringMetadataReceiver {
public:
using RequestHandler = std::function<void(proto::SceneStore)>;
using RequestHandler = std::function<void(const proto::SceneStore& store)>;
MonitoringMetadataReceiver(std::shared_ptr<spdlog::logger> logger = nullptr);
~MonitoringMetadataReceiver();
MonitoringMetadataReceiver(const MonitoringMetadataReceiver&) = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class MonitoringBackend {
bool isExporting() { return isExporting_; }

private:
void onSceneReceived(proto::SceneStore store);
void onSceneReceived(const proto::SceneStore& store);
void onConnection(communication::ConnectionId connectionId,
const std::string& streamEndpoint);
void onConnectionLost();
void updateActiveGains(proto::SceneStore store);
void updateActiveGains(const proto::SceneStore& store);

std::shared_ptr<spdlog::logger> logger_;
std::mutex gainsMutex_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct ItemGains {
class SceneGainsCalculator {
public:
SceneGainsCalculator(Layout outputLayout, int inputChannelCount);
bool update(proto::SceneStore store);
bool update(const proto::SceneStore &store);
Eigen::MatrixXf directGains();
Eigen::MatrixXf diffuseGains();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ BinauralMonitoringBackend::getLatestObjectsTypeMetadata(ConnId id) {
return std::optional<ObjectsEarMetadataAndRouting>();
}

void BinauralMonitoringBackend::onSceneReceived(proto::SceneStore store) {
void BinauralMonitoringBackend::onSceneReceived(
const proto::SceneStore& store) {
isExporting_ = store.has_is_exporting() && store.is_exporting();

size_t totalDsChannels = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "communication/monitoring_metadata_receiver.hpp"
#include "scene_store.pb.h"
#include <functional>
#include <future>

namespace ear {
namespace plugin {
Expand Down Expand Up @@ -62,7 +63,12 @@ void MonitoringMetadataReceiver::handleReceive(std::error_code ec,
if (!sceneStore.ParseFromArray(message.data(), message.size())) {
throw std::runtime_error("Failed to parse Scene Object");
}
handler_(std::move(sceneStore));
// Called by NNG callback on thread with small stack.
// Launch task in another thread to overcome stack limitation.
auto future = std::async(std::launch::async, [this, &sceneStore]() {
handler_(sceneStore);
});
future.get(); //blocking
} catch (const std::runtime_error& e) {
EAR_LOGGER_ERROR(
logger_, "Failed to parse and dispatch scene metadata: {}", e.what());
Expand Down
8 changes: 4 additions & 4 deletions ear-production-suite-plugins/lib/src/monitoring_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ MonitoringBackend::~MonitoringBackend() {
controlConnection_.onConnectionEstablished(nullptr);
}

void MonitoringBackend::onSceneReceived(proto::SceneStore store) {
void MonitoringBackend::onSceneReceived(const proto::SceneStore& store) {
isExporting_ = store.has_is_exporting() && store.is_exporting();
updateActiveGains(std::move(store));
updateActiveGains(store);
}

GainHolder MonitoringBackend::currentGains() {
std::lock_guard<std::mutex> lock(gainsMutex_);
return gains_;
}

void MonitoringBackend::updateActiveGains(proto::SceneStore store) {
void MonitoringBackend::updateActiveGains(const proto::SceneStore& store) {
{
std::lock_guard<std::mutex> lock(gainsCalculatorMutex_);
gainsCalculator_.update(std::move(store));
gainsCalculator_.update(store);
}
{
std::lock_guard<std::mutex> lock(gainsMutex_);
Expand Down
56 changes: 23 additions & 33 deletions ear-production-suite-plugins/lib/src/scene_gains_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
#include "ear/metadata.hpp"
#include "helper/eps_to_ear_metadata_converter.hpp"
#include "helper/container_helpers.hpp"
#include <future>
#include <algorithm>


namespace {

int inputCount(ear::plugin::ItemGains const& itemGains) {
Expand Down Expand Up @@ -53,41 +51,33 @@ SceneGainsCalculator::SceneGainsCalculator(ear::Layout outputLayout,
totalOutputChannels{static_cast<int>(outputLayout.channels().size())},
totalInputChannels{inputChannelCount} {}

bool SceneGainsCalculator::update(proto::SceneStore store) {
// Called by NNG callback on thread with small stack.
// Launch task in another thread to overcome stack limitation.
auto future = std::async(std::launch::async, [this, store]() {

// First figure out what we need to process updates for
std::vector<communication::ConnectionId> cachedIdsChecklist;
cachedIdsChecklist.reserve(routingCache_.size());
for(auto const&[key, val] : routingCache_) {
cachedIdsChecklist.push_back(key);
}
/// Check-off found items, and also delete changed items from routing cache to be re-evaluated
for(const auto& item : store.monitoring_items()) {
auto itemId = communication::ConnectionId{ item.connection_id() };
cachedIdsChecklist.erase(std::remove(cachedIdsChecklist.begin(), cachedIdsChecklist.end(), itemId), cachedIdsChecklist.end());
if(item.changed()) {
removeItem(itemId);
}
}
/// Delete removed items from routing cache (i.e, those that weren't checked-off and therefore remain in cachedIdsChecklist)
for(const auto& itemId : cachedIdsChecklist) {
bool SceneGainsCalculator::update(const proto::SceneStore& store) {
// First figure out what we need to process updates for
std::vector<communication::ConnectionId> cachedIdsChecklist;
cachedIdsChecklist.reserve(routingCache_.size());
for(auto const&[key, val] : routingCache_) {
cachedIdsChecklist.push_back(key);
}
/// Check-off found items, and also delete changed items from routing cache to be re-evaluated
for(const auto& item : store.monitoring_items()) {
auto itemId = communication::ConnectionId{ item.connection_id() };
cachedIdsChecklist.erase(std::remove(cachedIdsChecklist.begin(), cachedIdsChecklist.end(), itemId), cachedIdsChecklist.end());
if(item.changed()) {
removeItem(itemId);
}
}
/// Delete removed items from routing cache (i.e, those that weren't checked-off and therefore remain in cachedIdsChecklist)
for(const auto& itemId : cachedIdsChecklist) {
removeItem(itemId);
}

// Now get the gain updates we need
for(const auto& item : store.monitoring_items()) {
/// If it's not in routingCache_, it's new or changed, so needs re-evaluating
if(!mapHasKey(routingCache_, communication::ConnectionId{ item.connection_id() })) {
addOrUpdateItem(item);
}
// Now get the gain updates we need
for(const auto& item : store.monitoring_items()) {
/// If it's not in routingCache_, it's new or changed, so needs re-evaluating
if(!mapHasKey(routingCache_, communication::ConnectionId{ item.connection_id() })) {
addOrUpdateItem(item);
}

});

future.get();
}

return true;
}
Expand Down

0 comments on commit 2b312be

Please sign in to comment.