Skip to content

Commit

Permalink
Header Synchronization Improvement
Browse files Browse the repository at this point in the history
Added a way to know if a peer is up or down
in `ar_peers` by calling `ar_peers:activate_peer/1`
and `ar_peers:inactivate_peer/1` functions. To
know if a peer is activate, one can call
`ar_peers:is_active_peer/1` function.

Modified `ar_http_iface_client:get_block_shadow/2`
function to add more parameters, in particular
to modify default timeout and the value used to
pick a peer from peer list.

Modified `ar_header_sync` module to only select
active peer when synchronizing headers.

Fixed a typo in `bin/console`.
  • Loading branch information
humaite authored and humaite committed Oct 3, 2024
1 parent a283bcc commit e258b40
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 27 deletions.
7 changes: 5 additions & 2 deletions apps/arweave/src/ar_header_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ check_fork(Height, H, TXRoot) ->
end.

download_block(H, H2, TXRoot) ->
Peers = ar_peers:get_peers(lifetime),
FullPeers = ar_peers:get_peers(lifetime),
Peers = [ P || P <- FullPeers, ar_peers:is_active_peer(P) ],
case ar_storage:read_block(H) of
unavailable ->
download_block(Peers, H, H2, TXRoot);
Expand All @@ -492,7 +493,9 @@ download_block(H, H2, TXRoot) ->

download_block(Peers, H, H2, TXRoot) ->
Fork_2_0 = ar_fork:height_2_0(),
case ar_http_iface_client:get_block_shadow(Peers, H) of
PeerLength = length(Peers),
Opts = #{ rand_min => PeerLength },
case ar_http_iface_client:get_block_shadow(Peers, H, Opts) of
unavailable ->
?LOG_WARNING([
{event, ar_header_sync_failed_to_download_block_header},
Expand Down
4 changes: 4 additions & 0 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ handle_info({gun_up, PID, _Protocol}, #state{ status_by_pid = StatusByPID } = St
[gen_server:reply(ReplyTo, {ok, PID}) || {ReplyTo, _} <- PendingRequests],
StatusByPID2 = maps:put(PID, {connected, MonitorRef, Peer}, StatusByPID),
prometheus_gauge:inc(outbound_connections),
ar_peers:activate_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2 }};
{connected, _MonitorRef, Peer} ->
?LOG_WARNING([{event, gun_up_pid_already_exists},
{peer, ar_util:format_peer(Peer)}]),
ar_peers:activate_peer(Peer),
{noreply, State}
end;

Expand Down Expand Up @@ -179,6 +181,7 @@ handle_info({gun_error, PID, Reason},
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:inactivate_peer(Peer),
gun:shutdown(PID),
?LOG_DEBUG([{event, connection_error}, {reason, io_lib:format("~p", [Reason])}]),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
Expand Down Expand Up @@ -208,6 +211,7 @@ handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStream
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:inactivate_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
end;

Expand Down
71 changes: 47 additions & 24 deletions apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@

-module(ar_http_iface_client).

-export([send_block_json/3, send_block_binary/3, send_block_binary/4, send_tx_json/3,
send_tx_binary/3, send_block_announcement/2, get_block_shadow/2, get_block_shadow/3,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2, get_tx_data/2,
get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2,
add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1,
get_block_index/3, get_sync_record/1, get_sync_record/3,
get_chunk_binary/3, get_mempool/1, get_sync_buckets/1,
get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3,
push_nonce_limiter_update/3, get_vdf_update/1, get_vdf_session/1,
get_previous_vdf_session/1, get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]).
-export([send_block_json/3, send_block_binary/3, send_block_binary/4,
send_tx_json/3, send_tx_binary/3, send_block_announcement/2,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2,
get_tx_data/2, get_wallet_list_chunk/2, get_wallet_list_chunk/3,
get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1,
get_time/2, get_height/1, get_block_index/3, get_sync_record/1,
get_sync_record/3, get_chunk_binary/3, get_mempool/1,
get_sync_buckets/1, get_recent_hash_list/1,
get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3, push_nonce_limiter_update/3,
get_vdf_update/1, get_vdf_session/1, get_previous_vdf_session/1,
get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2,
post_cm_partition_table_to_pool/2]).
-export([get_block_shadow/2, get_block_shadow/3, get_block_shadow/4]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
Expand Down Expand Up @@ -122,28 +125,48 @@ get_block(Peer, H, TXIndices) ->
{B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from one of the given peers.
get_block_shadow([], _ID) ->
unavailable;
%%--------------------------------------------------------------------
%% @doc get a block shadow using default parameter.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(Peers, ID) ->
Peer = lists:nth(rand:uniform(min(5, length(Peers))), Peers),
case get_block_shadow(ID, Peer, binary) of
get_block_shadow(Peers, ID, #{}).

%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from one of the given
%% peers. Some options can be modified like `rand_min',
%% `connect_timeout' and `timeout'.
%% @see get_block_shadow/4
%% @end
%%--------------------------------------------------------------------
get_block_shadow([], _ID, _Opts) ->
unavailable;
get_block_shadow(Peers, ID, Opts) ->
RandMin = maps:get(rand_min, Opts, 5),
Random = rand:uniform(min(RandMin, length(Peers))),
Peer = lists:nth(Random, Peers),
case get_block_shadow(ID, Peer, binary, Opts) of
not_found ->
get_block_shadow(Peers -- [Peer], ID);
get_block_shadow(Peers -- [Peer], ID, Opts);
{ok, B, Time, Size} ->
{Peer, B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from the given peer.
get_block_shadow(ID, Peer, Encoding) ->
%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from the given peer.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(ID, Peer, Encoding, Opts) ->
ConnectTimeout = maps:get(connect_timeout, Opts, 500),
Timeout = maps:get(timeout, Opts, 30),
handle_block_response(Peer, Encoding,
ar_http:req(#{
method => get,
peer => Peer,
path => get_block_path(ID, Encoding),
headers => p2p_headers(),
connect_timeout => 500,
timeout => 30 * 1000,
connect_timeout => ConnectTimeout,
timeout => Timeout * 1000,
limit => ?MAX_BODY_SIZE
})).

Expand Down Expand Up @@ -1102,7 +1125,7 @@ get_info(Peer) ->
timeout => 2 * 1000
})
of
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
case ar_serialize:json_decode(JSON, [return_maps]) of
{ok, JsonMap} ->
JsonMap;
Expand Down
42 changes: 42 additions & 0 deletions apps/arweave/src/ar_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([activate_peer/1, inactivate_peer/1, is_active_peer/1]).

%% The frequency in seconds of re-resolving DNS of peers configured by domain names.
-define(STORE_RESOLVED_DOMAIN_S, 60).
Expand Down Expand Up @@ -908,9 +909,50 @@ store_peers() ->
ar_storage:write_term(peers, {Total, Records})
end.

%%--------------------------------------------------------------------
%% @doc activate a peer (up).
%% @end
%%--------------------------------------------------------------------
activate_peer(Peer) ->
ets:insert(?MODULE, {{peer_active, Peer}, true}).

%%--------------------------------------------------------------------
%% @doc inactivate a peer (down).
%% @end
%%--------------------------------------------------------------------
inactivate_peer(Peer) ->
ets:insert(?MODULE, {{peer_active, Peer}, false}).

%%--------------------------------------------------------------------
%% @doc returns `true' if a peer is active, else `false'.
%% @end
%%--------------------------------------------------------------------
is_active_peer(Peer) ->
Pattern = {{peer_active, Peer}, '$1'},
Guard = [],
Select = ['$1'],
case ets:select(?MODULE, [{Pattern, Guard, Select}]) of
[] -> false;
[V] when is_boolean(V) -> V;
_ -> false
end.

%%%===================================================================
%%% Tests.
%%%===================================================================
activate_peer_test() ->
Peer = {100, 117, 109, 98, 1234},
% by default, if a peer is not present in the table, it must
% be down.
?assertEqual(false, is_active_peer(Peer)),

% activate peer
activate_peer(Peer),
?assertEqual(true, is_active_peer(Peer)),

% inactivate peer
inactivate_peer(Peer),
?assertEqual(false, is_active_peer(Peer)).

rotate_peer_ports_test() ->
Peer = {2, 2, 2, 2, 1},
Expand Down
2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ SCRIPT_ACTION="remote_console"

# Sets $ARWEAVE and $ARWEAVE_* variables
source ${SCRIPT_DIR}/arweave.env
$(ARWEAVE} ${SCRIPT_ACTION}
${ARWEAVE} ${SCRIPT_ACTION}

0 comments on commit e258b40

Please sign in to comment.