From ffe013a4ba3434779a68c8b876dd38292e352c1d Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 12:07:34 +0100 Subject: [PATCH] feat: enhance connection rate limit Prior to this change, if connection rate is limited, the acceptors will enter suspending state and stop accepting the sockets leaving the sockets in the system backlog. If the acceptor backlog (default=1024) is filled up, for long enough time to cause the majority of the clients to have closed socket from their end and try to reconnect aggressively, the acceptor may never be able to get a normal socket again. The fix is: in suspending state, accept the sockets and immediately close them to free up the backlog. The close triggers TCP-RST to cut the TCP graceful close overheads. --- src/esockd_acceptor.erl | 89 ++++++++++++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 24 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index a672947..b1a1e6b 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -60,7 +60,7 @@ upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid(), - accept_ref :: term() + accept_ref = no_ref :: term() }). %% @doc Start an acceptor @@ -121,7 +121,10 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> }, {next_event, internal, begin_waiting}}. 
-handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> +handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref -> + %% already waiting + keep_state_and_data; +handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; @@ -129,13 +132,30 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> Reason =:= emfile; Reason =:= enfile -> - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; + start_suspending(State, 1000); {error, econnaborted} -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; {error, closed} -> {stop, normal, State}; {error, Reason} -> - error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]), + logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]), + {stop, Reason, State} + end; +handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {keep_state, State#state{accept_ref = Ref}}; + {error, Reason} when + Reason =:= emfile; + Reason =:= enfile + -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, econnaborted} -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, closed} -> + {stop, normal, State}; + {error, Reason} -> + logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]), {stop, Reason, State} end; handle_event( @@ -144,17 +164,27 @@ handle_event( waiting, State = #state{lsock = LSock, accept_ref = Ref} ) -> - {next_state, token_request, State, {next_event, internal, {token_request, Sock}}}; + NextEvent = {next_event, internal, {token_request, Sock}}, + {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent}; +handle_event( + info, + {inet_async, LSock, Ref, {ok, Sock}}, + 
suspending, + State = #state{lsock = LSock, accept_ref = Ref} +) -> + _ = close(Sock), + NextEvent = {next_event, internal, accept_and_close}, + {keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( - internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter} + internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} ) -> case esockd_generic_limiter:consume(1, Limiter) of {ok, Limiter2} -> {next_state, accepting, State#state{conn_limiter = Limiter2}, {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> - {next_state, suspending, State#state{conn_limiter = Limiter2}, - {state_timeout, PauseTime, Content}} + _ = close(Sock), + start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( internal, @@ -189,19 +219,17 @@ handle_event( close(Sock) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; -handle_event(state_timeout, {token_request, _} = Content, suspending, State) -> - {next_state, token_request, State, {next_event, internal, Content}}; handle_event(state_timeout, begin_waiting, suspending, State) -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event( info, {inet_async, LSock, Ref, {error, Reason}}, - _, + StateName, State = #state{lsock = LSock, accept_ref = Ref} ) -> - handle_socket_error(Reason, State); + handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); handle_event(Type, Content, StateName, _) -> - error_logger:warning_msg( + logger:log(warning, "Unhandled message, State:~p, Type:~p Content:~p", [StateName, Type, Content] ), @@ -212,7 +240,7 @@ terminate(normal, _StateName, #state{}) -> terminate(shutdown, _StateName, #state{}) -> ok; terminate(Reason, _StateName, #state{}) -> - error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]), + logger:log(error, "~p terminating due to ~p", [?MODULE, Reason]), ok. 
code_change(_OldVsn, StateName, State, _Extra) -> @@ -224,6 +252,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try + %% port-close leads to a TCP reset which cuts out the tcp graceful close overheads true = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch @@ -241,28 +270,40 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> - error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]). + logger:log(error, Msg, [esockd:format(Sockname), Reason]). -handle_socket_error(closed, State) -> +handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept %% {error, esslaccept} -> accept -handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept -> + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. %% enfile: The system limit on the total number of open files has been reached. 
-handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile -> - error_logger:error_msg( - "Accept error on ~s: ~s", - [esockd:format(State#state.sockname), explain_posix(Reason)] - ), - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; -handle_socket_error(Reason, State) -> +handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + start_suspending(State, 1000); +handle_socket_error(Reason, State, _StateName) -> {stop, Reason, State}. explain_posix(emfile) -> "EMFILE (Too many open files)"; explain_posix(enfile) -> "ENFILE (File table overflow)". + +log_system_limit(State, Reason) -> + logger:log(error, #{msg => cannot_accept_more_connections, + acceptor => esockd:format(State#state.sockname), + cause => explain_posix(Reason)}). + +start_suspending(State, Timeout) -> + Actions = [{next_event, internal, accept_and_close}, + {state_timeout, Timeout, begin_waiting}], + {next_state, suspending, State, Actions}.