Skip to content

Commit

Permalink
Fix a number of bugs identified in the PR review
Browse files Browse the repository at this point in the history
Better error handling
Fix potential race condition in ar_block_cache update
Limit number of forks returned
Fix bugs in the ordering of blocks and forks
Add more tests
  • Loading branch information
JamesPiechota committed Jul 12, 2024
1 parent bae53c6 commit 91da9b7
Show file tree
Hide file tree
Showing 13 changed files with 288 additions and 84 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:
ar_header_sync_tests,
ar_http_iface_tests,
ar_http_util_tests,
ar_info_tests,
ar_mempool_tests,
ar_mine_randomx_tests,
ar_mine_vdf_tests,
Expand Down
3 changes: 3 additions & 0 deletions apps/arweave/include/ar_chain_stats.hrl
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
-ifndef(AR_CHAIN_STATS_HRL).
-define(AR_CHAIN_STATS_HRL, true).

-define(RECENT_FORKS_AGE, 60 * 60 * 24 * 30). %% last 30 days of forks
-define(RECENT_FORKS_LENGTH, 20). %% only return the last 20 forks

-record(fork, {
id,
height,
Expand Down
26 changes: 13 additions & 13 deletions apps/arweave/src/ar_block_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ get(Tab, H) ->
B
end.

%% @doc Look up the block with hash H in the cache table Tab.
%% Returns {Block, {Status, Timestamp}} when the block is cached and
%% not_found when there is no entry for it.
get_block_and_status(Tab, H) ->
	Key = {block, H},
	case ets:lookup(Tab, Key) of
		[{_Key, {Block, Status, Timestamp, _Children}}] ->
			%% The children stored with the entry are not part of the result.
			{Block, {Status, Timestamp}};
		[] ->
			not_found
	end.

%% @doc Get a {block, previous blocks, status} tuple for the earliest block from
%% the longest chain, which has not been validated yet. The previous blocks are
%% sorted from newest to oldest. The last one is a block from the current fork.
Expand Down Expand Up @@ -297,16 +307,6 @@ tx_id(#tx{ id = ID }) ->
tx_id(TXID) ->
TXID.

%% @doc Get the block and its status from cache.
%% Returns {B, {Status, Timestamp}} for a cached block.
%% Returns not_found if the block is not in cache.
get_block_and_status(Tab, H) ->
%% Cache entries are looked up under the {block, H} key.
case ets:lookup(Tab, {block, H}) of
[] ->
not_found;
%% The children kept alongside the block are not part of the result.
[{_, {B, Status, Timestamp, _Children}}] ->
{B, {Status, Timestamp}}
end.

%% @doc Mark the given block as the tip block. Mark the previous blocks as on-chain.
%% Mark the on-chain blocks from other forks as validated. Raises invalid_tip if
%% one of the preceding blocks is not validated. Raises not_found if the block
Expand Down Expand Up @@ -458,6 +458,8 @@ get_siblings(Tab, B) ->

update_timestamp(Tab, H, ReceiveTimestamp) ->
case ets:lookup(Tab, {block, H}) of
[] ->
not_found;
[{_, {B, Status, Timestamp, Children}}] ->
case B#block.receive_timestamp of
undefined ->
Expand All @@ -472,9 +474,7 @@ update_timestamp(Tab, H, ReceiveTimestamp) ->
}, false);
_ ->
ok
end;
[] ->
not_found
end
end.

%%%===================================================================
Expand Down
59 changes: 43 additions & 16 deletions apps/arweave/src/ar_chain_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,23 @@ log_fork(Orphans, ForkRootB) ->
%% @doc Returns all forks that have been logged since the given start time
%% (system time in seconds). Returns {error, timeout} if the server does not
%% reply before the gen_server call times out.
get_forks(StartTime) ->
	%% Use try ... catch rather than the old-style `catch` so that only the
	%% call timeout is converted into an error tuple. Any other exit (e.g. the
	%% server is not running) propagates with its stacktrace instead of being
	%% handed to the caller as an {'EXIT', _} "reply".
	try
		gen_server:call(?MODULE, {get_forks, StartTime})
	catch
		exit:{timeout, {gen_server, call, _}} ->
			{error, timeout}
	end.

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
init([]) ->
process_flag(trap_exit, true),
Columns = [
{"default", []},
{"forks", []}
],
Names = [
?MODULE,
forks
],
ok = ar_kv:open(filename:join(?ROCKS_DB_DIR, "ar_chain_stats_db"), Columns, [], Names),
ok = ar_kv:open(filename:join(?ROCKS_DB_DIR, "forks_db"), forks_db),
{ok, #{}}.

handle_call({get_forks, StartTime}, _From, State) ->
{ok, ForksMap} = ar_kv:get_range(forks, integer_to_binary(StartTime * 1000)),
{ok, ForksMap} = ar_kv:get_range(forks_db, <<(StartTime * 1000):64>>),
%% Sort forks by their key (the timestamp when they were detected)
SortedForks = lists:sort(maps:to_list(ForksMap)),
Forks = [binary_to_term(Fork) || {_Timestamp, Fork} <- SortedForks],
Expand All @@ -69,7 +66,7 @@ terminate(_Reason, _state) ->
%%%===================================================================
log_fork(Orphans, ForkRootB, ForkTime) ->
Fork = create_fork(Orphans, ForkRootB, ForkTime),
ar_kv:put(forks, integer_to_binary(ForkTime), term_to_binary(Fork)),
ar_kv:put(forks_db, <<ForkTime:64>>, term_to_binary(Fork)),
record_fork_depth(Orphans),
ok.

Expand Down Expand Up @@ -100,10 +97,33 @@ record_fork_depth([H | Orphans], N) ->
%%%===================================================================
forks_test_() ->
[
{timeout, 30, fun test_forks/0}
{timeout, 30, fun test_fork_time/0},
{timeout, 30, fun test_forks/0}
].

%% Log two forks with explicit fork times (2 and 11) and check that
%% get_forks/1 returns them in the order of those times.
test_fork_time() ->
	clear_forks_db(),
	RootB = #block{ height = 1 },
	%% Build the #fork{} expected for a given list of orphaned block IDs.
	ExpectedFork =
		fun(Orphans) ->
			#fork{
				id = crypto:hash(sha256, list_to_binary(Orphans)),
				height = 2,
				block_ids = Orphans
			}
		end,
	OrphansA = [<<"a">>],
	OrphansB = [<<"b">>, <<"c">>],

	gen_server:cast(?MODULE, {log_fork, OrphansA, RootB, 2}),
	gen_server:cast(?MODULE, {log_fork, OrphansB, RootB, 11}),
	assert_forks_equal([ExpectedFork(OrphansA), ExpectedFork(OrphansB)], get_forks(0)).

test_forks() ->
clear_forks_db(),
StartTime = os:system_time(seconds),
ForkRootB1 = #block{ height = 1 },
ForkRootB2= #block{ height = 2 },
Expand Down Expand Up @@ -141,7 +161,7 @@ test_forks() ->
get_forks(StartTime)),

Orphans4 = [<<"e">>, <<"f">>, <<"g">>],
timer:sleep(5),
timer:sleep(1000),
log_fork(Orphans4, ForkRootB2),
ExpectedFork4 = #fork{
id = crypto:hash(sha256, list_to_binary(Orphans4)),
Expand All @@ -165,12 +185,19 @@ test_forks() ->
log_fork([], ForkRootB2),
assert_forks_equal(
[ExpectedFork1, ExpectedFork2, ExpectedFork3, ExpectedFork4, ExpectedFork3],
get_forks(StartTime)).

get_forks(StartTime)),

%% Check that the cutoff time is handled correctly
timer:sleep(5),
assert_forks_equal(
[ExpectedFork4, ExpectedFork3],
get_forks(StartTime+1)).

%% Assert that two lists of #fork{} records are equal, ignoring the timestamp
%% field of every record.
assert_forks_equal(ExpectedForks, ActualForks) ->
	StripTimestamp = fun(Fork) -> Fork#fork{ timestamp = undefined } end,
	ExpectedForksStripped = [StripTimestamp(Fork) || Fork <- ExpectedForks],
	ActualForksStripped = [StripTimestamp(Fork) || Fork <- ActualForks],
	?assertEqual(ExpectedForksStripped, ActualForksStripped).

%% Delete the forks logged up to the current time so that a test starts from
%% an empty forks_db.
clear_forks_db() ->
	Time = os:system_time(millisecond),
	%% Fork keys are written as 64-bit big-endian integers (<<ForkTime:64>>), so
	%% the range bounds must use the same encoding. Decimal-text bounds such as
	%% <<"0">> sort after every key whose leading byte is zero and therefore
	%% never cover the stored forks.
	ar_kv:delete_range(forks_db, <<0:64>>, <<Time:64>>).
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_events_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ init([]) ->
%% Events: new, ready_for_mining, orphaned, emitting_scheduled,
%% preparing_unblacklisting, ready_for_unblacklisting, registered_offset.
?CHILD(ar_events, tx, worker),
%% Events: discovered, rejected, new, double_signing.
%% Events: discovered, rejected, new, double_signing, mined_block_received.
?CHILD(ar_events, block, worker),
%% Events: unpacked, packed.
?CHILD(ar_events, chunk, worker),
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ get_info(Peer, Type) ->
case get_info(Peer) of
info_unavailable -> info_unavailable;
Info ->
maps:get(atom_to_binary(Type), Info)
maps:get(atom_to_binary(Type), Info, info_unavailable)
end.
get_info(Peer) ->
case
Expand Down
28 changes: 24 additions & 4 deletions apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ handle(<<"GET">>, [<<"info">>], Req, _Pid) ->
{200, #{}, ar_serialize:jsonify(ar_info:get_info()), Req};

handle(<<"GET">>, [<<"recent">>], Req, _Pid) ->
{200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req};

case ar_node:is_joined() of
false ->
not_joined(Req);
true ->
{200, #{}, ar_serialize:jsonify(ar_info:get_recent()), Req}
end;

handle(<<"GET">>, [<<"is_tx_blacklisted">>, EncodedTXID], Req, _Pid) ->
case ar_util:safe_decode(EncodedTXID) of
{error, invalid} ->
Expand Down Expand Up @@ -2321,7 +2326,7 @@ handle_block_announcement(#block_announcement{ indep_hash = H, previous_block =
solution_hash = SolutionH }, Req) ->
case ar_ignore_registry:member(H) of
true ->
ar_block_cache:update_timestamp(block_cache, H, erlang:timestamp()),
check_block_receive_timestamp(H),
{208, #{}, <<>>, Req};
false ->
case ar_node:get_block_shadow_from_cache(PrevH) of
Expand Down Expand Up @@ -2438,7 +2443,7 @@ post_block(check_block_hash_header, Peer, {Req, Pid, Encoding}, ReceiveTimestamp
{ok, BH} when byte_size(BH) =< 48 ->
case ar_ignore_registry:member(BH) of
true ->
ar_block_cache:update_timestamp(block_cache, BH, ReceiveTimestamp),
check_block_receive_timestamp(BH),
{208, #{}, <<"Block already processed.">>, Req};
false ->
post_block(read_body, Peer, {Req, Pid, Encoding},
Expand Down Expand Up @@ -2508,6 +2513,21 @@ post_block(enqueue_block, {B, Peer}, Req, ReceiveTimestamp) ->
end,
{200, #{}, <<"OK">>, Req}.

%% Handle the repeated arrival of a block (hash H) that is already in the
%% ignore registry. If the cached block has no receive_timestamp yet, emit a
%% mined_block_received block event carrying the current time. Returns
%% not_found when H is not in block_cache.
check_block_receive_timestamp(H) ->
	case ar_block_cache:get(block_cache, H) of
		not_found ->
			not_found;
		B ->
			report_first_receipt(H, B#block.receive_timestamp)
	end.

%% An undefined receive_timestamp means this node mined block H and this is the
%% first time the block has been gossiped back to it, so report the receipt.
%% A block that already has a receive_timestamp needs no update.
report_first_receipt(H, undefined) ->
	ar_events:send(block, {mined_block_received, H, erlang:timestamp()});
report_first_receipt(_H, _ReceiveTimestamp) ->
	ok.

handle_post_partial_solution(Req, Pid) ->
{ok, Config} = application:get_env(arweave, config),
CMExitNode = ar_coordination:is_exit_peer() andalso ar_pool:is_client(),
Expand Down
79 changes: 48 additions & 31 deletions apps/arweave/src/ar_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ get_recent() ->
#{
%% #{
%% "id": <indep_hash>,
%% "received": <received_timestamp>"
%% "received": <received_timestamp>",
%% "height": <height>
%% }
<<"blocks">> => get_recent_blocks(ar_node:get_height()),
<<"blocks">> => get_recent_blocks(),
%% #{
%% "id": <hash_of_block_ids>,
%% "height": <height_of_first_orphaned_block>,
Expand All @@ -55,40 +56,56 @@ get_recent() ->
<<"forks">> => get_recent_forks()
}.

get_recent_blocks(CurrentHeight) ->
lists:foldl(
fun({H, _WeaveSize, _TXRoot}, Acc) ->
Acc ++ [#{
%% @doc Return the most recent blocks in reverse chronological order.
%%
%% There are a few list reversals that happen here:
%% 1. get_block_anchors returns the blocks in reverse chronological order (latest block first)
%% 2. [Element | Acc] reverses the list into chronological order (latest block last)
%% 3. The final lists:reverse puts the list back into reverse chronological order
%% (latest block first)
get_recent_blocks() ->
Anchors = lists:sublist(ar_node:get_block_anchors(), ?CHECKPOINT_DEPTH),
Blocks = lists:foldl(
fun(H, Acc) ->
B = ar_block_cache:get(block_cache, H),
[#{
<<"id">> => ar_util:encode(H),
<<"received">> => get_block_timestamp(H, length(Acc))
}]
<<"received">> => get_block_timestamp(B, length(Acc)),
<<"height">> => B#block.height
} | Acc]
end,
[],
lists:sublist(ar_block_index:get_list(CurrentHeight), ?CHECKPOINT_DEPTH)
).
Anchors
),
lists:reverse(Blocks).

%% @doc Return the most recent forks in reverse chronological order (latest
%% fork first). Only forks logged within the last ?RECENT_FORKS_AGE seconds are
%% considered, and at most ?RECENT_FORKS_LENGTH of them are returned.
%% Returns the atom error if the forks cannot be fetched.
get_recent_forks() ->
	CutOffTime = os:system_time(seconds) - ?RECENT_FORKS_AGE,
	case ar_chain_stats:get_forks(CutOffTime) of
		{error, _} ->
			error;
		Forks ->
			%% ar_chain_stats:get_forks/1 returns the forks oldest first.
			%% Reverse before truncating so that the length limit keeps the
			%% newest forks rather than the oldest ones.
			Latest = lists:sublist(lists:reverse(Forks), ?RECENT_FORKS_LENGTH),
			[fork_to_map(Fork) || Fork <- Latest]
	end.

%% Convert a #fork{} record into the map returned to API clients.
fork_to_map(#fork{ id = ID, height = Height, timestamp = Timestamp,
		block_ids = BlockIDs }) ->
	#{
		<<"id">> => ar_util:encode(ID),
		<<"height">> => Height,
		%% NOTE(review): presumably Timestamp is in milliseconds and this
		%% converts it to seconds - confirm against ar_chain_stats.
		<<"timestamp">> => Timestamp div 1000,
		<<"blocks">> => [ ar_util:encode(BlockID) || BlockID <- BlockIDs ]
	}.

get_block_timestamp(H, Depth) when Depth < ?RECENT_BLOCKS_WITHOUT_TIMESTAMP ->
get_block_timestamp(B, Depth)
when Depth < ?RECENT_BLOCKS_WITHOUT_TIMESTAMP orelse
B#block.receive_timestamp =:= undefined ->
<<"pending">>;
get_block_timestamp(H, _Depth) ->
B = ar_block_cache:get(block_cache, H),
case B#block.receive_timestamp of
undefined -> <<"pending">>;
Timestamp -> ar_util:timestamp_to_seconds(Timestamp)
end.
get_block_timestamp(B, _Depth) ->
ar_util:timestamp_to_seconds(B#block.receive_timestamp).

8 changes: 6 additions & 2 deletions apps/arweave/src/ar_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,12 @@ get_current_usd_to_ar_rate() ->
%% the hashes of the recent blocks that can be used in transactions as anchors.
%% @end
get_block_anchors() ->
[{block_anchors, BlockAnchors}] = ets:lookup(node_state, block_anchors),
BlockAnchors.
case ets:lookup(node_state, block_anchors) of
[{block_anchors, BlockAnchors}] ->
BlockAnchors;
[] ->
not_joined
end.

%% @doc Return a map TXID -> ok containing all the recent transaction identifiers.
%% Used for preventing replay attacks.
Expand Down
4 changes: 4 additions & 0 deletions apps/arweave/src/ar_node_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ handle_info({event, block, {new, B, _Source}}, State) ->
{noreply, State}
end;

handle_info({event, block, {mined_block_received, H, ReceiveTimestamp}}, State) ->
ar_block_cache:update_timestamp(block_cache, H, ReceiveTimestamp),
{noreply, State};

handle_info({event, block, _}, State) ->
{noreply, State};

Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ discover_peers([Peer | Peers]) ->
case ar_http_iface_client:get_info(Peer, release) of
Release when is_integer(Release) ->
maybe_add_peer(Peer, Release);
info_unavailable ->
maybe_add_peer(Peer, 0);
_ ->
ok
end;
Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/test/ar_http_iface_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ get_tx(ID) ->

%% @doc Ensure that server info can be retrieved via the HTTP interface.
test_get_info(_) ->
?assertEqual(info_unavailable,
ar_http_iface_client:get_info(ar_test_node:peer_ip(main), bad_key)),
?assertEqual(<<?NETWORK_NAME>>,
ar_http_iface_client:get_info(ar_test_node:peer_ip(main), network)),
?assertEqual(?RELEASE_NUMBER,
Expand Down
Loading

0 comments on commit 91da9b7

Please sign in to comment.