Introduce P3 and RocksDB fixes (#624)
Introduce P3 and RocksDB fixes:
- P3 configuration was fixed;
- Repair code and configuration options were removed;
- Reconnect code was removed;
- All RocksDB databases are closed gracefully on node shutdown: termination sequence includes database flush and WAL sync;
- Configurable periodic database flush was implemented (default interval: 30m);
- Configurable periodic WAL sync was implemented (default interval: 1m).
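
Both intervals are given in whole seconds; the 30m and 1m defaults above correspond to the 1800 and 60 used in the diffs below. As a quick illustration of overriding them (the option names are taken from the diffs in this commit; the launcher invocation and the values shown are only a sketch, not part of the commit):

    Command line (parsed in ar.erl below, converted with list_to_integer/1):
    ./bin/start data_dir /your/data/dir rocksdb_flush_interval 900 rocksdb_wal_sync_interval 30

    JSON config file (parsed in ar_config.erl below):
    {
        "rocksdb_flush_interval": 900,
        "rocksdb_wal_sync_interval": 30
    }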
shizzard authored Oct 4, 2024
1 parent a283bcc commit f59047b
Showing 14 changed files with 598 additions and 588 deletions.
apps/arweave/include/ar_config.hrl (10 changes: 8 additions & 2 deletions)
@@ -105,6 +105,11 @@
%% The default number of chunks fetched from disk at a time during in-place repacking.
-define(DEFAULT_REPACK_BATCH_SIZE, 100).

+ %% The default rocksdb databases flush interval, 30 minutes.
+ -define(DEFAULT_ROCKSDB_FLUSH_INTERVAL_S, 1800).
+ %% The default rocksdb WAL sync interval, 1 minute.
+ -define(DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S, 60).
+
%% @doc Startup options with default values.
-record(config, {
init = false,
@@ -178,7 +183,6 @@
nonce_limiter_server_trusted_peers = [],
nonce_limiter_client_peers = [],
debug = false,
- repair_rocksdb = [],
run_defragmentation = false,
defragmentation_trigger_threshold = 1_500_000_000,
defragmentation_modules = [],
@@ -199,7 +203,9 @@
pool_api_key = not_set,
pool_worker_name = not_set,
%% Undocumented/unsupported options
- chunk_storage_file_size = ?CHUNK_GROUP_SIZE
+ chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
+ rocksdb_flush_interval_s = ?DEFAULT_ROCKSDB_FLUSH_INTERVAL_S,
+ rocksdb_wal_sync_interval_s = ?DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S
}).

-endif.
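
The two new record fields default to the macros defined above (1800 s and 60 s). The periodic flush and WAL-sync loops themselves live in changed files that are not expanded on this page; purely as a minimal, hypothetical sketch of how such a loop could be driven by these fields (the module name is invented here, and the rocksdb:flush/2 and rocksdb:sync_wal/1 calls are assumed from the erlang-rocksdb NIF API, not quoted from this commit):

    %% Hypothetical sketch, not code from this commit.
    -module(ar_rocksdb_maintenance_sketch).
    -behaviour(gen_server).

    -include_lib("arweave/include/ar_config.hrl").

    -export([start_link/1]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

    %% Db is an already opened rocksdb database handle.
    start_link(Db) ->
        gen_server:start_link(?MODULE, Db, []).

    init(Db) ->
        {ok, Config} = application:get_env(arweave, config),
        FlushMs = Config#config.rocksdb_flush_interval_s * 1000,
        SyncMs = Config#config.rocksdb_wal_sync_interval_s * 1000,
        erlang:send_after(FlushMs, self(), flush),
        erlang:send_after(SyncMs, self(), sync_wal),
        {ok, #{ db => Db, flush_ms => FlushMs, sync_ms => SyncMs }}.

    handle_call(_Request, _From, State) ->
        {reply, ok, State}.

    handle_cast(_Msg, State) ->
        {noreply, State}.

    %% Periodically flush memtables to SST files (assumed erlang-rocksdb API).
    handle_info(flush, #{ db := Db, flush_ms := FlushMs } = State) ->
        ok = rocksdb:flush(Db, [{wait, true}]),
        erlang:send_after(FlushMs, self(), flush),
        {noreply, State};
    %% Periodically fsync the write-ahead log (assumed erlang-rocksdb API).
    handle_info(sync_wal, #{ db := Db, sync_ms := SyncMs } = State) ->
        ok = rocksdb:sync_wal(Db),
        erlang:send_after(SyncMs, self(), sync_wal),
        {noreply, State}.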
apps/arweave/src/ar.erl (11 changes: 7 additions & 4 deletions)
@@ -276,7 +276,6 @@ show_help() ->
"to this peer. You can specify several vdf_client_peer options."},
{"debug",
"Enable extended logging."},
{"repair_rocksdb (file)", "Attempt to repair the given RocksDB database."},
{"run_defragmentation",
"Run defragmentation of chunk storage files."},
{"defragmentation_trigger_threshold",
@@ -335,7 +334,9 @@ show_help() ->
{"pool_server_address", "The pool address"},
{"pool_worker_name", "(optional) The pool worker name. "
"Useful if you have multiple machines (or replicas) "
"and you want to monitor them separately on pool"}
"and you want to monitor them separately on pool"},
{"rocksdb_flush_interval", "RocksDB flush interval in seconds"},
{"rocksdb_wal_sync_interval", "RocksDB WAL sync interval in seconds"}
]
),
erlang:halt().
@@ -556,8 +557,6 @@ parse_cli_args(["vdf_client_peer", RawPeer | Rest],
parse_cli_args(Rest, C#config{ nonce_limiter_client_peers = [RawPeer | Peers] });
parse_cli_args(["debug" | Rest], C) ->
parse_cli_args(Rest, C#config{ debug = true });
parse_cli_args(["repair_rocksdb", Path | Rest], #config{ repair_rocksdb = L } = C) ->
parse_cli_args(Rest, C#config{ repair_rocksdb = [filename:absname(Path) | L] });
parse_cli_args(["run_defragmentation" | Rest], C) ->
parse_cli_args(Rest, C#config{ run_defragmentation = true });
parse_cli_args(["defragmentation_trigger_threshold", Num | Rest], C) ->
@@ -628,6 +627,10 @@ parse_cli_args(["pool_server_address", Host | Rest], C) ->
parse_cli_args(Rest, C#config{ pool_server_address = list_to_binary(Host) });
parse_cli_args(["pool_worker_name", Host | Rest], C) ->
parse_cli_args(Rest, C#config{ pool_worker_name = list_to_binary(Host) });
parse_cli_args(["rocksdb_flush_interval", Seconds | Rest], C) ->
parse_cli_args(Rest, C#config{ rocksdb_flush_interval_s = list_to_integer(Seconds) });
parse_cli_args(["rocksdb_wal_sync_interval", Seconds | Rest], C) ->
parse_cli_args(Rest, C#config{ rocksdb_wal_sync_interval_s = list_to_integer(Seconds) });

%% Undocumented/unsupported options
parse_cli_args(["chunk_storage_file_size", Num | Rest], C) ->
apps/arweave/src/ar_config.erl (16 changes: 12 additions & 4 deletions)
@@ -514,10 +514,6 @@ parse_options([{<<"vdf_client_peers">>, Peers} | _], _) ->
parse_options([{<<"debug">>, B} | Rest], Config) when is_boolean(B) ->
parse_options(Rest, Config#config{ debug = B });

parse_options([{<<"repair_rocksdb">>, L} | Rest], Config) ->
parse_options(Rest, Config#config{
repair_rocksdb = [filename:absname(binary_to_list(El)) || El <- L] });

parse_options([{<<"run_defragmentation">>, B} | Rest], Config) when is_boolean(B) ->
parse_options(Rest, Config#config{ run_defragmentation = B });

@@ -638,6 +634,18 @@ parse_options([{<<"chunk_storage_file_size">>, ChunkGroupSize} | Rest], Config)
parse_options([{<<"chunk_storage_file_size">>, ChunkGroupSize} | _], _) ->
{error, {bad_type, chunk_storage_file_size, number}, ChunkGroupSize};

parse_options([{<<"rocksdb_flush_interval">>, IntervalS} | Rest], Config)
when is_integer(IntervalS) ->
parse_options(Rest, Config#config{ rocksdb_flush_interval_s = IntervalS });
parse_options([{<<"rocksdb_flush_interval">>, IntervalS} | _], _) ->
{error, {bad_type, rocksdb_flush_interval, number}, IntervalS};

parse_options([{<<"rocksdb_wal_sync_interval">>, IntervalS} | Rest], Config)
when is_integer(IntervalS) ->
parse_options(Rest, Config#config{ rocksdb_wal_sync_interval_s = IntervalS });
parse_options([{<<"rocksdb_wal_sync_interval">>, IntervalS} | _], _) ->
{error, {bad_type, rocksdb_wal_sync_interval, number}, IntervalS};

parse_options([Opt | _], _) ->
{error, unknown, Opt};
parse_options([], Config) ->
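
The graceful-shutdown part of the change (flush plus WAL sync before every database is closed) also lives in changed files not expanded on this page. Purely as a rough illustration of the sequence the commit message describes, and again assuming the erlang-rocksdb calls rocksdb:flush/2, rocksdb:sync_wal/1 and rocksdb:close/1, a termination helper might look like:

    %% Hypothetical sketch, not code from this commit: flush the memtables,
    %% fsync the write-ahead log, then close each open database handle.
    close_all_gracefully(DbHandles) ->
        lists:foreach(
            fun(Db) ->
                ok = rocksdb:flush(Db, [{wait, true}]),
                ok = rocksdb:sync_wal(Db),
                ok = rocksdb:close(Db)
            end,
            DbHandles
        ).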
apps/arweave/src/ar_data_sync.erl (2 changes: 1 addition & 1 deletion)
@@ -3388,7 +3388,7 @@ find_storage_module_for_disk_pool_chunk(Offset) ->
sort_storage_modules_for_disk_pool_chunk(Modules) ->
{ok, Config} = application:get_env(arweave, config),
MiningAddress = Config#config.mining_addr,
- CompareFun =
+ CompareFun =
fun({_, _, {spora_2_6, Addr1}}, {_, _, {spora_2_6, _}}) ->
%% Storage modules for our current mining address have the highest priority
Addr1 == MiningAddress;
apps/arweave/src/ar_doctor_dump.erl (8 changes: 4 additions & 4 deletions)
@@ -13,9 +13,9 @@ help() ->
ar:console("data-doctor dump <min_height> <max_height> <data_dir> <output_dir>~n"),
ar:console(" min_height: The minimum height of the blocks to dump.~n"),
ar:console(" max_height: The maximum height of the blocks to dump.~n"),
ar:console(" data_dir: Full path to your data_dir.~n"),
ar:console(" output_dir: Full path to a directory where the dumped data will be written.~n"),
ar:console("~nExample:~n"),
ar:console(" data_dir: Full path to your data_dir.~n"),
ar:console(" output_dir: Full path to a directory where the dumped data will be written.~n"),
ar:console("~nExample:~n"),
ar:console("data-doctor dump /mnt/arweave-data /mnt/output~n").

dump(Args) when length(Args) < 4 ->
@@ -77,7 +77,7 @@ dump_blocks(Cursor, MinHeight, MaxHeight, OutputDir) ->
io:format("No more entries.~n")
end.

- dump_txs([], OutputDir) ->
+ dump_txs([], _OutputDir) ->
ok;
dump_txs([TXID | TXIDs], OutputDir) ->
case ar_kv:get(tx_db, TXID) of
(The diffs for the remaining 9 changed files are not expanded on this page.)
