Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control SD sync/async behaviour with env var on QNX #710

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion documentation/vsomeipUserGuide
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ On startup the following environment variables are read out:
Please note that <application> must be valid as part of an environment variable.
* `VSOMEIP_MANDATORY_CONFIGURATION_FILES`: vsomeip allows to specify mandatory configuration
files to speed-up application startup. While mandatory configuration files are read by all
applications, all other configuration files are only read by the application that is
applications, all other configuration files are only read by the application that is
responsible for connections to external devices. If this configuration variable is not set,
the default mandatory files vsomeip_std.json, vsomeip_app.json and vsomeip_plc.json are used.
* `VSOMEIP_CLIENTSIDELOGGING`: Set this variable to an empty string to enable logging of
Expand All @@ -296,6 +296,16 @@ On startup the following environment variables are read out:
`Environment=VSOMEIP_CLIENTSIDELOGGING="b003.0001 f013.000a 1001 1002"`
`Environment=VSOMEIP_CLIENTSIDELOGGING="b003.0001:f013.000a:1001:1002"`

For systems without a netlink service (_e.g._ QNX):
* `VSOMEIP_USE_ASYNCHRONOUS_SD`: (QNX only) Enables service discovery to run asynchronously
on startup and to (if enabled) wait on network availability.
* `VSOMEIP_WAIT_FOR_INTERFACE`: Iff service discovery is running
asynchronously, this env var will cause it to block on network availability
(mostly for use in a testing scenario.)
* `VSOMEIP_NETWORK_INT_READY_FILE`: Full path of the file that service
discovery is waiting on (waiting for the creation of) to indicate that a
network is available.

NOTE: If the file/folder that is configured by `VSOMEIP_CONFIGURATION` does _not_ exist,
the default configuration locations will be used.

Expand Down
19 changes: 19 additions & 0 deletions implementation/configuration/include/internal.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@

#define VSOMEIP_ROUTING_HOST_PORT_DEFAULT 31490

//
// Defines related to running SD asynchronously - this is not available upstream

// Env var that if exists will cause SD setup to be performed asynchronously and
// wait on network availability. This wait also impacts the routing_manager to
// also wait until a network interface is available before issuring OFFERs
#define VSOMEIP_ENV_USE_ASYNCHRONOUS_SD "VSOMEIP_USE_ASYNCHRONOUS_SD"

// Iff SD is running synchronously, the existence of this env var will cause the
// SD setup to still block on network availability. (mostly a testing scenario)
#define VSOMEIP_ENV_WAIT_FOR_INTERFACE "VSOMEIP_WAIT_FOR_INTERFACE"

// The current waiting mechanism is to block until a file (specified by this
// define)
#define VSOMEIP_NETWORK_INT_READY_FILE "@VSOMEIP_NETWORK_INT_READY_FILE@"

// /end of async change
//

#ifdef _WIN32
#define VSOMEIP_CFG_LIBRARY "vsomeip3-cfg.dll"
#else
Expand Down
2 changes: 1 addition & 1 deletion implementation/routing/include/routing_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ class routing_manager_impl: public routing_manager_base,
bool if_state_running_;
bool sd_route_set_;
bool routing_running_;
std::mutex pending_sd_offers_mutex_;
std::recursive_mutex pending_sd_offers_mutex_;
std::vector<std::pair<service_t, instance_t>> pending_sd_offers_;
#if defined(__linux__) || defined(ANDROID)
std::shared_ptr<netlink_connector> netlink_connector_;
Expand Down
40 changes: 22 additions & 18 deletions implementation/routing/src/routing_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void routing_manager_impl::start() {
netlink_connector_->start();
#else
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
start_ip_routing();
}
#endif
Expand Down Expand Up @@ -448,7 +448,7 @@ bool routing_manager_impl::offer_service(client_t _client,
}

{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
if (if_state_running_) {
init_service_info(_service, _instance, true);
} else {
Expand Down Expand Up @@ -542,7 +542,7 @@ void routing_manager_impl::stop_offer_service(client_t _client,
}
if (is_local) {
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
if (it->first == _service && it->second == _instance) {
it = pending_sd_offers_.erase(it);
Expand Down Expand Up @@ -3881,7 +3881,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}

// start processing of SD messages (incoming remote offers should lead to new subscribe messages)
discovery_->start();
discovery_->start([]() { });

// Trigger initial offer phase for relevant services
for (const auto &its_service : get_offered_services()) {
Expand Down Expand Up @@ -3950,7 +3950,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {

void routing_manager_impl::on_net_interface_or_route_state_changed(
bool _is_interface, const std::string &_if, bool _available) {
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
auto log_change_message = [&_if, _available, _is_interface](bool _warning) {
std::stringstream ss;
ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if
Expand Down Expand Up @@ -4000,26 +4000,30 @@ void routing_manager_impl::on_net_interface_or_route_state_changed(
}

void routing_manager_impl::start_ip_routing() {
#if defined(_WIN32) || defined(__QNX__)
#if defined(_WIN32)
if_state_running_ = true;
#endif

if (routing_ready_handler_) {
routing_ready_handler_();
}
auto on_routing_started = [this]() -> void
{
std::lock_guard its_lock(pending_sd_offers_mutex_);

if_state_running_ = true;
for (auto const& its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
};

if (discovery_) {
discovery_->start();
discovery_->start(on_routing_started);
} else {
init_routing_info();
on_routing_started();
}

for (auto its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class service_discovery {
virtual boost::asio::io_context &get_io() = 0;

virtual void init() = 0;
virtual void start() = 0;
virtual void start(std::function<void(void)> on_routing_started) = 0;
virtual void stop() = 0;

virtual void request_service(service_t _service, instance_t _instance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <set>
#include <forward_list>
#include <atomic>
#include <thread>
#include <tuple>

#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -67,7 +68,12 @@ class service_discovery_impl: public service_discovery,
std::recursive_mutex& get_subscribed_mutex();

void init();
void start();
void start(std::function<void(void)> on_routing_started);

// Function that'll do the startup work of SD, either in a new thread or
// in the current thread depending on the user's choice
void do_start_sd(std::function<void(void)> on_complete, bool wait_for_if);

void stop();

void request_service(service_t _service, instance_t _instance,
Expand Down Expand Up @@ -367,7 +373,9 @@ class service_discovery_impl: public service_discovery,
boost::asio::ip::address unicast_;
uint16_t port_;
bool reliable_;
std::mutex endpoint_mutex_;
std::shared_ptr<endpoint> endpoint_;
std::thread endpoint_getter_thread_;

std::shared_ptr<serializer> serializer_;
std::shared_ptr<deserializer> deserializer_;
Expand Down
126 changes: 113 additions & 13 deletions implementation/service_discovery/src/service_discovery_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@

#include <vsomeip/constants.hpp>

#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
#include <pthread.h>
#endif

#include <chrono>
#include <iomanip>
#include <forward_list>
#include <random>
#include <thread>

#include <chrono>

#ifdef __QNX__
#include <string_view>
#include <libgen.h>
#endif

#include <vsomeip/internal/logger.hpp>

#include "../include/constants.hpp"
Expand Down Expand Up @@ -71,7 +82,7 @@ service_discovery_impl::service_discovery_impl(
find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
find_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
is_suspended_(false),
is_suspended_(true), // Start suspended: this is different than upstream as we start before a network interface is available
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduced a bug where SD tries to start multicast a second time, leading to an error in the log

is_diagnosis_(false),
last_msg_received_timer_(_host->get_io()),
last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY +
Expand All @@ -80,8 +91,7 @@ service_discovery_impl::service_discovery_impl(
next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24);
}

service_discovery_impl::~service_discovery_impl() {
}
service_discovery_impl::~service_discovery_impl() = default;

boost::asio::io_context &service_discovery_impl::get_io() {
return io_;
Expand Down Expand Up @@ -158,16 +168,64 @@ service_discovery_impl::init() {
+ (cyclic_offer_delay_ / 10);
}

void
service_discovery_impl::start() {
auto wait_for_interface() -> bool {
#ifdef __QNX__
static std::string_view constexpr path = VSOMEIP_NETWORK_INT_READY_FILE;
if (path.empty()) {
VSOMEIP_ERROR << "No network interface signal path defined, service discovery will effectively be disabled.";
return false;
}
// Indefinite delay. If the condition we're waiting for doesn't occur then
// we are in an error state and thus should not continue.
static auto constexpr delay_ms = std::numeric_limits<int>::max();
static int constexpr poll_ms = 50;

auto const start = std::chrono::steady_clock::now();
VSOMEIP_DEBUG << "Waiting (blocking) indefinitely on network interface (signal=" << path << ")";
auto const r = waitfor(path.data(), delay_ms, poll_ms);
auto const end = std::chrono::steady_clock::now();
auto diff = end - start;
if (0 == r)
{
VSOMEIP_DEBUG << "Waited (blocked) for network interface (signal=" << path << ") for " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms.";
return true;
} else {
VSOMEIP_ERROR << "Timedout waiting for network interface (signal=" << path << ") after " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms: errno=" << errno << ", msg=" << strerror(errno);
return false;
}
#else
// Omitting a Linux implementation.
//
// Currently the Linux implementations use netlink in routing_manager_impl to
// receive network events, so this function wouldnt be expected to be used. In
// theory the QNX implementation could be modified to use pps (if available)
// to mirror the Linux functionality, which would avoid this function entirely.
//
// If however an implementation on linux wanted to use a pattern similar to
// this, inotify would likely be the best candidate to monitor the filesystem
// for this file.
return true;
#endif
}

void service_discovery_impl::do_start_sd(std::function<void(void)> on_complete, bool wait_for_if)
{
bool wait_for_result = false;
if (wait_for_if)
{
wait_for_result = wait_for_interface();
}

std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (!endpoint_) {
endpoint_ = host_->create_service_discovery_endpoint(
sd_multicast_, port_, reliable_);
if (!endpoint_) {
VSOMEIP_ERROR << "Couldn't start service discovery";
VSOMEIP_ERROR << "Couldn't start service discovery" << (wait_for_result ? "" : " - likely due to timeing out waiting for a network interface");
return;
}
}

{
std::lock_guard<std::mutex> its_lock(sessions_received_mutex_);
sessions_received_.clear();
Expand All @@ -185,20 +243,61 @@ service_discovery_impl::start() {
i.second->set_sent_counter(0);
}
}

// rejoin multicast group
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
if (its_server_endpoint)
its_server_endpoint->join(sd_multicast_);
// rejoin multicast group
dynamic_cast<udp_server_endpoint_impl*>(
endpoint_.get())->join(sd_multicast_);
}
}
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
start_find_debounce_timer(true);
start_ttl_timer();

on_complete();
}

void
service_discovery_impl::start(std::function<void(void)> on_routing_started) {
#if defined(__QNX__)
auto* const use_async_sd = getenv(VSOMEIP_ENV_USE_ASYNCHRONOUS_SD);
#else
// Linux/Android uses netlink, so there's less need for this
// asynchroneous service discovery
const char* use_async_sd = nullptr;
#endif

if (use_async_sd)
{
// Perform the SD setup in a new thread, and use a wait in that block
// to wait for the network to be available
VSOMEIP_DEBUG << "Starting service discovery using separate thread";
endpoint_getter_thread_ = std::thread(&service_discovery_impl::do_start_sd, this, on_routing_started, true);
endpoint_getter_thread_.detach();
#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
{
auto err = pthread_setname_np(endpoint_getter_thread_.native_handle(), "sd_start");
if (err) {
VSOMEIP_ERROR << "Could not rename SD thread: " << errno << ":" << strerror(errno);
}
}
#endif
} else {
// Perform the SD setup in the current thread (synchronously), and
// if VSOMEIP_ENV_WAIT_FOR_INTERFACE exists use a wait in that block
// to wait for the network to be available. Note that
// VSOMEIP_ENV_WAIT_FOR_INTERFACE mostly exists for performance
// testing.

#if defined(__QNX__)
auto* const wait_for_interface_env = getenv(VSOMEIP_ENV_WAIT_FOR_INTERFACE);
#else
const char* wait_for_interface_env = nullptr;
#endif
VSOMEIP_DEBUG << "Starting service discovery using main thread" << (wait_for_interface_env ? ", will block for network interface" : ", will assume network interface pre-exists");
do_start_sd(on_routing_started, wait_for_interface_env != nullptr);
}
}

void
Expand Down Expand Up @@ -3508,6 +3607,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
std::dec << last_msg_received_timer_timeout_.count() << "ms.";

// Rejoin multicast group
std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
Expand All @@ -3518,7 +3618,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
}
{
boost::system::error_code ec;
std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
std::lock_guard<std::mutex> its_lock_inner(last_msg_received_timer_mutex_);
last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec);
last_msg_received_timer_.async_wait(
std::bind(
Expand Down
Loading