From 655e36139469255ddb1487949b2653206747a708 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sat, 9 Nov 2024 12:07:34 +0100 Subject: [PATCH] fix: enhance connection rate limit Prior to this change, if connection rate is limited, the acceptors will enter suspending state and stop accepting the sockets leaving the sockets in the system backlog. If the acceptor backlog (default=1024) is filled up, for long enough time to cause the majority of the clients to have closed socket from their end and try to reconnect aggressively, the acceptor may never be able to get a normal socket again. The fix is: in suspending state, accept the sockets and immediately close them to free up the backlog. The close triggers TCP-RST to cut the TCP graceful close overheads. --- src/esockd_acceptor.erl | 100 ++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 29 deletions(-) diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index a672947..b309e4d 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -60,7 +60,7 @@ upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid(), - accept_ref :: term() + accept_ref = no_ref :: term() }). %% @doc Start an acceptor @@ -121,7 +121,10 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> }, {next_event, internal, begin_waiting}}. 
-handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> +handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref -> + %% already waiting + keep_state_and_data; +handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; @@ -129,13 +132,28 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> Reason =:= emfile; Reason =:= enfile -> - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; + start_suspending(State, 1000); {error, econnaborted} -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; {error, closed} -> {stop, normal, State}; {error, Reason} -> - error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]), + {stop, Reason, State} + end; +handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {keep_state, State#state{accept_ref = Ref}}; + {error, Reason} when + Reason =:= emfile; + Reason =:= enfile + -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, econnaborted} -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, closed} -> + {stop, normal, State}; + {error, Reason} -> {stop, Reason, State} end; handle_event( @@ -144,17 +162,27 @@ handle_event( waiting, State = #state{lsock = LSock, accept_ref = Ref} ) -> - {next_state, token_request, State, {next_event, internal, {token_request, Sock}}}; + NextEvent = {next_event, internal, {token_request, Sock}}, + {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent}; +handle_event( + info, + {inet_async, LSock, Ref, {ok, Sock}}, + suspending, + State = #state{lsock = LSock, accept_ref = Ref} +) -> + _ = close(Sock), + NextEvent = {next_event, internal, accept_and_close}, + 
{keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( - internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter} + internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} ) -> case esockd_generic_limiter:consume(1, Limiter) of {ok, Limiter2} -> {next_state, accepting, State#state{conn_limiter = Limiter2}, {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> - {next_state, suspending, State#state{conn_limiter = Limiter2}, - {state_timeout, PauseTime, Content}} + _ = close(Sock), + start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( internal, @@ -181,30 +209,28 @@ handle_event( {ok, _Pid} -> ok; {error, Reason} -> - handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State), + handle_accept_error(Reason, "failed_to_start_connection_process", State), close(Sock) end; {error, Reason} -> - handle_accept_error(Reason, "Tune buffer failed on ~s: ~s", State), + handle_accept_error(Reason, "failed_to_apply_tune_funcs", State), close(Sock) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; -handle_event(state_timeout, {token_request, _} = Content, suspending, State) -> - {next_state, token_request, State, {next_event, internal, Content}}; handle_event(state_timeout, begin_waiting, suspending, State) -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event( info, {inet_async, LSock, Ref, {error, Reason}}, - _, + StateName, State = #state{lsock = LSock, accept_ref = Ref} ) -> - handle_socket_error(Reason, State); + handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); handle_event(Type, Content, StateName, _) -> - error_logger:warning_msg( - "Unhandled message, State:~p, Type:~p Content:~p", - [StateName, Type, Content] - ), + logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", + state_name => StateName, + event_type => Type, + 
event_content => Content}), keep_state_and_data. terminate(normal, _StateName, #state{}) -> @@ -212,7 +238,8 @@ terminate(normal, _StateName, #state{}) -> terminate(shutdown, _StateName, #state{}) -> ok; terminate(Reason, _StateName, #state{}) -> - error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]), + logger:log(error, #{msg => "esockd_acceptor_terminating", + reaseon => Reason}), ok. code_change(_OldVsn, StateName, State, _Extra) -> @@ -224,6 +251,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try + %% port-close leads to a TCP reset which cuts out the TCP graceful close overheads true = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch @@ -241,28 +269,42 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> - error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]). + logger:log(error, #{msg => Msg, + listener => esockd:format(Sockname), + cause => Reason}). -handle_socket_error(closed, State) -> +handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept %% {error, esslaccept} -> accept -handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept -> + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. %% enfile: The system limit on the total number of open files has been reached. 
-handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile -> - error_logger:error_msg( - "Accept error on ~s: ~s", - [esockd:format(State#state.sockname), explain_posix(Reason)] - ), - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; -handle_socket_error(Reason, State) -> +handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + start_suspending(State, 1000); +handle_socket_error(Reason, State, _StateName) -> {stop, Reason, State}. explain_posix(emfile) -> "EMFILE (Too many open files)"; explain_posix(enfile) -> "ENFILE (File table overflow)". + +log_system_limit(State, Reason) -> + logger:log(error, #{msg => "cannot_accept_more_connections", + listener => esockd:format(State#state.sockname), + cause => explain_posix(Reason)}). + +start_suspending(State, Timeout) -> + Actions = [{next_event, internal, accept_and_close}, + {state_timeout, Timeout, begin_waiting}], + {next_state, suspending, State, Actions}.