Skip to content

Commit

Permalink
GH-525 Use strand for socket
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Oct 2, 2024
1 parent 3e3a59d commit 517b66d
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ namespace eosio {
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};

public:
boost::asio::io_context::strand strand;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction

fc::message_buffer<1024*1024> pending_message_buffer;
Expand Down Expand Up @@ -1271,8 +1271,8 @@ namespace eosio {

connection::connection( const string& endpoint, const string& listen_address )
: peer_addr( endpoint ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ),
socket( new tcp::socket( strand ) ),
listen_address( listen_address ),
log_p2p_address( endpoint ),
connection_id( ++my_impl->current_connection_id ),
Expand All @@ -1289,7 +1289,7 @@ namespace eosio {
connection::connection(tcp::socket&& s, const string& listen_address, size_t block_sync_rate_limit)
: peer_addr(),
block_sync_rate_limit(block_sync_rate_limit),
strand( my_impl->thread_pool.get_executor() ),
strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ),
socket( new tcp::socket( std::move(s) ) ),
listen_address( listen_address ),
connection_id( ++my_impl->current_connection_id ),
Expand Down Expand Up @@ -1450,7 +1450,7 @@ namespace eosio {

void connection::close( bool reconnect, bool shutdown ) {
set_state(connection_state::closing);
strand.post( [self = shared_from_this(), reconnect, shutdown]() {
boost::asio::post(strand, [self = shared_from_this(), reconnect, shutdown]() {
self->_close( reconnect, shutdown );
});
}
Expand Down Expand Up @@ -1589,7 +1589,7 @@ namespace eosio {
void connection::send_handshake() {
if (closed())
return;
strand.post( [c = shared_from_this()]() {
boost::asio::post( strand, [c = shared_from_this()]() {
fc::unique_lock g_conn( c->conn_mtx );
if( c->populate_handshake( c->last_handshake_sent ) ) {
static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
Expand Down Expand Up @@ -1682,7 +1682,7 @@ namespace eosio {
std::vector<boost::asio::const_buffer> bufs;
buffer_queue.fill_out_buffer( bufs );

strand.post( [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::post( strand, [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
try {
Expand Down Expand Up @@ -2152,7 +2152,7 @@ namespace eosio {
sync_source = new_sync_source;
request_sent = true;
sync_active_time = std::chrono::steady_clock::now();
new_sync_source->strand.post( [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() {
boost::asio::post( new_sync_source->strand, [new_sync_source, start, end, fork_head_num=chain_info.fork_head_num, lib=chain_info.lib_num]() {
peer_ilog( new_sync_source, "requesting range ${s} to ${e}, fhead ${h}, lib ${lib}", ("s", start)("e", end)("h", fork_head_num)("lib", lib) );
new_sync_source->request_sync_blocks( start, end );
} );
Expand Down Expand Up @@ -2736,7 +2736,7 @@ namespace eosio {

send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [cp, bnum, sb{std::move(sb)}]() {
boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
bool has_block = cp->peer_lib_num >= bnum;
if( !has_block ) {
Expand All @@ -2754,7 +2754,7 @@ namespace eosio {
my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) {
if( !cp->current() ) return true;
if( cp->connection_id == exclude_peer ) return true;
cp->strand.post( [cp, msg]() {
boost::asio::post(cp->strand, [cp, msg]() {
if (cp->protocol_version >= proto_savanna) {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
Expand Down Expand Up @@ -2787,7 +2787,7 @@ namespace eosio {

send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
cp->strand.post( [cp, sb{std::move(sb)}]() {
boost::asio::post( cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( sb, no_reason );
} );
} );
Expand Down Expand Up @@ -2885,7 +2885,7 @@ namespace eosio {
});

connection_ptr new_connection = std::make_shared<connection>(std::move(socket), listen_address, limit);
new_connection->strand.post([new_connection, this]() {
boost::asio::post(new_connection->strand, [new_connection, this]() {
if (new_connection->start_session()) {
connections.add(new_connection);
}
Expand Down Expand Up @@ -3771,7 +3771,7 @@ namespace eosio {
// may have come in on a different connection and posted into dispatcher strand before this one
if( my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe
my_impl->dispatcher.add_peer_block( id, c->connection_id );
c->strand.post( [c, id, ptr{std::move(ptr)}]() {
boost::asio::post(c->strand, [c, id, ptr{std::move(ptr)}]() {
const fc::microseconds age(fc::time_point::now() - ptr->timestamp);
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), false, age );
});
Expand Down Expand Up @@ -3799,7 +3799,7 @@ namespace eosio {
("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16)));
}
if( exception ) {
c->strand.post( [c, id, blk_num=ptr->block_num(), close_mode]() {
boost::asio::post(c->strand, [c, id, blk_num=ptr->block_num(), close_mode]() {
my_impl->sync_master->rejected_block( c, blk_num, close_mode );
my_impl->dispatcher.rejected_block( id );
});
Expand Down Expand Up @@ -3840,7 +3840,7 @@ namespace eosio {
fc::microseconds age(fc::time_point::now() - block->timestamp);
try {
if( blk_num <= lib || cc.validated_block_exists(blk_id) ) {
c->strand.post( [sync_master = my_impl->sync_master.get(),
boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(),
&dispatcher = my_impl->dispatcher, c, blk_id, blk_num, latency = age]() {
dispatcher.add_peer_block( blk_id, c->connection_id );
sync_master->sync_recv_block( c, blk_id, blk_num, true, latency );
Expand Down Expand Up @@ -3902,14 +3902,14 @@ namespace eosio {
});
}
});
c->strand.post( [sync_master = my_impl->sync_master.get(),
boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(),
&dispatcher = my_impl->dispatcher,
c, blk_id, blk_num, latency = age]() {
dispatcher.recv_block( c, blk_id, blk_num );
sync_master->sync_recv_block( c, blk_id, blk_num, true, latency );
});
} else {
c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c,
boost::asio::post(c->strand, [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c,
block{std::move(block)}, blk_id, blk_num, reason]() mutable {
if( reason == unlinkable || reason == no_reason ) {
dispatcher.add_unlinkable_block( std::move(block), blk_id );
Expand Down Expand Up @@ -3954,7 +3954,7 @@ namespace eosio {
auto current_time = std::chrono::steady_clock::now();
my->connections.for_each_connection( [current_time]( const connection_ptr& c ) {
if( c->socket_is_open() ) {
c->strand.post([c, current_time]() {
boost::asio::post(c->strand, [c, current_time]() {
c->check_heartbeat(current_time);
} );
}
Expand Down Expand Up @@ -4690,8 +4690,8 @@ namespace eosio {
}

fc_dlog(logger, "connecting to ${h}:${p}", ("h", host)("p", port));
strand.post([c, host, port]() {
auto resolver = std::make_shared<tcp::resolver>( c->strand.context() );
boost::asio::post(strand, [c, host, port]() {
auto resolver = std::make_shared<tcp::resolver>( c->strand );
fc_dlog(logger, "resolve ${h}:${p}", ("h", host)("p", port));
resolver->async_resolve(host, port, boost::asio::bind_executor( c->strand,
[resolver, c, host, port]
Expand Down

0 comments on commit 517b66d

Please sign in to comment.