Skip to content

Commit

Permalink
Merge pull request #182 from emqx/dev/william/improve-robustness
Browse files Browse the repository at this point in the history
improve robustness
  • Loading branch information
qzhuyan authored Nov 28, 2023
2 parents e150e09 + cbf63b5 commit 5cb22a8
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 12 deletions.
18 changes: 13 additions & 5 deletions .github/workflows/run_test_case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ jobs:
make eunit
make ct
make cover
- name: Coveralls
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
make coveralls
- uses: actions/upload-artifact@v1
if: failure()
with:
name: logs
path: _build/test/logs

coveralls:
runs-on: ubuntu-latest
container:
image: erlang:25
steps:
- uses: actions/checkout@v1
- name: Coveralls
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
rebar3 as test do eunit,ct,cover
make coveralls
10 changes: 8 additions & 2 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) ->
Reason =:= enfile
->
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
{error, econnaborted} ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]),
{stop, Reason, State}
end;
handle_event(
Expand Down Expand Up @@ -210,8 +213,11 @@ handle_event(Type, Content, StateName, _) ->
),
keep_state_and_data.

terminate(_Reason, _StateName, #state{lsock = LSock}) ->
close(LSock).
terminate(normal, _StateName, #state{}) ->
ok;
terminate(Reason, _StateName, #state{}) ->
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
ok.

code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
Expand Down
4 changes: 2 additions & 2 deletions src/esockd_acceptor_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ count_acceptors(AcceptorSup) ->

init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 100,
period => 3600
intensity => 100000,
period => 1
},
Acceptor = #{id => acceptor,
start => {esockd_acceptor, start_link,
Expand Down
1 change: 1 addition & 0 deletions src/esockd_dtls_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ get_port(Listener) ->
init({Proto, ListenOn, Opts, AcceptorSup}) ->
Port = port(ListenOn),
process_flag(trap_exit, true),
esockd_server:ensure_stats({Proto, ListenOn}),
SockOpts = merge_addr(ListenOn, dltsopts(Opts)),
%% Don't active the socket...
case ssl:listen(Port, SockOpts) of
Expand Down
12 changes: 12 additions & 0 deletions src/esockd_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

-export([ options/1
, get_port/1
, get_lsock/1
]).

%% gen_server callbacks
Expand Down Expand Up @@ -65,13 +66,18 @@ options(Listener) ->
get_port(Listener) ->
gen_server:call(Listener, get_port).

-spec get_lsock(pid()) -> inet:socket().
get_lsock(Listener) ->
gen_server:call(Listener, get_lsock).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

init({Proto, ListenOn, Opts, AcceptorSup}) ->
Port = port(ListenOn),
process_flag(trap_exit, true),
esockd_server:ensure_stats({Proto, ListenOn}),
SockOpts = merge_addr(ListenOn, sockopts(Opts)),
%% Don't active the socket...
case esockd_transport:listen(Port, [{active, false} | proplists:delete(active, SockOpts)]) of
Expand Down Expand Up @@ -109,6 +115,9 @@ handle_call(options, _From, State = #state{options = Opts}) ->
handle_call(get_port, _From, State = #state{lport = LPort}) ->
{reply, LPort, State};

handle_call(get_lsock, _From, State = #state{lsock = LSock}) ->
{reply, LSock, State};

handle_call(Req, _From, State) ->
error_logger:error_msg("[~s] Unexpected call: ~p", [?MODULE, Req]),
{noreply, State}.
Expand All @@ -117,6 +126,9 @@ handle_cast(Msg, State) ->
error_logger:error_msg("[~s] Unexpected cast: ~p", [?MODULE, Msg]),
{noreply, State}.

handle_info({'EXIT', LSock, _}, #state{lsock = LSock} = State) ->
error_logger:error_msg("~s Lsocket ~p closed", [?MODULE, LSock]),
{stop, lsock_closed, State};
handle_info(Info, State) ->
error_logger:error_msg("[~s] Unexpected info: ~p", [?MODULE, Info]),
{noreply, State}.
Expand Down
3 changes: 0 additions & 3 deletions src/esockd_listener_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ start_link(Type, Proto, ListenOn, Opts, MFA) ->
type => supervisor,
modules => [esockd_connection_sup]},
{ok, ConnSup} = supervisor:start_child(Sup, ConnSupSpec),

%% Start acceptor sup
ok = esockd_server:init_stats({Proto, ListenOn}, accepted),
ok = esockd_server:init_stats({Proto, ListenOn}, closed_overloaded),
TuneFun = tune_socket_fun(Opts),
UpgradeFuns = upgrade_funs(Type, Opts),
Limiter = conn_rate_limiter(conn_limiter_opts(Opts, {listener, Proto, ListenOn})),
Expand Down
5 changes: 5 additions & 0 deletions src/esockd_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
, inc_stats/3
, dec_stats/3
, del_stats/1
, ensure_stats/1
]).

%% gen_server callbacks
Expand Down Expand Up @@ -85,6 +86,10 @@ update_counter(Key, Num) ->
del_stats({Protocol, ListenOn}) ->
gen_server:cast(?SERVER, {del, {Protocol, ListenOn}}).

-spec ensure_stats({atom(), esockd:listen_on()}) -> ok.
ensure_stats(StatsKey) ->
ok = ?MODULE:init_stats(StatsKey, accepted),
ok = ?MODULE:init_stats(StatsKey, closed_overloaded).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
Expand Down
77 changes: 77 additions & 0 deletions test/esockd_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,83 @@ t_tune_fun_ok(_) ->
?assertEqual(1, proplists:get_value(accepted, Cnts)),
ok = esockd:close(Name, LPort).


t_listener_handle_port_exit_tcp(Config) ->
do_listener_handle_port_exit(Config, false).

t_listener_handle_port_exit_tls(Config) ->
do_listener_handle_port_exit(Config, true).

do_listener_handle_port_exit(Config, IsTls) ->
LPort = 7005,
Name = ?FUNCTION_NAME,
SslOpts = [{certfile, esockd_ct:certfile(Config)},
{keyfile, esockd_ct:keyfile(Config)},
{gc_after_handshake, true},
{verify, verify_none}
],
OpenOpts = case IsTls of
true ->
ssl:start(),
[{ssl_options, SslOpts}];
false -> []
end,
%% GIVEN: when listener is started
{ok, LSup} = esockd:open(Name, LPort, OpenOpts,
{echo_server, start_link, []}),
L = esockd_listener_sup:listener(LSup),
PortInUse = esockd_listener:get_port(L),
?assertEqual(LPort, PortInUse),
LSock = esockd_listener:get_lsock(L),
erlang:process_flag(trap_exit, true),
link(L),
Acceptors = get_acceptors(LSup),
?assertNotEqual([], Acceptors),

case IsTls of
true ->
{ok, ClientSock} = ssl:connect("localhost", LPort, [{verify, verify_none}
], 1000),
ssl:close(ClientSock);
false ->
{ok, ClientSock} = gen_tcp:connect("localhost", LPort, [], 1000),
ok = gen_tcp:close(ClientSock)
end,

timer:sleep(100),

%% WHEN: when port is closed
erlang:port_close(LSock),
%% THEN: listener process should EXIT, ABNORMALLY
receive
{'EXIT', L, lsock_closed} ->
ok
after 300 ->
ct:fail(listener_still_alive)
end,

%% THEN: listener should be restarted
NewListener = esockd_listener_sup:listener(LSup),
?assertNotEqual(L, NewListener),

%% THEN: listener should be listening on the same port
?assertEqual(PortInUse, esockd_listener:get_port(NewListener)),
?assertMatch({error, eaddrinuse}, gen_tcp:listen(PortInUse, [])),

%% THEN: New acceptors should be started with new LSock to accept,
%% (old acceptors should be terminated due to `closed')
NewAcceptors = get_acceptors(LSup),
?assertNotEqual([], NewAcceptors),
?assert(sets:is_empty(sets:intersection(sets:from_list(Acceptors), sets:from_list(NewAcceptors)))),

ok = esockd:close(Name, LPort).

%% helper
sock_tune_fun(Ret) ->
Ret.

-spec get_acceptors(supervisor:supervisor()) -> [Acceptor::pid()].
get_acceptors(LSup) ->
Children = supervisor:which_children(LSup),
{acceptor_sup, AcceptorSup, _, _} = lists:keyfind(acceptor_sup, 1, Children),
supervisor:which_children(AcceptorSup).

0 comments on commit 5cb22a8

Please sign in to comment.