Skip to content

Commit

Permalink
Implement periodic db flush and WAL sync (configless)
Browse files Browse the repository at this point in the history
  • Loading branch information
shizzard committed Oct 3, 2024
1 parent 8e5a34b commit 9c2d908
Showing 1 changed file with 120 additions and 36 deletions.
156 changes: 120 additions & 36 deletions apps/arweave/src/ar_kv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,20 @@
cf_handle = undefined :: rocksdb:cf_handle()
}).

-record(state, {}).
-define(msg_trigger_timer(Kind, Secret), {msg_trigger_timer, Kind, Secret}).
-define(msg_trigger_db_flush(Secret), ?msg_trigger_timer(db_flush, Secret)).
-define(msg_trigger_wal_sync(Secret), ?msg_trigger_timer(wal_sync, Secret)).

-record(timer, {
interval_ms :: pos_integer(),
ref :: erlang:reference() | undefined,
secret :: erlang:reference() | undefined
}).

-record(state, {
db_flush_timer :: #timer{},
wal_sync_timer :: #timer{}
}).



Expand Down Expand Up @@ -254,7 +267,13 @@ count(Name) ->

init([]) ->
process_flag(trap_exit, true),
{ok, #state{}}.
S0 = #state{
db_flush_timer = #timer{interval_ms = 1800000},
wal_sync_timer = #timer{interval_ms = 60000}
},
S1 = init_db_flush_timer(S0),
S2 = init_wal_sync_timer(S1),
{ok, S2}.



Expand All @@ -277,7 +296,6 @@ handle_call({open, {DataDirRelativePath, CfDescriptors, UserOptions, CfNames}},
DbRec0 = new_dbrec(CfNames, CfDescriptors, DataDirRelativePath, UserOptions),
case ets:lookup(?MODULE, hd(CfNames)) of
[] ->
?LOG_INFO([{event, skipping_repair_for_cf_database}]),
case open(DbRec0) of
ok -> {reply, ok, State};
{error, Reason} -> {reply, {error, Reason}, State}
Expand All @@ -303,28 +321,44 @@ handle_cast(Cast, State) ->



handle_info(
?msg_trigger_db_flush(SameSecret),
#state{db_flush_timer = #timer{secret = SameSecret}} = S0
) ->
with_each_db(fun(DbRec) ->
{ElapsedUs, _} = timer:tc(fun() -> db_flush(DbRec) end),
?LOG_INFO([
{event, periodic_timer}, {}, {op, db_flush},
{name, io_lib:format("~p", [DbRec#db.name])}, {elapsed_us, ElapsedUs}
])
end),
{noreply, init_db_flush_timer(S0)};

handle_info(
?msg_trigger_wal_sync(SameSecret),
#state{wal_sync_timer = #timer{secret = SameSecret}} = S0
) ->
with_each_db(fun(DbRec) ->
{ElapsedUs, _} = timer:tc(fun() -> wal_sync(DbRec) end),
?LOG_INFO([
{event, periodic_timer}, {}, {op, wal_sync},
{name, io_lib:format("~p", [DbRec#db.name])}, {elapsed_us, ElapsedUs}
])
end),
{noreply, init_wal_sync_timer(S0)};

handle_info(Message, State) ->
?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {message, Message}]),
{noreply, State}.



terminate(_Reason, _State) ->
ets:foldl(
fun(#db{db_handle = Db} = DbRec0, Closed) ->
case sets:is_element(Db, Closed) of
true ->
Closed;
false ->
_ = flush(DbRec0),
_ = sync_wal(DbRec0),
_ = close(DbRec0),
sets:add_element(Db, Closed)
end
end,
sets:new(),
?MODULE
).
with_each_db(fun(DbRec) ->
_ = db_flush(DbRec),
_ = wal_sync(DbRec),
_ = close(DbRec)
end).



Expand All @@ -334,6 +368,33 @@ terminate(_Reason, _State) ->



maybe_cancel_timer(#timer{ref = undefined}) -> ok;
maybe_cancel_timer(#timer{ref = TRef}) -> erlang:cancel_timer(TRef).



init_timer(Timer0, MsgFun) ->
_ = maybe_cancel_timer(Timer0),
Secret = erlang:make_ref(),
TRef = erlang:send_after(Timer0#timer.interval_ms, self(), apply(MsgFun, [Secret])),
Timer0#timer{ref = TRef, secret = Secret}.



init_db_flush_timer(#state{db_flush_timer = Timer0} = S0) ->
S0#state{
db_flush_timer = init_timer(Timer0, fun(Secret) -> ?msg_trigger_db_flush(Secret) end)
}.



init_wal_sync_timer(#state{wal_sync_timer = Timer0} = S0) ->
S0#state{
wal_sync_timer = init_timer(Timer0, fun(Secret) -> ?msg_trigger_wal_sync(Secret) end)
}.



%% @doc Create a new plain database record.
new_dbrec(Name, DataDirRelativePath, UserOptions) ->
Filepath = filename:join(get_data_dir(), DataDirRelativePath),
Expand Down Expand Up @@ -388,15 +449,18 @@ open(#db{
} = DbRec0) ->
case rocksdb:open(Filepath, DbOptions, CfDescriptors) of
{ok, Db, Cfs} ->
_ = flush(Db),
lists:foreach(
fun({Cf, CfName}) ->
FirstDbRec = lists:foldr(
fun({Cf, CfName}, _) ->
?LOG_INFO([{event, db_operation}, {op, open}, {name, io_lib:format("~p", [CfName])}]),
DbRec1 = DbRec0#db{name = CfName, db_handle = Db, cf_handle = Cf},
true = ets:insert(?MODULE, DbRec1)
true = ets:insert(?MODULE, DbRec1),
DbRec1
end,
undefined,
lists:zip(Cfs, CfNames)
),
%% flush the cf database (all column families at once)
_ = db_flush(FirstDbRec),
ok;
{error, OpenError} ->
?LOG_ERROR([{event, db_operation_failed}, {op, open},
Expand All @@ -415,8 +479,8 @@ open(#db{} = DbRec0) ->

%% Attempt to close the database and remove the ETS entries related to it.
%% This function WILL NOT perform any actions regarding persistence: it is up to
%% the user to ensure that both flush/1 and sync_wal/1 functions are called prior
%% calling this function.
%% the user to ensure that both db_flush/1 and wal_sync/1 functions are called
%% prior to calling this function.
%% Database must be open at the moment of calling the function.
close(#db{db_handle = undefined}) -> {error, closed};

Expand Down Expand Up @@ -444,39 +508,39 @@ close(#db{db_handle = Db, name = Name}) ->

%% @doc Attempt to flush the database: persist the memtables contents on disk.
%% Database must be open at the moment of calling the function.
flush(#db{name = Name, db_handle = undefined}) ->
?LOG_ERROR([{event, db_operation_failed}, {op, flush}, {error, closed}, {name, io_lib:format("~p", [Name])}]),
db_flush(#db{name = Name, db_handle = undefined}) ->
?LOG_ERROR([{event, db_operation_failed}, {op, db_flush}, {error, closed}, {name, io_lib:format("~p", [Name])}]),
{error, closed};

flush(#db{name = Name, db_handle = Db}) ->
db_flush(#db{name = Name, db_handle = Db}) ->
case rocksdb:flush(Db, [{wait, true}, {allow_write_stall, false}]) of
{error, FlushError} ->
?LOG_ERROR([{event, db_operation_failed}, {op, flush},
?LOG_ERROR([{event, db_operation_failed}, {op, db_flush},
{name, io_lib:format("~p", [Name])},
{reason, io_lib:format("~p", [FlushError])}]),
{error, failed};
_ ->
?LOG_INFO([{event, db_operation}, {op, flush}, {name, io_lib:format("~p", [Name])}]),
?LOG_INFO([{event, db_operation}, {op, db_flush}, {name, io_lib:format("~p", [Name])}]),
ok
end.



%% @doc Attempt to sync Write Ahead Log (WAL): persist WAL contents on disk.
%% Database must be open at the moment of calling the function.
sync_wal(#db{name = Name, db_handle = undefined}) ->
?LOG_ERROR([{event, db_operation_failed}, {op, sync_wal}, {error, closed}, {name, io_lib:format("~p", [Name])}]),
wal_sync(#db{name = Name, db_handle = undefined}) ->
?LOG_ERROR([{event, db_operation_failed}, {op, wal_sync}, {error, closed}, {name, io_lib:format("~p", [Name])}]),
{error, closed};

sync_wal(#db{name = Name, db_handle = Db}) ->
wal_sync(#db{name = Name, db_handle = Db}) ->
case rocksdb:sync_wal(Db) of
{error, SyncError} ->
?LOG_ERROR([{event, db_operation_failed}, {op, sync_wal},
?LOG_ERROR([{event, db_operation_failed}, {op, wal_sync},
{name, io_lib:format("~p", [Name])},
{reason, io_lib:format("~p", [SyncError])}]),
{error, failed};
_ ->
?LOG_INFO([{event, db_operation}, {op, sync_wal}, {name, io_lib:format("~p", [Name])}]),
?LOG_INFO([{event, db_operation}, {op, wal_sync}, {name, io_lib:format("~p", [Name])}]),
ok
end.

Expand Down Expand Up @@ -520,6 +584,26 @@ with_db(Name, Op, Callback) ->



%% @doc Apply callback for each unique database found in ETS (column family
%% databases will be only called once).
%% The callback will get the database record (#db{}) as an argument.
with_each_db(Callback) ->
ets:foldl(
fun(#db{db_handle = Db} = DbRec0, Closed) ->
case sets:is_element(Db, Closed) of
true ->
Closed;
false ->
_ = apply(Callback, [DbRec0]),
sets:add_element(Db, Closed)
end
end,
sets:new(),
?MODULE
).



get_data_dir() ->
{ok, Config} = application:get_env(arweave, config),
Config#config.data_dir.
Expand Down Expand Up @@ -734,7 +818,7 @@ destroy(Name) ->

test_close(Name) ->
?WITH_DB(Name, fun(Db) ->
_ = flush(Db),
_ = sync_wal(Db),
_ = db_flush(Db),
_ = wal_sync(Db),
_ = close(Db)
end).

0 comments on commit 9c2d908

Please sign in to comment.