Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.0.2 -> main] P2P: Resolve on reconnect #826

Merged
merged 8 commits into from
Sep 26, 2024
75 changes: 44 additions & 31 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ namespace eosio {
std::string host;
connection_ptr c;
tcp::endpoint active_ip;
tcp::resolver::results_type ips;
};

using connection_details_index = multi_index_container<
Expand Down Expand Up @@ -331,9 +330,9 @@ namespace eosio {

void add(connection_ptr c);
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address, const connection_ptr& c = {});
void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
void connect(const connection_ptr& c);
void reconnect(const connection_ptr& c);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -839,7 +838,7 @@ namespace eosio {

fc::sha256 conn_node_id;
string short_conn_node_id;
string listen_address; // address sent to peer in handshake
const string listen_address; // address sent to peer in handshake
string log_p2p_address;
string log_remote_endpoint_ip;
string log_remote_endpoint_port;
Expand Down Expand Up @@ -2754,16 +2753,21 @@ namespace eosio {
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}

if (incoming())
return false;

if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

connection_ptr c = shared_from_this();
strand.post([c]() {
my_impl->connections.connect(c);
my_impl->connections.reconnect(c);
});
return true;
}
Expand Down Expand Up @@ -4553,39 +4557,48 @@ namespace eosio {
return resolve_and_connect( host, p2p_address );
}

string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address,
const connection_ptr& c )
{
assert(!c || (c->peer_address() == peer_address && c->listen_address == listen_address));

string::size_type colon = peer_address.find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
return "invalid peer address";
}

std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
{
std::lock_guard g(connections_mtx);
if (find_connection_i(peer_address))
return "already connected";
}

auto [host, port, type] = split_host_port_type(peer_address);

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );

resolver->async_resolve(host, port,
[resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
c->set_heartbeat_timeout( heartbeat_timeout );
std::lock_guard g( connections_mtx );
auto [it, inserted] = connections.emplace( connection_detail{
.host = peer_address,
.c = std::move(c),
.ips = results
[this, resolver, c_org{c}, host, port, peer_address, listen_address]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = c_org ? c_org : std::make_shared<connection>( peer_address, listen_address );
c->strand.post([this, resolver, c, err, results, host, port, peer_address]() {
c->set_heartbeat_timeout( heartbeat_timeout );
{
std::lock_guard g( connections_mtx );
connections.emplace( connection_detail{
.host = peer_address,
.c = c,
});
}
if( !err ) {
c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++(c->consecutive_immediate_connection_close);
}
});
if( !err ) {
it->c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
it->c->set_state(connection::connection_state::closed);
++(it->c->consecutive_immediate_connection_close);
}
} );

return "added connection";
Expand All @@ -4603,13 +4616,13 @@ namespace eosio {
}
}

void connections_manager::connect(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
const auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
it->c->connect( it->ips );
void connections_manager::reconnect(const connection_ptr& c) {
{
std::lock_guard g( connections_mtx );
auto& index = connections.get<by_connection>();
index.erase(c);
}
resolve_and_connect(c->peer_address(), c->listen_address, c);
}

// called by API
Expand Down