Control SD sync/async behaviour with env var on QNX
This change is intended for systems like QNX, which generally lack a
netlink-like service for monitoring network status that can easily be
hooked into.  It is not set up to be used/enabled on Linux/Android.

SD will use the new asynchronous behaviour if the env var
VSOMEIP_USE_ASYNCHRONOUS_SD is set.  If not set, SD will use a
synchronous behaviour and only wait for the interface if the env var
VSOMEIP_WAIT_FOR_INTERFACE exists.  The latter (without waiting) is very
close to the upstream behaviour, the difference being that a callback is
still sent to sd::start() from routing_manager_impl, rather than
executing that code directly in routing_manager_impl.

Notes:
- mutexes
  - sd_impl::endpoint_ is now mutex protected
  - rm_impl::pending_sd_offers_mutex_ is now a recursive mutex, as it can be
    locked from the routing manager's own thread and from the new SD thread,
    and the start callback may re-lock it on a thread that already holds it
- There is no timeout on the waitfor.  The original implementation had a
  configurable timeout, however because timing out left us in an error
  state anyway, this timeout was removed (raised to
  numeric_limits<int>::max() milliseconds, roughly 25 days.)
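
The mode selection described in the message above can be sketched as follows. This is a minimal illustration rather than code from this commit: `sd_mode`, `select_sd_mode` and `select_sd_mode_from_env` are hypothetical names, and only the two environment variable names are taken from the commit. As in the commit, only the existence of each variable matters, not its value.

```cpp
#include <cassert>
#include <cstdlib>

// Hypothetical enum for illustration; the commit does not define it.
enum class sd_mode {
    async_wait,   // VSOMEIP_USE_ASYNCHRONOUS_SD set: SD starts in a new thread and waits
    sync_wait,    // only VSOMEIP_WAIT_FOR_INTERFACE set: calling thread blocks on the interface
    sync_no_wait  // neither set: calling thread, no wait (closest to upstream)
};

// Pure decision function: the asynchronous flag takes precedence, and the
// wait flag is only consulted on the synchronous path.
inline sd_mode select_sd_mode(bool use_async_set, bool wait_for_if_set) {
    if (use_async_set) {
        return sd_mode::async_wait;
    }
    return wait_for_if_set ? sd_mode::sync_wait : sd_mode::sync_no_wait;
}

// Reads the process environment; a variable counts as set if it exists at all.
inline sd_mode select_sd_mode_from_env() {
    return select_sd_mode(std::getenv("VSOMEIP_USE_ASYNCHRONOUS_SD") != nullptr,
                          std::getenv("VSOMEIP_WAIT_FOR_INTERFACE") != nullptr);
}
```

Note that in the commit itself both variables are only read on QNX; on other platforms the code behaves as if neither were set.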
kheaactua committed May 28, 2024
1 parent 6c0e9db commit 70c1670
Showing 7 changed files with 176 additions and 35 deletions.
12 changes: 11 additions & 1 deletion documentation/vsomeipUserGuide
@@ -284,7 +284,7 @@ On startup the following environment variables are read out:
Please note that <application> must be valid as part of an environment variable.
* `VSOMEIP_MANDATORY_CONFIGURATION_FILES`: vsomeip allows to specify mandatory configuration
files to speed-up application startup. While mandatory configuration files are read by all
applications, all other configuration files are only read by the application that is
responsible for connections to external devices. If this configuration variable is not set,
the default mandatory files vsomeip_std.json, vsomeip_app.json and vsomeip_plc.json are used.
* `VSOMEIP_CLIENTSIDELOGGING`: Set this variable to an empty string to enable logging of
@@ -296,6 +296,16 @@ On startup the following environment variables are read out:
`Environment=VSOMEIP_CLIENTSIDELOGGING="b003.0001 f013.000a 1001 1002"`
`Environment=VSOMEIP_CLIENTSIDELOGGING="b003.0001:f013.000a:1001:1002"`

For systems without a netlink service (_e.g._ QNX):
* `VSOMEIP_USE_ASYNCHRONOUS_SD`: (QNX only) Runs service discovery startup asynchronously,
in a separate thread that waits on network availability.
* `VSOMEIP_WAIT_FOR_INTERFACE`: Iff service discovery is running
synchronously (i.e. `VSOMEIP_USE_ASYNCHRONOUS_SD` is not set), this env var will
cause it to block on network availability (mostly for use in a testing scenario).
* `VSOMEIP_NETWORK_INT_READY_FILE`: Full path of the file that service
discovery is waiting on (waiting for the creation of) to indicate that a
network is available.

NOTE: If the file/folder that is configured by `VSOMEIP_CONFIGURATION` does _not_ exist,
the default configuration locations will be used.
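
To illustrate the "wait for a file to be created" mechanism described in this user guide hunk, here is a portable sketch using `std::filesystem`. It is an assumption-laden stand-in: the commit itself calls the QNX `waitfor()` function with a path substituted into `internal.hpp.in` at configure time, whereas `wait_for_file` and `mark_interface_ready` below are hypothetical helpers that only mimic the polling behaviour.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <system_error>
#include <thread>

// Hypothetical helper: polls until `path` exists or `timeout` elapses.
// Returns true if the file appeared, false on timeout.
inline bool wait_for_file(const std::filesystem::path& path,
                          std::chrono::milliseconds timeout,
                          std::chrono::milliseconds poll = std::chrono::milliseconds(50)) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    for (;;) {
        std::error_code ec;  // errors are treated as "not there yet"
        if (std::filesystem::exists(path, ec)) {
            return true;
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
        std::this_thread::sleep_for(poll);
    }
}

// Hypothetical counterpart: whatever brings the network up would create the
// file to signal readiness. Returns true if the file could be opened.
inline bool mark_interface_ready(const std::filesystem::path& path) {
    std::ofstream ready_file(path);
    return ready_file.is_open();
}
```

The 50 ms default poll interval matches the `poll_ms` value used in the commit; the finite timeout is a choice of this sketch, since the commit waits effectively indefinitely.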

19 changes: 19 additions & 0 deletions implementation/configuration/include/internal.hpp.in
@@ -33,6 +33,25 @@

#define VSOMEIP_ROUTING_HOST_PORT_DEFAULT 31490

//
// Defines related to running SD asynchronously - this is not available upstream

// Env var that, if it exists, causes SD setup to be performed asynchronously and
// to wait on network availability. The routing_manager is affected too: it
// waits until a network interface is available before issuing OFFERs
#define VSOMEIP_ENV_USE_ASYNCHRONOUS_SD "VSOMEIP_USE_ASYNCHRONOUS_SD"

// Iff SD is running synchronously, the existence of this env var will cause the
// SD setup to still block on network availability. (mostly a testing scenario)
#define VSOMEIP_ENV_WAIT_FOR_INTERFACE "VSOMEIP_WAIT_FOR_INTERFACE"

// The current waiting mechanism is to block until a file (specified by this
// define) is created
#define VSOMEIP_NETWORK_INT_READY_FILE "@VSOMEIP_NETWORK_INT_READY_FILE@"

// /end of async change
//

#ifdef _WIN32
#define VSOMEIP_CFG_LIBRARY "vsomeip3-cfg.dll"
#else
2 changes: 1 addition & 1 deletion implementation/routing/include/routing_manager_impl.hpp
@@ -495,7 +495,7 @@ class routing_manager_impl: public routing_manager_base,
bool if_state_running_;
bool sd_route_set_;
bool routing_running_;
std::mutex pending_sd_offers_mutex_;
std::recursive_mutex pending_sd_offers_mutex_;
std::vector<std::pair<service_t, instance_t>> pending_sd_offers_;
#if defined(__linux__) || defined(ANDROID)
std::shared_ptr<netlink_connector> netlink_connector_;
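
A short sketch of why this mutex has to be recursive. In the synchronous path, `routing_manager_impl::start()` holds `pending_sd_offers_mutex_` while it starts SD, and the completion callback then locks the same mutex again on the same thread. The class below is a hypothetical, heavily reduced stand-in (`routing_sketch` and its members are not from vsomeip); with a plain `std::mutex` the second lock would be undefined behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical reduction of the routing manager's offer bookkeeping.
class routing_sketch {
public:
    // Offers made before the interface is up are queued.
    void offer(int service) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        (running_ ? offered_ : pending_).push_back(service);
    }

    // Takes the lock, then starts "SD" synchronously, which invokes the
    // callback on this same thread while the lock is still held.
    void start() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        start_sd([this] { on_started(); });
    }

    bool running() const {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return running_;
    }
    std::size_t pending_count() const {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return pending_.size();
    }
    std::size_t offered_count() const {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return offered_.size();
    }

private:
    // Stand-in for the synchronous SD start: completes immediately.
    static void start_sd(const std::function<void()>& on_complete) { on_complete(); }

    // Second acquisition on the same thread: legal only for a recursive mutex.
    void on_started() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        running_ = true;
        for (int service : pending_) {
            offered_.push_back(service);
        }
        pending_.clear();
    }

    mutable std::recursive_mutex mutex_;
    bool running_ = false;
    std::vector<int> pending_;
    std::vector<int> offered_;
};
```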
40 changes: 22 additions & 18 deletions implementation/routing/src/routing_manager_impl.cpp
@@ -236,7 +236,7 @@ void routing_manager_impl::start() {
netlink_connector_->start();
#else
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
start_ip_routing();
}
#endif
@@ -448,7 +448,7 @@ bool routing_manager_impl::offer_service(client_t _client,
}

{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
if (if_state_running_) {
init_service_info(_service, _instance, true);
} else {
@@ -542,7 +542,7 @@ void routing_manager_impl::stop_offer_service(client_t _client,
}
if (is_local) {
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
if (it->first == _service && it->second == _instance) {
it = pending_sd_offers_.erase(it);
@@ -3881,7 +3881,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}

// start processing of SD messages (incoming remote offers should lead to new subscribe messages)
discovery_->start();
discovery_->start([]() { });

// Trigger initial offer phase for relevant services
for (const auto &its_service : get_offered_services()) {
@@ -3950,7 +3950,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {

void routing_manager_impl::on_net_interface_or_route_state_changed(
bool _is_interface, const std::string &_if, bool _available) {
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
auto log_change_message = [&_if, _available, _is_interface](bool _warning) {
std::stringstream ss;
ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if
@@ -4000,26 +4000,30 @@ void routing_manager_impl::on_net_interface_or_route_state_changed(
}

void routing_manager_impl::start_ip_routing() {
#if defined(_WIN32) || defined(__QNX__)
#if defined(_WIN32)
if_state_running_ = true;
#endif

if (routing_ready_handler_) {
routing_ready_handler_();
}
auto on_routing_started = [this]() -> void
{
std::lock_guard its_lock(pending_sd_offers_mutex_);

if_state_running_ = true;
for (auto const& its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
};

if (discovery_) {
discovery_->start();
discovery_->start(on_routing_started);
} else {
init_routing_info();
on_routing_started();
}

for (auto its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
}

void
@@ -30,7 +30,7 @@ class service_discovery {
virtual boost::asio::io_context &get_io() = 0;

virtual void init() = 0;
virtual void start() = 0;
virtual void start(std::function<void(void)> on_routing_started) = 0;
virtual void stop() = 0;

virtual void request_service(service_t _service, instance_t _instance,
@@ -12,6 +12,7 @@
#include <set>
#include <forward_list>
#include <atomic>
#include <thread>
#include <tuple>

#include <boost/asio/steady_timer.hpp>
@@ -67,7 +68,12 @@ class service_discovery_impl: public service_discovery,
std::recursive_mutex& get_subscribed_mutex();

void init();
void start();
void start(std::function<void(void)> on_routing_started);

// Function that'll do the startup work of SD, either in a new thread or
// in the current thread depending on the user's choice
void do_start_sd(std::function<void(void)> on_complete, bool wait_for_if);

void stop();

void request_service(service_t _service, instance_t _instance,
@@ -367,7 +373,9 @@ class service_discovery_impl: public service_discovery,
boost::asio::ip::address unicast_;
uint16_t port_;
bool reliable_;
std::mutex endpoint_mutex_;
std::shared_ptr<endpoint> endpoint_;
std::thread endpoint_getter_thread_;

std::shared_ptr<serializer> serializer_;
std::shared_ptr<deserializer> deserializer_;
126 changes: 113 additions & 13 deletions implementation/service_discovery/src/service_discovery_impl.cpp
@@ -5,12 +5,23 @@

#include <vsomeip/constants.hpp>

#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
#include <pthread.h>
#endif

#include <chrono>
#include <iomanip>
#include <forward_list>
#include <random>
#include <thread>

#include <chrono>

#ifdef __QNX__
#include <string_view>
#include <libgen.h>
#endif

#include <vsomeip/internal/logger.hpp>

#include "../include/constants.hpp"
@@ -71,7 +82,7 @@ service_discovery_impl::service_discovery_impl(
find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
find_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
is_suspended_(false),
is_suspended_(true), // Start suspended: this is different than upstream as we start before a network interface is available
is_diagnosis_(false),
last_msg_received_timer_(_host->get_io()),
last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY +
@@ -80,8 +91,7 @@ service_discovery_impl::service_discovery_impl(
next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24);
}

service_discovery_impl::~service_discovery_impl() {
}
service_discovery_impl::~service_discovery_impl() = default;

boost::asio::io_context &service_discovery_impl::get_io() {
return io_;
@@ -158,16 +168,64 @@ service_discovery_impl::init() {
+ (cyclic_offer_delay_ / 10);
}

void
service_discovery_impl::start() {
auto wait_for_interface() -> bool {
#ifdef __QNX__
static std::string_view constexpr path = VSOMEIP_NETWORK_INT_READY_FILE;
if (path.empty()) {
VSOMEIP_ERROR << "No network interface signal path defined, service discovery will effectively be disabled.";
return false;
}
// Indefinite delay. If the condition we're waiting for doesn't occur then
// we are in an error state and thus should not continue.
static auto constexpr delay_ms = std::numeric_limits<int>::max();
static int constexpr poll_ms = 50;

auto const start = std::chrono::steady_clock::now();
VSOMEIP_DEBUG << "Waiting (blocking) indefinitely on network interface (signal=" << path << ")";
auto const r = waitfor(path.data(), delay_ms, poll_ms);
auto const end = std::chrono::steady_clock::now();
auto diff = end - start;
if (0 == r)
{
VSOMEIP_DEBUG << "Waited (blocked) for network interface (signal=" << path << ") for " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms.";
return true;
} else {
VSOMEIP_ERROR << "Timed out waiting for network interface (signal=" << path << ") after " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms: errno=" << errno << ", msg=" << strerror(errno);
return false;
}
#else
// Omitting a Linux implementation.
//
// Currently the Linux implementations use netlink in routing_manager_impl to
// receive network events, so this function wouldn't be expected to be used. In
// theory the QNX implementation could be modified to use pps (if available)
// to mirror the Linux functionality, which would avoid this function entirely.
//
// If however an implementation on Linux wanted to use a pattern similar to
// this, inotify would likely be the best candidate to monitor the filesystem
// for this file.
return true;
#endif
}

void service_discovery_impl::do_start_sd(std::function<void(void)> on_complete, bool wait_for_if)
{
bool wait_for_result = false;
if (wait_for_if)
{
wait_for_result = wait_for_interface();
}

std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (!endpoint_) {
endpoint_ = host_->create_service_discovery_endpoint(
sd_multicast_, port_, reliable_);
if (!endpoint_) {
VSOMEIP_ERROR << "Couldn't start service discovery";
VSOMEIP_ERROR << "Couldn't start service discovery" << (wait_for_result ? "" : " - likely due to timing out waiting for a network interface");
return;
}
}

{
std::lock_guard<std::mutex> its_lock(sessions_received_mutex_);
sessions_received_.clear();
@@ -185,20 +243,61 @@ service_discovery_impl::start() {
i.second->set_sent_counter(0);
}
}

// rejoin multicast group
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
if (its_server_endpoint)
its_server_endpoint->join(sd_multicast_);
// rejoin multicast group
dynamic_cast<udp_server_endpoint_impl*>(
endpoint_.get())->join(sd_multicast_);
}
}
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
start_find_debounce_timer(true);
start_ttl_timer();

on_complete();
}

void
service_discovery_impl::start(std::function<void(void)> on_routing_started) {
#if defined(__QNX__)
auto* const use_async_sd = getenv(VSOMEIP_ENV_USE_ASYNCHRONOUS_SD);
#else
// Linux/Android uses netlink, so there's less need for this
// asynchronous service discovery
const char* use_async_sd = nullptr;
#endif

if (use_async_sd)
{
// Perform the SD setup in a new thread, and use a wait in that block
// to wait for the network to be available
VSOMEIP_DEBUG << "Starting service discovery using separate thread";
endpoint_getter_thread_ = std::thread(&service_discovery_impl::do_start_sd, this, on_routing_started, true);
#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
{
// Name the thread before detaching: native_handle() is not valid after detach().
// pthread_setname_np() returns the error number rather than setting errno.
auto err = pthread_setname_np(endpoint_getter_thread_.native_handle(), "sd_start");
if (err) {
VSOMEIP_ERROR << "Could not rename SD thread: " << err << ":" << strerror(err);
}
}
#endif
endpoint_getter_thread_.detach();
} else {
// Perform the SD setup in the current thread (synchronously), and
// if VSOMEIP_ENV_WAIT_FOR_INTERFACE exists use a wait in that block
// to wait for the network to be available. Note that
// VSOMEIP_ENV_WAIT_FOR_INTERFACE mostly exists for performance
// testing.

#if defined(__QNX__)
auto* const wait_for_interface_env = getenv(VSOMEIP_ENV_WAIT_FOR_INTERFACE);
#else
const char* wait_for_interface_env = nullptr;
#endif
VSOMEIP_DEBUG << "Starting service discovery using main thread" << (wait_for_interface_env ? ", will block for network interface" : ", will assume network interface pre-exists");
do_start_sd(on_routing_started, wait_for_interface_env != nullptr);
}
}

void
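
The dispatch in `service_discovery_impl::start()` above boils down to "run the startup work and its completion callback either on a detached thread or inline". A minimal sketch of that pattern follows; `start_sketch` and `async_completes_off_thread` are hypothetical names, and `do_work` merely stands in for `do_start_sd()`.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <thread>
#include <utility>

// Hypothetical free function mirroring the async/sync split in start().
inline void start_sketch(bool use_async,
                         std::function<void()> do_work,
                         std::function<void()> on_complete) {
    auto run = [work = std::move(do_work), done = std::move(on_complete)] {
        work();
        done();  // the callback runs on whichever thread did the work
    };
    if (use_async) {
        std::thread worker(std::move(run));
        // Anything that needs worker.native_handle() (for example naming the
        // thread) has to happen here, before detach().
        worker.detach();
    } else {
        run();
    }
}

// Usage: returns true if the asynchronous path ran on_complete on a
// different thread than the caller.
inline bool async_completes_off_thread() {
    std::promise<std::thread::id> done;
    auto result = done.get_future();
    start_sketch(true, [] {}, [&done] {
        // Becomes ready only once the worker thread has finished.
        done.set_value_at_thread_exit(std::this_thread::get_id());
    });
    return result.get() != std::this_thread::get_id();
}
```

One consequence of detaching is that nothing joins the worker, so any object the work refers to has to outlive it; the sketch sidesteps this by blocking on the future.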
@@ -3508,6 +3607,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
std::dec << last_msg_received_timer_timeout_.count() << "ms.";

// Rejoin multicast group
std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
@@ -3518,7 +3618,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
}
{
boost::system::error_code ec;
std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
std::lock_guard<std::mutex> its_lock_inner(last_msg_received_timer_mutex_);
last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec);
last_msg_received_timer_.async_wait(
std::bind(