diff --git a/rebar.lock b/rebar.lock index dadc10c94..2e155556a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -90,7 +90,7 @@ {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.1">>},0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", - {ref,"149997d2a74e08679e56c2c892d7e46f2d0d1c46"}}, + {ref,"816d924df69cd5d37f21007712d0d88964549845"}}, 0}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2}, {<<"httpc_aws">>, diff --git a/src/grpc/router_ics_gateway_location_worker.erl b/src/grpc/router_ics_gateway_location_worker.erl new file mode 100644 index 000000000..925225750 --- /dev/null +++ b/src/grpc/router_ics_gateway_location_worker.erl @@ -0,0 +1,206 @@ +%%%------------------------------------------------------------------- +%% @doc +%% == Router IOT Config Service Gateway Location Worker == +%% @end +%%%------------------------------------------------------------------- +-module(router_ics_gateway_location_worker). + +-behavior(gen_server). + +-include("./autogen/iot_config_pb.hrl"). + +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ +-export([ + start_link/1, + init_ets/0, + get/1 +]). + +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-define(SERVER, ?MODULE). +-define(ETS, router_ics_gateway_location_worker_ets). +-define(INIT, init). +-ifdef(TEST). +-define(BACKOFF_MIN, 100). +-else. +-define(BACKOFF_MIN, timer:seconds(10)). +-endif. +-define(BACKOFF_MAX, timer:minutes(5)). + +-record(state, { + pubkey_bin :: libp2p_crypto:pubkey_bin(), + sig_fun :: function(), + host :: string(), + port :: non_neg_integer(), + conn_backoff :: backoff:backoff() +}). + +-record(location, { + gateway :: libp2p_crypto:pubkey_bin(), + timestamp :: non_neg_integer(), + h3_index :: h3:index() +}). + +-type state() :: #state{}. + +%% ------------------------------------------------------------------ +%% API Function Definitions +%% ------------------------------------------------------------------ +start_link(#{host := ""}) -> + ignore; +start_link(#{port := Port} = Args) when is_list(Port) -> + ?MODULE:start_link(Args#{port => erlang:list_to_integer(Port)}); +start_link(#{host := Host, port := Port} = Args) when + is_list(Host) andalso is_integer(Port) +-> + gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []); +start_link(_Args) -> + ignore. + +-spec init_ets() -> ok. +init_ets() -> + ?ETS = ets:new(?ETS, [ + public, named_table, set, {read_concurrency, true}, {keypos, #location.gateway} + ]), + ok. + +-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index()} | {error, any()}. +get(PubKeyBin) -> + case lookup(PubKeyBin) of + {error, _Reason} -> + gen_server:call(?SERVER, {get, PubKeyBin}); + {ok, _} = OK -> + OK + end. + +%% ------------------------------------------------------------------ +%% gen_server Function Definitions +%% ------------------------------------------------------------------ +init(#{pubkey_bin := PubKeyBin, sig_fun := SigFun, host := Host, port := Port} = Args) -> + lager:info("~p init with ~p", [?SERVER, Args]), + {ok, _, SigFun, _} = blockchain_swarm:keys(), + Backoff = backoff:type(backoff:init(?BACKOFF_MIN, ?BACKOFF_MAX), normal), + self() ! ?INIT, + {ok, #state{ + pubkey_bin = PubKeyBin, + sig_fun = SigFun, + host = Host, + port = Port, + conn_backoff = Backoff + }}. + +handle_call({get, PubKeyBin}, _From, #state{conn_backoff = Backoff0} = State) -> + case get_gateway_location(PubKeyBin, State) of + {error, Reason} = Error -> + {Delay, Backoff1} = backoff:fail(Backoff0), + _ = erlang:send_after(Delay, self(), ?INIT), + lager:warning("fail to get_gateway_location ~p, reconnecting in ~wms", [Reason, Delay]), + {reply, Error, State#state{conn_backoff = Backoff1}}; + {ok, H3IndexString} -> + H3Index = h3:from_string(H3IndexString), + ok = insert(PubKeyBin, H3Index), + {reply, {ok, H3Index}, State} + end; +handle_call(_Msg, _From, State) -> + lager:warning("rcvd unknown call msg: ~p from: ~p", [_Msg, _From]), + {reply, ok, State}. + +handle_cast(_Msg, State) -> + lager:warning("rcvd unknown cast msg: ~p", [_Msg]), + {noreply, State}. + +handle_info(?INIT, #state{conn_backoff = Backoff0} = State) -> + {Delay, Backoff1} = backoff:fail(Backoff0), + case connect(State) of + {error, _Reason} -> + lager:warning("fail to connect ~p, reconnecting in ~wms", [_Reason, Delay]), + _ = erlang:send_after(Delay, self(), ?INIT), + {noreply, State#state{conn_backoff = Backoff1}}; + ok -> + lager:info("connected"), + {_, Backoff2} = backoff:succeed(Backoff0), + {noreply, State#state{conn_backoff = Backoff2}} + end; +handle_info(_Msg, State) -> + lager:warning("rcvd unknown info msg: ~p", [_Msg]), + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + +-spec lookup(PubKeyBin :: libp2p_crypto:pubkey_bin()) -> + {ok, h3:index()} | {error, not_found | outdated}. +lookup(PubKeyBin) -> + Yesterday = erlang:system_time(millisecond) - timer:hours(24), + case ets:lookup(?ETS, PubKeyBin) of + [] -> + {error, not_found}; + [#location{timestamp = T}] when T < Yesterday -> + {error, outdated}; + [#location{h3_index = H3Index}] -> + {ok, H3Index} + end. + +-spec insert(PubKeyBin :: libp2p_crypto:pubkey_bin(), H3Index :: h3:index()) -> ok. +insert(PubKeyBin, H3Index) -> + true = ets:insert(?ETS, #location{ + gateway = PubKeyBin, + timestamp = erlang:system_time(millisecond), + h3_index = H3Index + }), + ok. + +-spec connect(State :: state()) -> ok | {error, any()}. +connect(#state{host = Host, port = Port} = State) -> + case grpcbox_channel:pick(?MODULE, stream) of + {error, _} -> + case + grpcbox_client:connect(?MODULE, [{http, Host, Port, []}], #{ + sync_start => true + }) + of + {ok, _Conn} -> + connect(State); + {error, _Reason} = Error -> + Error + end; + {ok, {_Conn, _Interceptor}} -> + ok + end. + +-spec get_gateway_location(PubKeyBin :: libp2p_crypto:pubkey_bin(), state()) -> + {ok, string()} | {error, any()}. +get_gateway_location(PubKeyBin, #state{sig_fun = SigFun}) -> + Req = #iot_config_gateway_location_req_v1_pb{ + gateway = PubKeyBin + }, + EncodedReq = iot_config_pb:encode_msg(Req, iot_config_gateway_location_req_v1_pb), + SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)}, + case helium_iot_config_gateway_client:location(SignedReq, #{channel => ?MODULE}) of + {grpc_error, Reason} -> + {error, Reason}; + {error, _} = Error -> + Error; + {ok, #iot_config_gateway_location_res_v1_pb{location = Location}, _Meta} -> + {ok, Location} + end. diff --git a/src/router_blockchain.erl b/src/router_blockchain.erl index 845383489..b2e36d0d8 100644 --- a/src/router_blockchain.erl +++ b/src/router_blockchain.erl @@ -54,16 +54,26 @@ calculate_dc_amount(PayloadSize) -> -spec get_hotspot_lat_lon(PubKeyBin :: libp2p_crypto:pubkey_bin()) -> {float(), float()} | {unknown, unknown}. get_hotspot_lat_lon(PubKeyBin) -> - Ledger = ledger(), - case blockchain_ledger_v1:find_gateway_info(PubKeyBin, Ledger) of - {error, _} -> - {unknown, unknown}; - {ok, Hotspot} -> - case blockchain_ledger_gateway_v2:location(Hotspot) of - undefined -> + case ?MODULE:is_chain_dead() of + false -> + Ledger = ledger(), + case blockchain_ledger_v1:find_gateway_info(PubKeyBin, Ledger) of + {error, _} -> + {unknown, unknown}; + {ok, Hotspot} -> + case blockchain_ledger_gateway_v2:location(Hotspot) of + undefined -> + {unknown, unknown}; + Loc -> + h3:to_geo(Loc) + end + end; + true -> + case router_ics_gateway_location_worker:get(PubKeyBin) of + {error, _} -> {unknown, unknown}; - Loc -> - h3:to_geo(Loc) + {ok, Index} -> + h3:to_geo(Index) end end. @@ -71,14 +81,19 @@ get_hotspot_lat_lon(PubKeyBin) -> -spec get_hotspot_location_index(PubKeybin :: libp2p_crypto:pubkey_bin()) -> {ok, non_neg_integer()} | {error, any()}. get_hotspot_location_index(PubKeyBin) -> - case blockchain_ledger_v1:find_gateway_info(PubKeyBin, ledger()) of - {error, _} = Error -> - Error; - {ok, Hotspot} -> - case blockchain_ledger_gateway_v2:location(Hotspot) of - undefined -> {error, undef_index}; - Index -> {ok, Index} - end + case ?MODULE:is_chain_dead() of + false -> + case blockchain_ledger_v1:find_gateway_info(PubKeyBin, ledger()) of + {error, _} = Error -> + Error; + {ok, Hotspot} -> + case blockchain_ledger_gateway_v2:location(Hotspot) of + undefined -> {error, undef_index}; + Index -> {ok, Index} + end + end; + true -> + router_ics_gateway_location_worker:get(PubKeyBin) end. %% Assigning DevAddrs diff --git a/src/router_sup.erl b/src/router_sup.erl index 845d38786..e19ac3383 100644 --- a/src/router_sup.erl +++ b/src/router_sup.erl @@ -131,6 +131,9 @@ init([]) -> PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey0), ICSOptsDefault = application:get_env(router, ics, #{}), ICSOpts = ICSOptsDefault#{pubkey_bin => PubKeyBin, sig_fun => SigFun}, + + router_ics_gateway_location_worker:init_ets(), + {ok, {?FLAGS, [ ?WORKER(ru_poc_denylist, [POCDenyListArgs]), @@ -144,7 +147,8 @@ init([]) -> ?WORKER(router_device_devaddr, [#{}]), ?WORKER(router_xor_filter_worker, [#{}]), ?WORKER(router_ics_eui_worker, [ICSOpts]), - ?WORKER(router_ics_skf_worker, [ICSOpts]) + ?WORKER(router_ics_skf_worker, [ICSOpts]), + ?WORKER(router_ics_gateway_location_worker, [ICSOpts]) ]}}. %%==================================================================== diff --git a/test/router_ics_gateway_location_worker_SUITE.erl b/test/router_ics_gateway_location_worker_SUITE.erl new file mode 100644 index 000000000..431ae1d12 --- /dev/null +++ b/test/router_ics_gateway_location_worker_SUITE.erl @@ -0,0 +1,135 @@ +-module(router_ics_gateway_location_worker_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("../src/grpc/autogen/iot_config_pb.hrl"). +-include("console_test.hrl"). +-include("lorawan_vars.hrl"). + +-export([ + all/0, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + main_test/1 +]). + +-record(location, { + gateway :: libp2p_crypto:pubkey_bin(), + timestamp :: non_neg_integer(), + h3_index :: h3:index() +}). + +%%-------------------------------------------------------------------- +%% COMMON TEST CALLBACK FUNCTIONS +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% @public +%% @doc +%% Running tests for this suite +%% @end +%%-------------------------------------------------------------------- +all() -> + [ + main_test + ]. + +%%-------------------------------------------------------------------- +%% TEST CASE SETUP +%%-------------------------------------------------------------------- +init_per_testcase(TestCase, Config) -> + persistent_term:put(router_test_ics_gateway_service, self()), + Port = 8085, + ServerPid = start_server(Port), + ok = application:set_env( + router, + ics, + #{host => "localhost", port => Port}, + [{persistent, true}] + ), + test_utils:init_per_testcase(TestCase, [{ics_server, ServerPid} | Config]). + +%%-------------------------------------------------------------------- +%% TEST CASE TEARDOWN +%%-------------------------------------------------------------------- +end_per_testcase(TestCase, Config) -> + test_utils:end_per_testcase(TestCase, Config), + ServerPid = proplists:get_value(ics_server, Config), + case erlang:is_process_alive(ServerPid) of + true -> gen_server:stop(ServerPid); + false -> ok + end, + _ = application:stop(grpcbox), + ok = application:set_env( + router, + ics, + #{}, + [{persistent, true}] + ), + ok. + +%%-------------------------------------------------------------------- +%% TEST CASES +%%-------------------------------------------------------------------- + +main_test(_Config) -> + #{public := PubKey1} = libp2p_crypto:generate_keys(ecc_compact), + PubKeyBin1 = libp2p_crypto:pubkey_to_bin(PubKey1), + ExpectedIndex = h3:from_string("8828308281fffff"), + + Before = erlang:system_time(millisecond), + + %% Let worker start + test_utils:wait_until(fun() -> + try router_ics_gateway_location_worker:get(PubKeyBin1) of + {ok, ExpectedIndex} -> true; + _ -> false + catch + _:_ -> + false + end + end), + + [LocationRec] = ets:lookup(router_ics_gateway_location_worker_ets, PubKeyBin1), + + ?assertEqual(PubKeyBin1, LocationRec#location.gateway), + ?assertEqual(ExpectedIndex, LocationRec#location.h3_index), + + Timestamp = LocationRec#location.timestamp, + Now = erlang:system_time(millisecond), + + ?assert(Timestamp > Before), + ?assert(Timestamp =< Now), + + [{location, Req1}] = rcv_loop([]), + ?assertEqual(PubKeyBin1, Req1#iot_config_gateway_location_req_v1_pb.gateway), + + ok. + +%% ------------------------------------------------------------------ +%% Helper functions +%% ------------------------------------------------------------------ + +start_server(Port) -> + _ = application:ensure_all_started(grpcbox), + {ok, ServerPid} = grpcbox:start_server(#{ + grpc_opts => #{ + service_protos => [iot_config_pb], + services => #{ + 'helium.iot_config.gateway' => router_test_ics_gateway_service + } + }, + listen_opts => #{port => Port, ip => {0, 0, 0, 0}} + }), + ServerPid. + +rcv_loop(Acc) -> + receive + {router_test_ics_gateway_service, Type, Req} -> + lager:notice("got router_test_ics_gateway_service ~p req ~p", [Type, Req]), + rcv_loop([{Type, Req} | Acc]) + after timer:seconds(2) -> Acc + end. diff --git a/test/router_test_ics_gateway_service.erl b/test/router_test_ics_gateway_service.erl new file mode 100644 index 000000000..586f506c4 --- /dev/null +++ b/test/router_test_ics_gateway_service.erl @@ -0,0 +1,57 @@ +-module(router_test_ics_gateway_service). + +-behaviour(helium_iot_config_gateway_bhvr). +-include("../src/grpc/autogen/iot_config_pb.hrl"). + +-export([ + init/2, + handle_info/2 +]). + +-export([ + region_params/2, + load_region/2, + location/2 +]). + +-spec init(atom(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). +init(_RPC, StreamState) -> + StreamState. + +-spec handle_info(Msg :: any(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). +handle_info(_Msg, StreamState) -> + StreamState. + +region_params(_Ctx, _Msg) -> + {grpc_error, {12, <<"UNIMPLEMENTED">>}}. + +load_region(_Ctx, _Msg) -> + {grpc_error, {12, <<"UNIMPLEMENTED">>}}. + +location(Ctx, Req) -> + case verify_location_req(Req) of + true -> + lager:info("got location req ~p", [Req]), + Res = #iot_config_gateway_location_res_v1_pb{ + location = "8828308281fffff" + }, + catch persistent_term:get(?MODULE) ! {?MODULE, location, Req}, + {ok, Res, Ctx}; + false -> + lager:error("failed to verify location req ~p", [Req]), + {grpc_error, {7, <<"PERMISSION_DENIED">>}} + end. + +-spec verify_location_req(Req :: #iot_config_gateway_location_req_v1_pb{}) -> boolean(). +verify_location_req(Req) -> + EncodedReq = iot_config_pb:encode_msg( + Req#iot_config_gateway_location_req_v1_pb{ + signature = <<>> + }, + iot_config_gateway_location_req_v1_pb + ), + libp2p_crypto:verify( + EncodedReq, + Req#iot_config_gateway_location_req_v1_pb.signature, + libp2p_crypto:bin_to_pubkey(blockchain_swarm:pubkey_bin()) + ). diff --git a/test/test_utils.erl b/test/test_utils.erl index e523b1bef..ce3c94c6b 100644 --- a/test/test_utils.erl +++ b/test/test_utils.erl @@ -124,6 +124,9 @@ init_per_testcase(TestCase, Config) -> {lager_console_backend, [{module, router_test_ics_skf_service}], debug}, {lager_console_backend, [{module, router_ics_skf_worker_SUITE}], debug}, {lager_console_backend, [{module, router_ics_skf_list_handler}], debug}, + {lager_console_backend, [{module, router_ics_gateway_location_worker_SUITE}], + debug}, + {lager_console_backend, [{module, router_test_ics_gateway_service}], debug}, {{lager_file_backend, "router.log"}, [{application, router}], debug}, {{lager_file_backend, "router.log"}, [{module, router_console_api}], debug}, {{lager_file_backend, "router.log"}, [{module, router_device_routing}], debug},