Skip to content

Commit

Permalink
Remove most instances of process_flag(trap_exit, true)
Browse files Browse the repository at this point in the history
Only keep the ones in processes that write something to disk.
I believe the use of trap_exit is part of the reason for the slow
node shutdown. I think we only need in situatinos where there's
an operation in proecss that needs to be terminated gracefully
(e.g. closing an open file)
  • Loading branch information
JamesPiechota committed Jul 14, 2024
1 parent 91da9b7 commit 2e61c63
Show file tree
Hide file tree
Showing 39 changed files with 31 additions and 37 deletions.
1 change: 0 additions & 1 deletion apps/arweave/src/ar_bench_packing.erl
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ dirty_test({TotalMegaBytes, _, _, _} = Permutation, WorkerFun, Args, NumWorkers)
io:format("~n").

dirty_worker(WorkerID, Permutation, WorkerFun, Args, Offset, Size) ->
process_flag(trap_exit, true),
ar_bench_timer:record({total, WorkerID}, WorkerFun, [
WorkerID,
Permutation,
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_block_pre_validator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pre_validate(B, Peer, ReceiveTimestamp) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
gen_server:cast(?MODULE, pre_validate),
ok = ar_events:subscribe(block),
{ok, Config} = application:get_env(arweave, config),
Expand Down
3 changes: 2 additions & 1 deletion apps/arweave/src/ar_block_propagation_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ start_link(Name) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, #state{}}.

handle_call(Request, _From, State) ->
Expand All @@ -41,6 +40,7 @@ handle_cast({send_block, SendFun, RetryCount, From}, State) ->
end;

handle_cast({send_block2, Peer, SendAnnouncementFun, SendFun, RetryCount, From}, State) ->
?LOG_DEBUG([{event, send_block2}, {peer, ar_util:format_peer(Peer)}]),
case SendAnnouncementFun() of
{ok, {{<<"412">>, _}, _, _, _, _}} when RetryCount > 0 ->
ar_util:cast_after(2000, self(),
Expand Down Expand Up @@ -93,6 +93,7 @@ handle_info(Info, State) ->
{noreply, State}.

terminate(_Reason, _State) ->
?LOG_WARNING([{event, terminate}, {module, ?MODULE}]),
ok.

%%%===================================================================
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_bridge.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ start_link(Name, Workers) ->
%% @end
%%--------------------------------------------------------------------
init(Workers) ->
process_flag(trap_exit, true),
ar_events:subscribe(block),
WorkerMap = lists:foldl(fun(W, Acc) -> maps:put(W, free, Acc) end, #{}, Workers),
State = #state{ workers = WorkerMap },
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_chain_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ get_forks(StartTime) ->
%%% Generic server callbacks.
%%%===================================================================
init([]) ->
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
ok = ar_kv:open(filename:join(?ROCKS_DB_DIR, "forks_db"), forks_db),
{ok, #{}}.
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_chunk_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ run_defragmentation() ->
%%%===================================================================

init({StoreID, RepackInPlacePacking}) ->
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
DataDir = Config#config.data_dir,
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_coordination.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ get_cluster_partitions_list() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),

ar_util:cast_after(?BATCH_POLL_INTERVAL_MS, ?MODULE, check_batches),
Expand Down
5 changes: 0 additions & 5 deletions apps/arweave/src/ar_data_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ get_bucket_peers(Bucket, Cursor, Peers) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, _} = timer:apply_interval(
?DATA_DISCOVERY_COLLECT_PEERS_FREQUENCY_MS, ?MODULE, collect_peers, []),
gen_server:cast(?MODULE, update_network_data_map),
Expand Down Expand Up @@ -100,7 +99,6 @@ handle_cast(update_network_data_map, #state{ peers_pending = N } = State)
{{value, Peer}, Queue} ->
monitor(process, spawn_link(
fun() ->
process_flag(trap_exit, true),
case ar_http_iface_client:get_sync_buckets(Peer) of
{ok, SyncBuckets} ->
gen_server:cast(?MODULE, {add_peer_sync_buckets, Peer,
Expand Down Expand Up @@ -157,9 +155,6 @@ handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.

handle_info({'EXIT', _, normal}, State) ->
{noreply, State};

handle_info({'DOWN', _, process, _, _}, #state{ peers_pending = N } = State) ->
{noreply, State#state{ peers_pending = N - 1 }};

Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ debug_get_disk_pool_chunks(Cursor) ->

init({"default" = StoreID, _}) ->
?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]),
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
[ok, ok] = ar_events:subscribe([node_state, disksup]),
Expand Down Expand Up @@ -677,6 +678,7 @@ init({"default" = StoreID, _}) ->
{ok, State2};
init({StoreID, RepackInPlacePacking}) ->
?LOG_INFO([{event, ar_data_sync_start}, {store_id, StoreID}]),
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
[ok, ok] = ar_events:subscribe([node_state, disksup]),
State = init_kv(StoreID),
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_data_sync_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ start_link(Name) ->
%%%===================================================================

init(Name) ->
process_flag(trap_exit, true),
{ok, #state{ name = Name }}.

handle_call(Request, _From, State) ->
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_data_sync_worker_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ read_range(Start, End, OriginStoreID, TargetStoreID, SkipSmall) ->
%%%===================================================================

init(Workers) ->
process_flag(trap_exit, true),
gen_server:cast(?MODULE, process_main_queue),
ar_util:cast_after(?REBALANCE_FREQUENCY_MS, ?MODULE, rebalance_peers),

Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_disk_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ start_link() ->
%% @end
%%--------------------------------------------------------------------
init([]) ->
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
Path = filename:join(Config#config.data_dir, ?DISK_CACHE_DIR),
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_global_sync_record.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ get_serialized_sync_buckets() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(sync_record),
{ok, Config} = application:get_env(arweave, config),
SyncRecord =
Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/src/ar_header_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ remove_block(Height) ->

init([]) ->
?LOG_INFO([{event, ar_header_sync_start}]),
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
[ok, ok] = ar_events:subscribe([tx, disksup]),
{ok, Config} = application:get_env(arweave, config),
Expand Down Expand Up @@ -440,6 +441,7 @@ process_item(Queue) ->
true ->
monitor(process, spawn(
fun() ->
%% Trap exit to avoid corrupting any open files on quit..
process_flag(trap_exit, true),
case download_block(H, H2, TXRoot) of
{error, _Reason} ->
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ req(Args, ReestablishedConnection) ->
%%% ==================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, #state{}}.

handle_call({get_connection, Args}, From,
Expand Down
9 changes: 7 additions & 2 deletions apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ get_sync_record(Peer) ->

get_sync_record(Peer, Start, Limit) ->
Headers = [{<<"Content-Type">>, <<"application/etf">>}],
handle_sync_record_response(ar_http:req(#{
StartTime = erlang:monotonic_time(),
Response = handle_sync_record_response(ar_http:req(#{
peer => Peer,
method => get,
path => "/data_sync_record/" ++ integer_to_list(Start) ++ "/"
Expand All @@ -303,7 +304,11 @@ get_sync_record(Peer, Start, Limit) ->
connect_timeout => 5000,
limit => ?MAX_ETF_SYNC_RECORD_SIZE,
headers => Headers
}), Start, Limit).
}), Start, Limit),
ElapsedTime = erlang:convert_time_unit(erlang:monotonic_time() - StartTime, native, millisecond),
?LOG_DEBUG([{event, get_sync_record}, {peer, ar_util:format_peer(Peer)},
{elapsed, ElapsedTime}]),
Response.

get_chunk_binary(Peer, Offset, RequestedPacking) ->
PackingBinary =
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_mining_hash.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ garbage_collect() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
State = lists:foldl(
fun(_, Acc) -> start_hashing_thread(Acc) end,
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_mining_io.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ garbage_collect() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
State =
lists:foldl(
fun ({PartitionNumber, MiningAddress, StoreID}, Acc) ->
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_mining_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ is_one_chunk_solution(Solution) ->
%%%===================================================================

init([]) ->
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
ok = ar_events:subscribe(nonce_limiter),
ar_chunk_storage:open_files("default"),
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_mining_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ mining_paused() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, #state{}}.

handle_call(Request, _From, State) ->
Expand Down
4 changes: 2 additions & 2 deletions apps/arweave/src/ar_node_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ set_reward_addr(Addr) ->
%%%===================================================================

init([]) ->
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
[ok, ok, ok, ok, ok] = ar_events:subscribe([tx, block, nonce_limiter, miner, node_state]),
%% Initialize RandomX.
Expand Down Expand Up @@ -901,8 +902,7 @@ apply_block2(BShadow, PrevBlocks, Timestamp, State) ->
Self = self(),
monitor(
process,
PID = spawn(fun() -> process_flag(trap_exit, true),
get_missing_txs_and_retry(BShadow, Self) end)
PID = spawn(fun() -> get_missing_txs_and_retry(BShadow, Self) end)
),
BH = BShadow#block.indep_hash,
{noreply, State#{
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_nonce_limiter_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ maybe_request_sessions(SessionKey) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
case ar_config:use_remote_vdf_server() andalso ar_config:pull_from_remote_vdf_server() of
false ->
ok;
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_nonce_limiter_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ get_full_prev_update(Format) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(nonce_limiter),
{ok, #state{}}.

Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_nonce_limiter_server_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ start_link(Name, RawPeer) ->
%%%===================================================================

init(RawPeer) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(nonce_limiter),
case ar_config:is_public_vdf_server() of
false ->
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_p3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ get_rates_json() ->
%%% Generic server callbacks.
%%%===================================================================
init([]) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(node_state),
{ok, Config} = application:get_env(arweave, config),
ar_p3_config:validate_config(Config).
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_packing_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ start_link() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
Schedulers = erlang:system_info(dirty_cpu_schedulers_online),
ar:console("~nInitialising RandomX dataset for fast packing. Key: ~p. "
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_peer_intervals.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ fetch(Start, End, StoreID, _AllPeersIntervals) when Start >= End ->
gen_server:cast(ar_data_sync:name(StoreID), {update_all_peers_intervals, #{}});
fetch(Start, End, StoreID, AllPeersIntervals) ->
spawn_link(fun() ->
process_flag(trap_exit, true),
try
End2 = min(ar_util:ceil_int(Start, ?NETWORK_DATA_BUCKET_SIZE), End),
UnsyncedIntervals = get_unsynced_intervals(Start, End2, StoreID),
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_peers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ resolve_and_cache_peer(RawPeer, Type) ->
%%%===================================================================

init([]) ->
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
ok = ar_events:subscribe(block),
load_peers(),
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_poller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ resume() ->
%%%===================================================================

init(Workers) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(node_state),
case ar_node:is_joined() of
true ->
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_poller_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ start_link(Name) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, Config} = application:get_env(arweave, config),
[ok] = ar_events:subscribe([node_state]),
State = #state{ polling_frequency_ms = Config#config.polling * 1000 },
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ process_cm_jobs(Jobs, Peer) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
ok = ar_events:subscribe(solution),
{ok, #state{}}.

Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_pool_cm_job_poller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ start_link() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
case {ar_pool:is_client(), ar_coordination:is_exit_peer()} of
{true, true} ->
gen_server:cast(self(), fetch_cm_jobs);
Expand Down
1 change: 0 additions & 1 deletion apps/arweave/src/ar_pool_job_poller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ start_link() ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
case ar_pool:is_client() of
true ->
gen_server:cast(self(), fetch_jobs);
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_sync_record.erl
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ get_intersection_size(End, Start, ID, StoreID) ->
%%%===================================================================

init(StoreID) ->
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
{Dir, StorageModuleSize, StorageModuleIndex, PartitionNumber} =
case StoreID of
Expand Down
1 change: 1 addition & 0 deletions apps/arweave/src/ar_tx_blacklist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ notify_about_added_tx(TXID, End, Start) ->

init([]) ->
ok = initialize_state(),
%% Trap exit to avoid corrupting any open files on quit.
process_flag(trap_exit, true),
ok = ar_events:subscribe(tx),
gen_server:cast(?MODULE, refresh_blacklist),
Expand Down
3 changes: 2 additions & 1 deletion apps/arweave/src/ar_tx_emitter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ start_link(Name, Workers) ->
%%%===================================================================

init(Workers) ->
process_flag(trap_exit, true),
gen_server:cast(?MODULE, process_chunk),
{ok, #state{ workers = queue:from_list(Workers), currently_emitting = sets:new() }}.

Expand Down Expand Up @@ -92,6 +91,8 @@ handle_info({timeout, TXID, Peer}, State) ->
%% Should have been emitted.
{noreply, State};
true ->
?LOG_DEBUG([{event, tx_propagation_timeout}, {txid, ar_util:encode(TXID)},
{peer, ar_util:format_peer(Peer)}]),
Emitting2 = sets:del_element({TXID, Peer}, Emitting),
case sets:is_empty(Emitting2) of
true ->
Expand Down
7 changes: 5 additions & 2 deletions apps/arweave/src/ar_tx_emitter_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ start_link(Name) ->
%%%===================================================================

init([]) ->
process_flag(trap_exit, true),
{ok, #state{}}.

handle_call(Request, _From, State) ->
Expand Down Expand Up @@ -55,6 +54,9 @@ handle_cast({emit, TXID, Peer, ReplyTo}, State) ->
end,
Reply = SendFun(),
PropagationTimeUs = timer:now_diff(erlang:timestamp(), StartedAt),
?LOG_DEBUG([{event, tx_propagated}, {txid, ar_util:encode(TXID)},
{peer, ar_util:format_peer(Peer)}, {reply, Reply},
{elapsed, PropagationTimeUs / 1000}]),
record_propagation_status(Reply),
record_propagation_rate(tx_propagated_size(TX), PropagationTimeUs)
end,
Expand Down Expand Up @@ -87,7 +89,8 @@ handle_info(Info, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {info, Info}]),
{noreply, State}.

terminate(_Reason, _State) ->
terminate(Reason, _State) ->
?LOG_INFO([{event, terminate}, {module, ?MODULE}, {reason, Reason}]),
ok.

%%%===================================================================
Expand Down
Loading

0 comments on commit 2e61c63

Please sign in to comment.