diff --git a/apps/arweave/src/ar_kv.erl b/apps/arweave/src/ar_kv.erl index d1931c92f..91a45754b 100644 --- a/apps/arweave/src/ar_kv.erl +++ b/apps/arweave/src/ar_kv.erl @@ -46,7 +46,20 @@ cf_handle = undefined :: rocksdb:cf_handle() }). --record(state, {}). +-define(msg_trigger_timer(Kind, Secret), {msg_trigger_timer, Kind, Secret}). +-define(msg_trigger_db_flush(Secret), ?msg_trigger_timer(db_flush, Secret)). +-define(msg_trigger_wal_sync(Secret), ?msg_trigger_timer(wal_sync, Secret)). + +-record(timer, { + interval_ms :: pos_integer(), + ref :: erlang:reference() | undefined, + secret :: erlang:reference() | undefined +}). + +-record(state, { + db_flush_timer :: #timer{}, + wal_sync_timer :: #timer{} +}). @@ -254,7 +267,13 @@ count(Name) -> init([]) -> process_flag(trap_exit, true), - {ok, #state{}}. + S0 = #state{ + db_flush_timer = #timer{interval_ms = 1800000}, + wal_sync_timer = #timer{interval_ms = 60000} + }, + S1 = init_db_flush_timer(S0), + S2 = init_wal_sync_timer(S1), + {ok, S2}. @@ -277,7 +296,6 @@ handle_call({open, {DataDirRelativePath, CfDescriptors, UserOptions, CfNames}}, DbRec0 = new_dbrec(CfNames, CfDescriptors, DataDirRelativePath, UserOptions), case ets:lookup(?MODULE, hd(CfNames)) of [] -> - ?LOG_INFO([{event, skipping_repair_for_cf_database}]), case open(DbRec0) of ok -> {reply, ok, State}; {error, Reason} -> {reply, {error, Reason}, State} @@ -303,6 +321,32 @@ handle_cast(Cast, State) -> +handle_info( + ?msg_trigger_db_flush(SameSecret), + #state{db_flush_timer = #timer{secret = SameSecret}} = S0 +) -> + with_each_db(fun(DbRec) -> + {ElapsedUs, _} = timer:tc(fun() -> db_flush(DbRec) end), + ?LOG_INFO([ + {event, periodic_timer}, {}, {op, db_flush}, + {name, io_lib:format("~p", [DbRec#db.name])}, {elapsed_us, ElapsedUs} + ]) + end), + {noreply, init_db_flush_timer(S0)}; + +handle_info( + ?msg_trigger_wal_sync(SameSecret), + #state{wal_sync_timer = #timer{secret = SameSecret}} = S0 +) -> + with_each_db(fun(DbRec) -> + {ElapsedUs, _} = timer:tc(fun() -> wal_sync(DbRec) end), + ?LOG_INFO([ + {event, periodic_timer}, {}, {op, wal_sync}, + {name, io_lib:format("~p", [DbRec#db.name])}, {elapsed_us, ElapsedUs} + ]) + end), + {noreply, init_wal_sync_timer(S0)}; + handle_info(Message, State) -> ?LOG_WARNING([{event, unhandled_info}, {module, ?MODULE}, {message, Message}]), {noreply, State}. @@ -310,21 +354,11 @@ handle_info(Message, State) -> terminate(_Reason, _State) -> - ets:foldl( - fun(#db{db_handle = Db} = DbRec0, Closed) -> - case sets:is_element(Db, Closed) of - true -> - Closed; - false -> - _ = flush(DbRec0), - _ = sync_wal(DbRec0), - _ = close(DbRec0), - sets:add_element(Db, Closed) - end - end, - sets:new(), - ?MODULE - ). + with_each_db(fun(DbRec) -> + _ = db_flush(DbRec), + _ = wal_sync(DbRec), + _ = close(DbRec) + end). @@ -334,6 +368,33 @@ terminate(_Reason, _State) -> +maybe_cancel_timer(#timer{ref = undefined}) -> ok; +maybe_cancel_timer(#timer{ref = TRef}) -> erlang:cancel_timer(TRef). + + + +init_timer(Timer0, MsgFun) -> + _ = maybe_cancel_timer(Timer0), + Secret = erlang:make_ref(), + TRef = erlang:send_after(Timer0#timer.interval_ms, self(), apply(MsgFun, [Secret])), + Timer0#timer{ref = TRef, secret = Secret}. + + + +init_db_flush_timer(#state{db_flush_timer = Timer0} = S0) -> + S0#state{ + db_flush_timer = init_timer(Timer0, fun(Secret) -> ?msg_trigger_db_flush(Secret) end) + }. + + + +init_wal_sync_timer(#state{wal_sync_timer = Timer0} = S0) -> + S0#state{ + wal_sync_timer = init_timer(Timer0, fun(Secret) -> ?msg_trigger_wal_sync(Secret) end) + }. + + + %% @doc Create a new plain database record. new_dbrec(Name, DataDirRelativePath, UserOptions) -> Filepath = filename:join(get_data_dir(), DataDirRelativePath), @@ -388,15 +449,18 @@ open(#db{ } = DbRec0) -> case rocksdb:open(Filepath, DbOptions, CfDescriptors) of {ok, Db, Cfs} -> - _ = flush(Db), - lists:foreach( - fun({Cf, CfName}) -> + FirstDbRec = lists:foldr( + fun({Cf, CfName}, _) -> ?LOG_INFO([{event, db_operation}, {op, open}, {name, io_lib:format("~p", [CfName])}]), DbRec1 = DbRec0#db{name = CfName, db_handle = Db, cf_handle = Cf}, - true = ets:insert(?MODULE, DbRec1) + true = ets:insert(?MODULE, DbRec1), + DbRec1 end, + undefined, lists:zip(Cfs, CfNames) ), + %% flush the cf database (all column families at once) + _ = db_flush(FirstDbRec), ok; {error, OpenError} -> ?LOG_ERROR([{event, db_operation_failed}, {op, open}, @@ -415,8 +479,8 @@ open(#db{} = DbRec0) -> %% Attempt to close the database and remove the ETS entries related to it. %% This function WILL NOT perform any actions regarding persistence: it is up to -%% the user to ensure that both flush/1 and sync_wal/1 functions are called prior -%% calling this function. +%% the user to ensure that both db_flush/1 and wal_sync/1 functions are called +%% prior to calling this function. %% Database must be open at the moment of calling the function. close(#db{db_handle = undefined}) -> {error, closed}; @@ -444,19 +508,19 @@ close(#db{db_handle = Db, name = Name}) -> %% @doc Attempt to flush the database: persist the memtables contents on disk. %% Database must be open at the moment of calling the function. -flush(#db{name = Name, db_handle = undefined}) -> - ?LOG_ERROR([{event, db_operation_failed}, {op, flush}, {error, closed}, {name, io_lib:format("~p", [Name])}]), +db_flush(#db{name = Name, db_handle = undefined}) -> + ?LOG_ERROR([{event, db_operation_failed}, {op, db_flush}, {error, closed}, {name, io_lib:format("~p", [Name])}]), {error, closed}; -flush(#db{name = Name, db_handle = Db}) -> +db_flush(#db{name = Name, db_handle = Db}) -> case rocksdb:flush(Db, [{wait, true}, {allow_write_stall, false}]) of {error, FlushError} -> - ?LOG_ERROR([{event, db_operation_failed}, {op, flush}, + ?LOG_ERROR([{event, db_operation_failed}, {op, db_flush}, {name, io_lib:format("~p", [Name])}, {reason, io_lib:format("~p", [FlushError])}]), {error, failed}; _ -> - ?LOG_INFO([{event, db_operation}, {op, flush}, {name, io_lib:format("~p", [Name])}]), + ?LOG_INFO([{event, db_operation}, {op, db_flush}, {name, io_lib:format("~p", [Name])}]), ok end. @@ -464,19 +528,19 @@ flush(#db{name = Name, db_handle = Db}) -> %% @doc Attempt to sync Write Ahead Log (WAL): persist WAL contents on disk. %% Database must be open at the moment of calling the function. -sync_wal(#db{name = Name, db_handle = undefined}) -> - ?LOG_ERROR([{event, db_operation_failed}, {op, sync_wal}, {error, closed}, {name, io_lib:format("~p", [Name])}]), +wal_sync(#db{name = Name, db_handle = undefined}) -> + ?LOG_ERROR([{event, db_operation_failed}, {op, wal_sync}, {error, closed}, {name, io_lib:format("~p", [Name])}]), {error, closed}; -sync_wal(#db{name = Name, db_handle = Db}) -> +wal_sync(#db{name = Name, db_handle = Db}) -> case rocksdb:sync_wal(Db) of {error, SyncError} -> - ?LOG_ERROR([{event, db_operation_failed}, {op, sync_wal}, + ?LOG_ERROR([{event, db_operation_failed}, {op, wal_sync}, {name, io_lib:format("~p", [Name])}, {reason, io_lib:format("~p", [SyncError])}]), {error, failed}; _ -> - ?LOG_INFO([{event, db_operation}, {op, sync_wal}, {name, io_lib:format("~p", [Name])}]), + ?LOG_INFO([{event, db_operation}, {op, wal_sync}, {name, io_lib:format("~p", [Name])}]), ok end. @@ -520,6 +584,26 @@ with_db(Name, Op, Callback) -> +%% @doc Apply callback for each unique database found in ETS (column family +%% databases will be only called once). +%% The callback will get the database record (#db{}) as an argument. +with_each_db(Callback) -> + ets:foldl( + fun(#db{db_handle = Db} = DbRec0, Closed) -> + case sets:is_element(Db, Closed) of + true -> + Closed; + false -> + _ = apply(Callback, [DbRec0]), + sets:add_element(Db, Closed) + end + end, + sets:new(), + ?MODULE + ). + + + get_data_dir() -> {ok, Config} = application:get_env(arweave, config), Config#config.data_dir. @@ -734,7 +818,7 @@ destroy(Name) -> test_close(Name) -> ?WITH_DB(Name, fun(Db) -> - _ = flush(Db), - _ = sync_wal(Db), + _ = db_flush(Db), + _ = wal_sync(Db), _ = close(Db) end).