diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index c79e4eb157..4e3ab7ed6e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -262,8 +262,6 @@ namespace eosio { struct connection_detail { std::string host; connection_ptr c; - tcp::endpoint active_ip; - tcp::resolver::results_type ips; }; using connection_details_index = multi_index_container< @@ -315,6 +313,8 @@ namespace eosio { boost::asio::steady_timer::duration conn_period, uint32_t maximum_client_count); + std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; } + uint32_t get_max_client_count() const { return max_client_count; } fc::microseconds get_connector_period() const; @@ -332,8 +332,6 @@ namespace eosio { void add(connection_ptr c); string connect(const string& host, const string& p2p_address); string resolve_and_connect(const string& host, const string& p2p_address); - void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint); - void connect(const connection_ptr& c); string disconnect(const string& host); void close_all(); @@ -926,7 +924,7 @@ namespace eosio { bool populate_handshake( handshake_message& hello ) const; - bool reconnect(); + bool resolve_and_connect(); void connect( const tcp::resolver::results_type& endpoints ); void start_read_message(); @@ -1100,16 +1098,21 @@ namespace eosio { }; - std::tuple split_host_port_type(const std::string& peer_add) { + std::tuple split_host_port_type(const std::string& peer_add, bool incoming) { // host:port:[|] if (peer_add.empty()) return {}; string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0; - if (p == string::npos) { - fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) ); + string::size_type colon = p != string::npos ? peer_add.find(':', p) : string::npos; + if (colon == std::string::npos || colon == 0) { + // if incoming then not an error this peer can do anything about + if (incoming) { + fc_dlog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_add) ); + } else { + fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_add) ); + } return {}; } - string::size_type colon = peer_add.find(':', p); string::size_type colon2 = peer_add.find(':', colon + 1); string::size_type end = colon2 == string::npos ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex @@ -1186,8 +1189,8 @@ namespace eosio { last_handshake_sent(), p2p_address( endpoint ) { + set_connection_type( peer_address() ); my_impl->mark_bp_connection(this); - update_endpoints(); fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) ); } @@ -1202,7 +1205,6 @@ namespace eosio { last_handshake_recv(), last_handshake_sent() { - update_endpoints(); fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}", ("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) ); } @@ -1235,7 +1237,7 @@ namespace eosio { // called from connection strand void connection::set_connection_type( const std::string& peer_add ) { - auto [host, port, type] = split_host_port_type(peer_add); + auto [host, port, type] = split_host_port_type(peer_add, false); if( type.empty() ) { fc_dlog( logger, "Setting connection - ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) ); connection_type = both; @@ -1296,6 +1298,7 @@ namespace eosio { bool connection::start_session() { verify_strand_in_this_thread( strand, __func__, __LINE__ ); + update_endpoints(); boost::asio::ip::tcp::no_delay nodelay( true ); boost::system::error_code ec; socket->set_option( nodelay, ec ); @@ -2745,31 +2748,6 @@ namespace eosio { //------------------------------------------------------------------------ - bool connection::reconnect() { - switch ( no_retry ) { - case no_reason: - case wrong_version: - case benign_other: - case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate - break; - default: - fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry ))); - return false; - } - if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { - fc::microseconds connector_period = my_impl->connections.get_connector_period(); - fc::lock_guard g( conn_mtx ); - if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { - return true; // true so doesn't remove from valid connections - } - } - connection_ptr c = shared_from_this(); - strand.post([c]() { - my_impl->connections.connect(c); - }); - return true; - } - // called from connection strand void connection::connect( const tcp::resolver::results_type& endpoints ) { set_state(connection_state::connecting); @@ -2779,7 +2757,6 @@ namespace eosio { boost::asio::bind_executor( strand, [c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { if( !err && socket->is_open() && socket == c->socket ) { - my_impl->connections.update_connection_endpoint(c, endpoint); c->update_endpoints(endpoint); if( c->start_session() ) { c->send_handshake(); @@ -2829,7 +2806,7 @@ namespace eosio { fc_ilog(logger, "Accepted new connection: " + paddr_str); connections.any_of_supplied_peers([&listen_address, &paddr_str, &paddr_desc, &limit](const string& peer_addr) { - auto [host, port, type] = split_host_port_type(peer_addr); + auto [host, port, type] = split_host_port_type(peer_addr, false); if (host == paddr_str) { if (limit > 0) { fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_desc)); @@ -3325,9 +3302,9 @@ namespace eosio { } if( incoming() ) { - auto [host, port, type] = split_host_port_type(msg.p2p_address); + auto [host, port, type] = split_host_port_type(msg.p2p_address, true); if (host.size()) - set_connection_type( msg.p2p_address ); + set_connection_type( msg.p2p_address); peer_dlog( this, "checking for duplicate" ); auto is_duplicate = [&](const connection_ptr& check) { @@ -4499,7 +4476,7 @@ namespace eosio { //---------------------------------------------------------------------------- size_t connections_manager::number_connections() const { - std::lock_guard g(connections_mtx); + std::shared_lock g(connections_mtx); return connections.size(); } @@ -4528,8 +4505,9 @@ namespace eosio { update_p2p_connection_metrics = std::move(fun); } + // can be called from any thread void connections_manager::connect_supplied_peers(const string& p2p_address) { - std::unique_lock g(connections_mtx); + std::shared_lock g(connections_mtx); chain::flat_set peers = supplied_peers; g.unlock(); for (const auto& peer : peers) { @@ -4539,12 +4517,9 @@ namespace eosio { void connections_manager::add( connection_ptr c ) { std::lock_guard g( connections_mtx ); - boost::system::error_code ec; - auto endpoint = c->socket->remote_endpoint(ec); connections.insert( connection_detail{ .host = c->peer_address(), - .c = std::move(c), - .active_ip = endpoint} ); + .c = std::move(c)} ); } // called by API @@ -4556,62 +4531,72 @@ namespace eosio { } string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) { - string::size_type colon = peer_address.find(':'); - if (colon == std::string::npos || colon == 0) { - fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_address) ); + auto [host, port, type] = split_host_port_type(peer_address, false); + if (host.empty()) { return "invalid peer address"; } - std::lock_guard g( connections_mtx ); - if( find_connection_i( peer_address ) ) - return "already connected"; - - auto [host, port, type] = split_host_port_type(peer_address); - - auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); - - resolver->async_resolve(host, port, - [resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) { - connection_ptr c = std::make_shared( peer_address, listen_address ); - c->set_heartbeat_timeout( heartbeat_timeout ); - std::lock_guard g( connections_mtx ); - auto [it, inserted] = connections.emplace( connection_detail{ - .host = peer_address, - .c = std::move(c), - .ips = results - }); - if( !err ) { - it->c->connect( results ); - } else { - fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}", - ("host", host)("port", port)( "error", err.message() ) ); - it->c->set_state(connection::connection_state::closed); - ++(it->c->consecutive_immediate_connection_close); - } - } ); + { + std::shared_lock g( connections_mtx ); + if( find_connection_i( peer_address ) ) + return "already connected"; + } - return "added connection"; - } + connection_ptr c = std::make_shared( peer_address, listen_address ); + if (c->resolve_and_connect()) { + add(std::move(c)); - void connections_manager::update_connection_endpoint(connection_ptr c, - const tcp::endpoint& endpoint) { - std::unique_lock g( connections_mtx ); - auto& index = connections.get(); - const auto& it = index.find(c); - if( it != index.end() ) { - index.modify(it, [endpoint](connection_detail& cd) { - cd.active_ip = endpoint; - }); + return "added connection"; } + + return "connection failed"; } - void connections_manager::connect(const connection_ptr& c) { - std::lock_guard g( connections_mtx ); - const auto& index = connections.get(); - const auto& it = index.find(c); - if( it != index.end() ) { - it->c->connect( it->ips ); + // called from any thread + bool connection::resolve_and_connect() { + switch ( no_retry ) { + case no_reason: + case wrong_version: + case benign_other: + case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate + break; + default: + fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry ))); + return false; + } + + auto [host, port, type] = split_host_port_type(peer_address(), false); + if (host.empty()) + return false; + + connection_ptr c = shared_from_this(); + + if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { + fc::microseconds connector_period = my_impl->connections.get_connector_period(); + fc::lock_guard g( conn_mtx ); + if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { + return true; // true so doesn't remove from valid connections + } } + + strand.post([c, host, port]() { + auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); + resolver->async_resolve(host, port, boost::asio::bind_executor(c->strand, + [resolver, c, host, port] + ( const boost::system::error_code& err, const tcp::resolver::results_type& results ) { + c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() ); + if( !err ) { + c->connect( results ); + } else { + fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}", + ("host", host)("port", port)( "error", err.message() ) ); + c->set_state(connection::connection_state::closed); + ++c->consecutive_immediate_connection_close; + } + } ) ); + } ); + + return true; } // called by API @@ -4640,8 +4625,11 @@ namespace eosio { } std::optional connections_manager::status( const string& host )const { - std::shared_lock g( connections_mtx ); - auto con = find_connection_i( host ); + connection_ptr con; + { + std::shared_lock g( connections_mtx ); + con = find_connection_i( host ); + } if( con ) { return con->get_status(); } @@ -4649,12 +4637,19 @@ namespace eosio { } vector connections_manager::connection_statuses()const { + vector conns; vector result; - std::shared_lock g( connections_mtx ); - auto& index = connections.get(); - result.reserve( index.size() ); - for( const connection_detail& cd : index ) { - result.emplace_back( cd.c->get_status() ); + { + std::shared_lock g( connections_mtx ); + auto& index = connections.get(); + result.reserve( index.size() ); + conns.reserve( index.size() ); + for( const connection_detail& cd : index ) { + conns.emplace_back( cd.c ); + } + } + for (const auto& c : conns) { + result.push_back( c->get_status() ); } return result; } @@ -4716,7 +4711,7 @@ namespace eosio { auto cleanup = [&num_peers, &num_rm, this](vector&& reconnecting, vector&& removing) { for( auto& c : reconnecting ) { - if (!c->reconnect()) { + if (!c->resolve_and_connect()) { --num_peers; ++num_rm; removing.push_back(c); @@ -4782,7 +4777,7 @@ namespace eosio { assert(update_p2p_connection_metrics); auto from = from_connection.lock(); std::shared_lock g(connections_mtx); - auto& index = connections.get(); + const auto& index = connections.get(); size_t num_clients = 0, num_peers = 0, num_bp_peers = 0; net_plugin::p2p_per_connection_metrics per_connection(index.size()); for (auto it = index.begin(); it != index.end(); ++it) {