feat: enhance connection rate limit
Prior to this change, when the connection rate was limited, the
acceptors would enter the suspending state and stop accepting sockets,
leaving pending connections in the system backlog.

If the acceptor backlog (default=1024) stays full for long enough that
the majority of clients close the socket from their end and start
reconnecting aggressively, the acceptor may never be able to get a
normal socket again.

The fix: in the suspending state, accept the sockets and immediately
close them to free up the backlog.
The close triggers a TCP RST, which cuts out the overhead of a
graceful TCP close.
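
To make the idea concrete, here is a minimal sketch of the accept-and-close behaviour using plain gen_tcp rather than esockd's prim_inet/gen_statem acceptor. The module name, function name, and the use of the {linger, {true, 0}} option to force an RST on close are assumptions for illustration only, not the code in this commit.

%% Minimal sketch (assumed names; not part of this commit).
-module(drain_backlog_sketch).
-export([drain_until/2]).

%% Drain the listen backlog while the acceptor is rate-limit suspended:
%% accept each pending socket and reset it right away, so the kernel
%% queue does not sit full of connections the clients have given up on.
drain_until(LSock, DeadlineMs) ->
    case erlang:monotonic_time(millisecond) >= DeadlineMs of
        true ->
            ok; %% suspension over; the real acceptor resumes normal accepts
        false ->
            case gen_tcp:accept(LSock, 100) of
                {ok, Sock} ->
                    %% SO_LINGER with a zero timeout makes close/1 send a
                    %% TCP RST instead of the graceful FIN handshake.
                    _ = inet:setopts(Sock, [{linger, {true, 0}}]),
                    gen_tcp:close(Sock),
                    drain_until(LSock, DeadlineMs);
                {error, timeout} ->
                    drain_until(LSock, DeadlineMs);
                {error, _} ->
                    ok
            end
    end.

The actual change below does the same thing inside the acceptor's gen_statem suspending state, using prim_inet:async_accept/2 and port_close/1 instead of a blocking gen_tcp loop.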
zmstone committed Nov 9, 2024
1 parent 81f4223 commit ffe013a
Showing 1 changed file with 65 additions and 24 deletions.
89 changes: 65 additions & 24 deletions src/esockd_acceptor.erl
@@ -60,7 +60,7 @@
upgrade_funs :: [esockd:sock_fun()],
conn_limiter :: undefined | esockd_generic_limiter:limiter(),
conn_sup :: pid(),
accept_ref :: term()
accept_ref = no_ref :: term()
}).

%% @doc Start an acceptor
@@ -121,21 +121,41 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
},
{next_event, internal, begin_waiting}}.

handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) ->
handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref ->
%% already waiting
keep_state_and_data;
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
start_suspending(State, 1000);
{error, econnaborted} ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]),
logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]),
{stop, Reason, State}
end;
handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, econnaborted} ->
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
logger:log(error, "~p async_accept error: ~p", [?MODULE, Reason]),
{stop, Reason, State}
end;
handle_event(
@@ -144,17 +164,27 @@ handle_event(
waiting,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
{next_state, token_request, State, {next_event, internal, {token_request, Sock}}};
NextEvent = {next_event, internal, {token_request, Sock}},
{next_state, token_request, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
info,
{inet_async, LSock, Ref, {ok, Sock}},
suspending,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
_ = close(Sock),
NextEvent = {next_event, internal, accept_and_close},
{keep_state, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter}
internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter}
) ->
case esockd_generic_limiter:consume(1, Limiter) of
{ok, Limiter2} ->
{next_state, accepting, State#state{conn_limiter = Limiter2},
{next_event, internal, {accept, Sock}}};
{pause, PauseTime, Limiter2} ->
{next_state, suspending, State#state{conn_limiter = Limiter2},
{state_timeout, PauseTime, Content}}
_ = close(Sock),
start_suspending(State#state{conn_limiter = Limiter2}, PauseTime)
end;
handle_event(
internal,
@@ -189,19 +219,17 @@ handle_event(
close(Sock)
end,
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(state_timeout, {token_request, _} = Content, suspending, State) ->
{next_state, token_request, State, {next_event, internal, Content}};
handle_event(state_timeout, begin_waiting, suspending, State) ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(
info,
{inet_async, LSock, Ref, {error, Reason}},
_,
StateName,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
handle_socket_error(Reason, State);
handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName);
handle_event(Type, Content, StateName, _) ->
error_logger:warning_msg(
logger:log(warning,
"Unhandled message, State:~p, Type:~p Content:~p",
[StateName, Type, Content]
),
@@ -212,7 +240,7 @@ terminate(normal, _StateName, #state{}) ->
terminate(shutdown, _StateName, #state{}) ->
ok;
terminate(Reason, _StateName, #state{}) ->
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
logger:log(error, "~p terminating due to ~p", [?MODULE, Reason]),
ok.

code_change(_OldVsn, StateName, State, _Extra) ->
@@ -224,6 +252,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->

close(Sock) ->
try
%% port-close leads to a TCP reset which cuts out the TCP graceful close overheads
true = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
@@ -241,28 +270,40 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn})
esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1),
ok;
handle_accept_error(Reason, Msg, #state{sockname = Sockname}) ->
error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]).
logger:log(error, Msg, [esockd:format(Sockname), Reason]).

handle_socket_error(closed, State) ->
handle_socket_error(closed, State, _StateName) ->
{stop, normal, State};
%% {error, econnaborted} -> accept
%% {error, esslaccept} -> accept
%% {error, esslaccept} -> accept
handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept ->
handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept ->
{keep_state, State, {next_event, internal, accept_and_close}};
handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
%% enfile: The system limit on the total number of open files has been reached.
handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile ->
error_logger:error_msg(
"Accept error on ~s: ~s",
[esockd:format(State#state.sockname), explain_posix(Reason)]
),
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
handle_socket_error(Reason, State) ->
handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile ->
log_system_limit(State, Reason),
{keep_state, State, {next_event, internal, accept_and_close}};
handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile ->
log_system_limit(State, Reason),
start_suspending(State, 1000);
handle_socket_error(Reason, State, _StateName) ->
{stop, Reason, State}.

explain_posix(emfile) ->
"EMFILE (Too many open files)";
explain_posix(enfile) ->
"ENFILE (File table overflow)".

log_system_limit(State, Reason) ->
logger:log(error, #{msg => cannot_accept_more_connections,
acceptor => esockd:format(State#state.sockname),
cause => explain_posix(Reason)}).

start_suspending(State, Timeout) ->
Actions = [{next_event, internal, accept_and_close},
{state_timeout, Timeout, begin_waiting}],
{next_state, suspending, State, Actions}.
