Skip to content

Commit

Permalink
Merge pull request #189 from lafirest/feat/hc
Browse files Browse the repository at this point in the history
feat(udp): Support UDP port health check
  • Loading branch information
lafirest authored Jul 12, 2024
2 parents 3251771 + 044df43 commit 4ec0382
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
44 changes: 34 additions & 10 deletions src/esockd_udp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
peers :: map(),
options :: [esockd:option()],
access_rules :: list(),
mfa :: esockd:mfargs()
mfa :: esockd:mfargs(),
health_check_request :: maybe(binary()),
health_check_reply :: maybe(binary())
}).

-define(ACTIVE_N, 100).
Expand Down Expand Up @@ -169,15 +171,16 @@ init([Proto, ListenOn, Opts]) ->
ok = inet:setopts(Sock, [{active, 1}]),
Limiter = conn_rate_limiter(conn_limiter_opts(Opts, {listener, Proto, ListenOn})),
MaxPeers = proplists:get_value(max_connections, Opts, infinity),
{ok, #state{proto = Proto,
sock = Sock,
port = Port,
max_peers = MaxPeers,
peers = #{},
access_rules = AccessRules,
conn_limiter = Limiter,
options = Opts,
mfa = MFA}};
init_health_check(#state{proto = Proto,
sock = Sock,
port = Port,
max_peers = MaxPeers,
peers = #{},
access_rules = AccessRules,
conn_limiter = Limiter,
options = Opts,
mfa = MFA},
Opts);
{error, Reason} ->
{stop, Reason}
end.
Expand Down Expand Up @@ -238,6 +241,17 @@ handle_cast(Msg, State) ->
?ERROR_MSG("Unexpected cast: ~p", [Msg]),
{noreply, State}.

handle_info({udp, Sock, IP, Port, Request},
State = #state{sock = Sock,
health_check_request = Request,
health_check_reply = Reply}) ->
case gen_udp:send(Sock, IP, Port, Reply) of
ok -> ok;
{error, Reason} ->
?ERROR_MSG("Health check response to: ~s failed, reason: ~s",
[esockd:format({IP, Port}), Reason])
end,
{noreply, State};
handle_info({udp, Sock, IP, InPortNo, Packet},
State = #state{sock = Sock, peers = Peers, access_rules = Rules}) ->
case maps:find(Peer = {IP, InPortNo}, Peers) of
Expand Down Expand Up @@ -370,3 +384,13 @@ raw({deny, CIDR = {_Start, _End, _Len}}) ->
{deny, esockd_cidr:to_string(CIDR)};
raw(Rule) ->
Rule.

init_health_check(State, Opts) ->
case proplists:get_value(health_check, Opts) of
#{request := Request, reply := Reply} when is_binary(Request), is_binary(Reply) ->
{ok, State#state{health_check_request = Request, health_check_reply = Reply}};
undefined ->
{ok, State};
Any ->
{error, {invalid_health_check_data, Any}}
end.
24 changes: 20 additions & 4 deletions test/esockd_udp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl").
-record(state, {proto, sock, port, rate_limit,
conn_limiter, limit_timer, max_peers,
peers, options, access_rules, mfa}).
peers, options, access_rules, mfa, health_check_request, health_check_reply}).

all() -> esockd_ct:all(?MODULE).

Expand Down Expand Up @@ -88,17 +88,33 @@ t_udp_error(_) ->
end, Peers)
end).

t_health_check(_) ->
with_udp_server(fun(_Srv, Port) ->
{ok, Sock} = gen_udp:open(0, [binary, {active, false}]),
ok = udp_send_and_recv(Sock, Port, <<"hello">>),
ok = udp_send_and_recv(Sock, Port, <<"request">>, <<"reply">>)
end,
[{health_check, #{request => <<"request">>, reply => <<"reply">>}}]).

udp_send_and_recv(Sock, Port, Data) ->
ok = gen_udp:send(Sock, {127,0,0,1}, Port, Data),
{ok, {_Addr, Port, Data}} = gen_udp:recv(Sock, 0),
udp_send_and_recv(Sock, Port, Data, Data).

udp_send_and_recv(Sock, Port, Send, Recv) ->
ok = gen_udp:send(Sock, {127,0,0,1}, Port, Send),
{ok, {_Addr, Port, Recv}} = gen_udp:recv(Sock, 0),
ok.

with_udp_server(TestFun) ->
with_udp_server(TestFun, []).

with_udp_server(TestFun, ExtraOpts) ->
dbg:tracer(),
dbg:p(all, c),
dbg:tpl({emqx_connection_sup, '_', '_'}, x),
MFA = {?MODULE, udp_echo_init},
{ok, Srv} = esockd_udp:server(test, {{127,0,0,1}, 6000}, [{connection_mfargs, MFA}]),
{ok, Srv} = esockd_udp:server(test,
{{127,0,0,1}, 6000},
[{connection_mfargs, MFA}] ++ ExtraOpts),
TestFun(Srv, 6000),
is_process_alive(Srv) andalso (ok = esockd_udp:stop(Srv)).

Expand Down

0 comments on commit 4ec0382

Please sign in to comment.