Skip to content

Commit

Permalink
Basic gateway_location_worker (#911)
Browse files Browse the repository at this point in the history
* Basic gateway_location_worker

* First simple test

* Finish CT

* Upgrade proto

* Wrap location calls around is_chain_dead
  • Loading branch information
macpie authored Mar 1, 2023
1 parent 12d9b7a commit 117cf42
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 19 deletions.
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.1">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"149997d2a74e08679e56c2c892d7e46f2d0d1c46"}},
{ref,"816d924df69cd5d37f21007712d0d88964549845"}},
0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2},
{<<"httpc_aws">>,
Expand Down
206 changes: 206 additions & 0 deletions src/grpc/router_ics_gateway_location_worker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Router IOT Config Service Gateway Location Worker ==
%% @end
%%%-------------------------------------------------------------------
-module(router_ics_gateway_location_worker).

-behavior(gen_server).

-include("./autogen/iot_config_pb.hrl").

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/1,
init_ets/0,
get/1
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-define(SERVER, ?MODULE).
-define(ETS, router_ics_gateway_location_worker_ets).
-define(INIT, init).
-ifdef(TEST).
-define(BACKOFF_MIN, 100).
-else.
-define(BACKOFF_MIN, timer:seconds(10)).
-endif.
-define(BACKOFF_MAX, timer:minutes(5)).

-record(state, {
pubkey_bin :: libp2p_crypto:pubkey_bin(),
sig_fun :: function(),
host :: string(),
port :: non_neg_integer(),
conn_backoff :: backoff:backoff()
}).

-record(location, {
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index()
}).

-type state() :: #state{}.

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link(#{host := ""}) ->
ignore;
start_link(#{port := Port} = Args) when is_list(Port) ->
?MODULE:start_link(Args#{port => erlang:list_to_integer(Port)});
start_link(#{host := Host, port := Port} = Args) when
is_list(Host) andalso is_integer(Port)
->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []);
start_link(_Args) ->
ignore.

-spec init_ets() -> ok.
init_ets() ->
?ETS = ets:new(?ETS, [
public, named_table, set, {read_concurrency, true}, {keypos, #location.gateway}
]),
ok.

-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index()} | {error, any()}.
get(PubKeyBin) ->
case lookup(PubKeyBin) of
{error, _Reason} ->
gen_server:call(?SERVER, {get, PubKeyBin});
{ok, _} = OK ->
OK
end.

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(#{pubkey_bin := PubKeyBin, sig_fun := SigFun, host := Host, port := Port} = Args) ->
lager:info("~p init with ~p", [?SERVER, Args]),
{ok, _, SigFun, _} = blockchain_swarm:keys(),
Backoff = backoff:type(backoff:init(?BACKOFF_MIN, ?BACKOFF_MAX), normal),
self() ! ?INIT,
{ok, #state{
pubkey_bin = PubKeyBin,
sig_fun = SigFun,
host = Host,
port = Port,
conn_backoff = Backoff
}}.

handle_call({get, PubKeyBin}, _From, #state{conn_backoff = Backoff0} = State) ->
case get_gateway_location(PubKeyBin, State) of
{error, Reason} = Error ->
{Delay, Backoff1} = backoff:fail(Backoff0),
_ = erlang:send_after(Delay, self(), ?INIT),
lager:warning("fail to get_gateway_location ~p, reconnecting in ~wms", [Reason, Delay]),
{reply, Error, State#state{conn_backoff = Backoff1}};
{ok, H3IndexString} ->
H3Index = h3:from_string(H3IndexString),
ok = insert(PubKeyBin, H3Index),
{reply, {ok, H3Index}, State}
end;
handle_call(_Msg, _From, State) ->
lager:warning("rcvd unknown call msg: ~p from: ~p", [_Msg, _From]),
{reply, ok, State}.

handle_cast(_Msg, State) ->
lager:warning("rcvd unknown cast msg: ~p", [_Msg]),
{noreply, State}.

handle_info(?INIT, #state{conn_backoff = Backoff0} = State) ->
{Delay, Backoff1} = backoff:fail(Backoff0),
case connect(State) of
{error, _Reason} ->
lager:warning("fail to connect ~p, reconnecting in ~wms", [_Reason, Delay]),
_ = erlang:send_after(Delay, self(), ?INIT),
{noreply, State#state{conn_backoff = Backoff1}};
ok ->
lager:info("connected"),
{_, Backoff2} = backoff:succeed(Backoff0),
{noreply, State#state{conn_backoff = Backoff2}}
end;
handle_info(_Msg, State) ->
lager:warning("rcvd unknown info msg: ~p", [_Msg]),
{noreply, State}.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

terminate(_Reason, _State) ->
ok.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------

-spec lookup(PubKeyBin :: libp2p_crypto:pubkey_bin()) ->
{ok, h3:index()} | {error, not_found | outdated}.
lookup(PubKeyBin) ->
Yesterday = erlang:system_time(millisecond) - timer:hours(24),
case ets:lookup(?ETS, PubKeyBin) of
[] ->
{error, not_found};
[#location{timestamp = T}] when T < Yesterday ->
{error, outdated};
[#location{h3_index = H3Index}] ->
{ok, H3Index}
end.

-spec insert(PubKeyBin :: libp2p_crypto:pubkey_bin(), H3Index :: h3:index()) -> ok.
insert(PubKeyBin, H3Index) ->
true = ets:insert(?ETS, #location{
gateway = PubKeyBin,
timestamp = erlang:system_time(millisecond),
h3_index = H3Index
}),
ok.

-spec connect(State :: state()) -> ok | {error, any()}.
connect(#state{host = Host, port = Port} = State) ->
case grpcbox_channel:pick(?MODULE, stream) of
{error, _} ->
case
grpcbox_client:connect(?MODULE, [{http, Host, Port, []}], #{
sync_start => true
})
of
{ok, _Conn} ->
connect(State);
{error, _Reason} = Error ->
Error
end;
{ok, {_Conn, _Interceptor}} ->
ok
end.

-spec get_gateway_location(PubKeyBin :: libp2p_crypto:pubkey_bin(), state()) ->
{ok, string()} | {error, any()}.
get_gateway_location(PubKeyBin, #state{sig_fun = SigFun}) ->
Req = #iot_config_gateway_location_req_v1_pb{
gateway = PubKeyBin
},
EncodedReq = iot_config_pb:encode_msg(Req, iot_config_gateway_location_req_v1_pb),
SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)},
case helium_iot_config_gateway_client:location(SignedReq, #{channel => ?MODULE}) of
{grpc_error, Reason} ->
{error, Reason};
{error, _} = Error ->
Error;
{ok, #iot_config_gateway_location_res_v1_pb{location = Location}, _Meta} ->
{ok, Location}
end.
49 changes: 32 additions & 17 deletions src/router_blockchain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,46 @@ calculate_dc_amount(PayloadSize) ->
-spec get_hotspot_lat_lon(PubKeyBin :: libp2p_crypto:pubkey_bin()) ->
{float(), float()} | {unknown, unknown}.
get_hotspot_lat_lon(PubKeyBin) ->
Ledger = ledger(),
case blockchain_ledger_v1:find_gateway_info(PubKeyBin, Ledger) of
{error, _} ->
{unknown, unknown};
{ok, Hotspot} ->
case blockchain_ledger_gateway_v2:location(Hotspot) of
undefined ->
case ?MODULE:is_chain_dead() of
false ->
Ledger = ledger(),
case blockchain_ledger_v1:find_gateway_info(PubKeyBin, Ledger) of
{error, _} ->
{unknown, unknown};
{ok, Hotspot} ->
case blockchain_ledger_gateway_v2:location(Hotspot) of
undefined ->
{unknown, unknown};
Loc ->
h3:to_geo(Loc)
end
end;
true ->
case router_ics_gateway_location_worker:get(PubKeyBin) of
{error, _} ->
{unknown, unknown};
Loc ->
h3:to_geo(Loc)
{ok, Index} ->
h3:to_geo(Index)
end
end.

%% Assigning DevAddrs
-spec get_hotspot_location_index(PubKeybin :: libp2p_crypto:pubkey_bin()) ->
{ok, non_neg_integer()} | {error, any()}.
get_hotspot_location_index(PubKeyBin) ->
case blockchain_ledger_v1:find_gateway_info(PubKeyBin, ledger()) of
{error, _} = Error ->
Error;
{ok, Hotspot} ->
case blockchain_ledger_gateway_v2:location(Hotspot) of
undefined -> {error, undef_index};
Index -> {ok, Index}
end
case ?MODULE:is_chain_dead() of
false ->
case blockchain_ledger_v1:find_gateway_info(PubKeyBin, ledger()) of
{error, _} = Error ->
Error;
{ok, Hotspot} ->
case blockchain_ledger_gateway_v2:location(Hotspot) of
undefined -> {error, undef_index};
Index -> {ok, Index}
end
end;
true ->
router_ics_gateway_location_worker:get(PubKeyBin)
end.

%% Assigning DevAddrs
Expand Down
6 changes: 5 additions & 1 deletion src/router_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ init([]) ->
PubKeyBin = libp2p_crypto:pubkey_to_bin(PubKey0),
ICSOptsDefault = application:get_env(router, ics, #{}),
ICSOpts = ICSOptsDefault#{pubkey_bin => PubKeyBin, sig_fun => SigFun},

router_ics_gateway_location_worker:init_ets(),

{ok,
{?FLAGS, [
?WORKER(ru_poc_denylist, [POCDenyListArgs]),
Expand All @@ -144,7 +147,8 @@ init([]) ->
?WORKER(router_device_devaddr, [#{}]),
?WORKER(router_xor_filter_worker, [#{}]),
?WORKER(router_ics_eui_worker, [ICSOpts]),
?WORKER(router_ics_skf_worker, [ICSOpts])
?WORKER(router_ics_skf_worker, [ICSOpts]),
?WORKER(router_ics_gateway_location_worker, [ICSOpts])
]}}.

%%====================================================================
Expand Down
Loading

0 comments on commit 117cf42

Please sign in to comment.