From e7c41c77b4aaa714ee849f13bd7e5ddd23229f32 Mon Sep 17 00:00:00 2001 From: humaite Date: Thu, 3 Oct 2024 09:55:02 +0200 Subject: [PATCH] Header Synchronization Improvement Added a way to know if a peer is up or down in `ar_peers` by calling `ar_peers:activate_peer/1` and `ar_peers:inactivate_peer/1` functions. To know if a peer is activate, one can call `ar_peers:is_active_peer/1` function. The full list of active peers (present in the ranking) can be returned by calling `ar_peers:get_peers(active)` function. Modified `ar_http_iface_client:get_block_shadow/2` function to add more parameters, in particular to modify default timeout and the value used to pick a peer from peer list. Modified `ar_header_sync` module to only select active peer when synchronizing headers. Fixed a typo in `bin/console`. --- apps/arweave/src/ar_header_sync.erl | 6 +- apps/arweave/src/ar_http.erl | 4 ++ apps/arweave/src/ar_http_iface_client.erl | 71 +++++++++++++++-------- apps/arweave/src/ar_peers.erl | 54 +++++++++++++++++ bin/console | 2 +- 5 files changed, 110 insertions(+), 27 deletions(-) diff --git a/apps/arweave/src/ar_header_sync.erl b/apps/arweave/src/ar_header_sync.erl index b4df4c491..370d3b18f 100644 --- a/apps/arweave/src/ar_header_sync.erl +++ b/apps/arweave/src/ar_header_sync.erl @@ -482,7 +482,7 @@ check_fork(Height, H, TXRoot) -> end. download_block(H, H2, TXRoot) -> - Peers = ar_peers:get_peers(lifetime), + Peers = ar_peers:get_peers(active), case ar_storage:read_block(H) of unavailable -> download_block(Peers, H, H2, TXRoot); @@ -492,7 +492,9 @@ download_block(H, H2, TXRoot) -> download_block(Peers, H, H2, TXRoot) -> Fork_2_0 = ar_fork:height_2_0(), - case ar_http_iface_client:get_block_shadow(Peers, H) of + PeerLength = length(Peers), + Opts = #{ rand_min => PeerLength }, + case ar_http_iface_client:get_block_shadow(Peers, H, Opts) of unavailable -> ?LOG_WARNING([ {event, ar_header_sync_failed_to_download_block_header}, diff --git a/apps/arweave/src/ar_http.erl b/apps/arweave/src/ar_http.erl index 0d19d5492..559a081a5 100644 --- a/apps/arweave/src/ar_http.erl +++ b/apps/arweave/src/ar_http.erl @@ -147,10 +147,12 @@ handle_info({gun_up, PID, _Protocol}, #state{ status_by_pid = StatusByPID } = St [gen_server:reply(ReplyTo, {ok, PID}) || {ReplyTo, _} <- PendingRequests], StatusByPID2 = maps:put(PID, {connected, MonitorRef, Peer}, StatusByPID), prometheus_gauge:inc(outbound_connections), + ar_peers:activate_peer(Peer), {noreply, State#state{ status_by_pid = StatusByPID2 }}; {connected, _MonitorRef, Peer} -> ?LOG_WARNING([{event, gun_up_pid_already_exists}, {peer, ar_util:format_peer(Peer)}]), + ar_peers:activate_peer(Peer), {noreply, State} end; @@ -179,6 +181,7 @@ handle_info({gun_error, PID, Reason}, prometheus_gauge:dec(outbound_connections), ok end, + ar_peers:inactivate_peer(Peer), gun:shutdown(PID), ?LOG_DEBUG([{event, connection_error}, {reason, io_lib:format("~p", [Reason])}]), {noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }} @@ -208,6 +211,7 @@ handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStream prometheus_gauge:dec(outbound_connections), ok end, + ar_peers:inactivate_peer(Peer), {noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }} end; diff --git a/apps/arweave/src/ar_http_iface_client.erl b/apps/arweave/src/ar_http_iface_client.erl index a2321c515..76c57710f 100644 --- a/apps/arweave/src/ar_http_iface_client.erl +++ b/apps/arweave/src/ar_http_iface_client.erl @@ -4,19 +4,22 @@ -module(ar_http_iface_client). --export([send_block_json/3, send_block_binary/3, send_block_binary/4, send_tx_json/3, - send_tx_binary/3, send_block_announcement/2, get_block_shadow/2, get_block_shadow/3, - get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2, get_tx_data/2, - get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2, - add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1, - get_block_index/3, get_sync_record/1, get_sync_record/3, - get_chunk_binary/3, get_mempool/1, get_sync_buckets/1, - get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3, - get_block_time_history/3, - push_nonce_limiter_update/3, get_vdf_update/1, get_vdf_session/1, - get_previous_vdf_session/1, get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2, - cm_publish_send/2, get_jobs/2, post_partial_solution/2, - get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]). +-export([send_block_json/3, send_block_binary/3, send_block_binary/4, + send_tx_json/3, send_tx_binary/3, send_block_announcement/2, + get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2, + get_tx_data/2, get_wallet_list_chunk/2, get_wallet_list_chunk/3, + get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1, + get_time/2, get_height/1, get_block_index/3, get_sync_record/1, + get_sync_record/3, get_chunk_binary/3, get_mempool/1, + get_sync_buckets/1, get_recent_hash_list/1, + get_recent_hash_list_diff/2, get_reward_history/3, + get_block_time_history/3, push_nonce_limiter_update/3, + get_vdf_update/1, get_vdf_session/1, get_previous_vdf_session/1, + get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2, + cm_publish_send/2, get_jobs/2, post_partial_solution/2, + get_pool_cm_jobs/2, post_pool_cm_jobs/2, + post_cm_partition_table_to_pool/2]). +-export([get_block_shadow/2, get_block_shadow/3, get_block_shadow/4]). -include_lib("arweave/include/ar.hrl"). -include_lib("arweave/include/ar_config.hrl"). @@ -122,28 +125,48 @@ get_block(Peer, H, TXIndices) -> {B, Time, Size} end. -%% @doc Retreive a block shadow by hash or height from one of the given peers. -get_block_shadow([], _ID) -> - unavailable; +%%-------------------------------------------------------------------- +%% @doc get a block shadow using default parameter. +%% @end +%%-------------------------------------------------------------------- get_block_shadow(Peers, ID) -> - Peer = lists:nth(rand:uniform(min(5, length(Peers))), Peers), - case get_block_shadow(ID, Peer, binary) of + get_block_shadow(Peers, ID, #{}). + +%%-------------------------------------------------------------------- +%% @doc Retrieve a block shadow by hash or height from one of the given +%% peers. Some options can be modified like `rand_min', +%% `connect_timeout' and `timeout'. +%% @see get_block_shadow/4 +%% @end +%%-------------------------------------------------------------------- +get_block_shadow([], _ID, _Opts) -> + unavailable; +get_block_shadow(Peers, ID, Opts) -> + RandMin = maps:get(rand_min, Opts, 5), + Random = rand:uniform(min(RandMin, length(Peers))), + Peer = lists:nth(Random, Peers), + case get_block_shadow(ID, Peer, binary, Opts) of not_found -> - get_block_shadow(Peers -- [Peer], ID); + get_block_shadow(Peers -- [Peer], ID, Opts); {ok, B, Time, Size} -> {Peer, B, Time, Size} end. -%% @doc Retreive a block shadow by hash or height from the given peer. -get_block_shadow(ID, Peer, Encoding) -> +%%-------------------------------------------------------------------- +%% @doc Retrieve a block shadow by hash or height from the given peer. +%% @end +%%-------------------------------------------------------------------- +get_block_shadow(ID, Peer, Encoding, Opts) -> + ConnectTimeout = maps:get(connect_timeout, Opts, 500), + Timeout = maps:get(timeout, Opts, 30), handle_block_response(Peer, Encoding, ar_http:req(#{ method => get, peer => Peer, path => get_block_path(ID, Encoding), headers => p2p_headers(), - connect_timeout => 500, - timeout => 30 * 1000, + connect_timeout => ConnectTimeout, + timeout => Timeout * 1000, limit => ?MAX_BODY_SIZE })). @@ -1102,7 +1125,7 @@ get_info(Peer) -> timeout => 2 * 1000 }) of - {ok, {{<<"200">>, _}, _, JSON, _, _}} -> + {ok, {{<<"200">>, _}, _, JSON, _, _}} -> case ar_serialize:json_decode(JSON, [return_maps]) of {ok, JsonMap} -> JsonMap; diff --git a/apps/arweave/src/ar_peers.erl b/apps/arweave/src/ar_peers.erl index 412bc8805..825e363b9 100644 --- a/apps/arweave/src/ar_peers.erl +++ b/apps/arweave/src/ar_peers.erl @@ -17,6 +17,7 @@ ]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([activate_peer/1, inactivate_peer/1, is_active_peer/1]). %% The frequency in seconds of re-resolving DNS of peers configured by domain names. -define(STORE_RESOLVED_DOMAIN_S, 60). @@ -158,6 +159,9 @@ start_link() -> %% %% 'current' weights recently received data higher than old data and is most useful when we care %% more about maximizing throughput (e.g. when syncing chunks). +get_peers(active) -> + Peers = get_peers(lifetime), + [ P || P <- Peers, is_active_peer(P) ]; get_peers(Ranking) -> case catch ets:lookup(?MODULE, {peers, Ranking}) of {'EXIT', _} -> @@ -908,9 +912,59 @@ store_peers() -> ar_storage:write_term(peers, {Total, Records}) end. +%%-------------------------------------------------------------------- +%% @doc activate a peer (up). +%% @end +%%-------------------------------------------------------------------- +activate_peer(Peer) -> + ets:insert(?MODULE, {{peer_active, Peer}, true}). + +%%-------------------------------------------------------------------- +%% @doc inactivate a peer (down). +%% @end +%%-------------------------------------------------------------------- +inactivate_peer(Peer) -> + ets:insert(?MODULE, {{peer_active, Peer}, false}). + +%%-------------------------------------------------------------------- +%% @doc returns `true' if a peer is active, else `false'. +%% @end +%%-------------------------------------------------------------------- +is_active_peer(Peer) -> + Pattern = {{peer_active, Peer}, '$1'}, + Guard = [], + Select = ['$1'], + case ets:select(?MODULE, [{Pattern, Guard, Select}]) of + [] -> false; + [V] when is_boolean(V) -> V; + _ -> false + end. + %%%=================================================================== %%% Tests. %%%=================================================================== +active_peer_test() -> + ets:delete_all_objects(?MODULE), + Peer = {100, 117, 109, 98, 1234}, + + % get_peers(active) returns an empty list when no + % active nodes are present + ?assertEqual([], get_peers(active)), + + % by default, if a peer is not present in the table, it must + % be down. + set_ranked_peers(lifetime, [Peer]), + ?assertEqual(false, is_active_peer(Peer)), + + % activate peer + activate_peer(Peer), + ?assertEqual(true, is_active_peer(Peer)), + ?assertEqual([Peer], get_peers(active)), + + % inactivate peer + inactivate_peer(Peer), + ?assertEqual(false, is_active_peer(Peer)), + ?assertEqual([], get_peers(active)). rotate_peer_ports_test() -> Peer = {2, 2, 2, 2, 1}, diff --git a/bin/console b/bin/console index 6c4d7ddf5..8f77337f3 100755 --- a/bin/console +++ b/bin/console @@ -8,4 +8,4 @@ SCRIPT_ACTION="remote_console" # Sets $ARWEAVE and $ARWEAVE_* variables source ${SCRIPT_DIR}/arweave.env -$(ARWEAVE} ${SCRIPT_ACTION} +${ARWEAVE} ${SCRIPT_ACTION}