Skip to content

Commit

Permalink
fix: enhance connection rate limit
Browse files Browse the repository at this point in the history
Prior to this change, if connection rate is limited, the acceptors
will enter suspending state and stop accepting the sockets
leaving the sockets in the system backlog.

If the acceptor backlog (default=1024) is filled up, for long enough
time to cause the majority of the clients to have closed socket from
their end and try to reconnect aggressively, the acceptor may never
be able to get a normal socket again.

The fix is: in suspending state, accept the sockets and immediately
cose them to free up the backlog.
The close triggers TCP-RST to cut the TCP graceful close overheads.
  • Loading branch information
zmstone committed Nov 9, 2024
1 parent cf15a53 commit 655e361
Showing 1 changed file with 71 additions and 29 deletions.
100 changes: 71 additions & 29 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
upgrade_funs :: [esockd:sock_fun()],
conn_limiter :: undefined | esockd_generic_limiter:limiter(),
conn_sup :: pid(),
accept_ref :: term()
accept_ref = no_ref :: term()
}).

%% @doc Start an acceptor
Expand Down Expand Up @@ -121,21 +121,39 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
},
{next_event, internal, begin_waiting}}.

handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) ->
handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref ->
%% already waiting
keep_state_and_data;
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
start_suspending(State, 1000);
{error, econnaborted} ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]),
{stop, Reason, State}
end;
handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, econnaborted} ->
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
{stop, Reason, State}
end;
handle_event(
Expand All @@ -144,17 +162,27 @@ handle_event(
waiting,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
{next_state, token_request, State, {next_event, internal, {token_request, Sock}}};
NextEvent = {next_event, internal, {token_request, Sock}},
{next_state, token_request, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
info,
{inet_async, LSock, Ref, {ok, Sock}},
suspending,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
_ = close(Sock),
NextEvent = {next_event, internal, accept_and_close},
{keep_state, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter}
internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter}
) ->
case esockd_generic_limiter:consume(1, Limiter) of
{ok, Limiter2} ->
{next_state, accepting, State#state{conn_limiter = Limiter2},
{next_event, internal, {accept, Sock}}};
{pause, PauseTime, Limiter2} ->
{next_state, suspending, State#state{conn_limiter = Limiter2},
{state_timeout, PauseTime, Content}}
_ = close(Sock),
start_suspending(State#state{conn_limiter = Limiter2}, PauseTime)
end;
handle_event(
internal,
Expand All @@ -181,38 +209,37 @@ handle_event(
{ok, _Pid} ->
ok;
{error, Reason} ->
handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State),
handle_accept_error(Reason, "failed_to_start_connection_process", State),
close(Sock)
end;
{error, Reason} ->
handle_accept_error(Reason, "Tune buffer failed on ~s: ~s", State),
handle_accept_error(Reason, "failed_to_apply_tune_funcs", State),
close(Sock)
end,
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(state_timeout, {token_request, _} = Content, suspending, State) ->
{next_state, token_request, State, {next_event, internal, Content}};
handle_event(state_timeout, begin_waiting, suspending, State) ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(
info,
{inet_async, LSock, Ref, {error, Reason}},
_,
StateName,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
handle_socket_error(Reason, State);
handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName);
handle_event(Type, Content, StateName, _) ->
error_logger:warning_msg(
"Unhandled message, State:~p, Type:~p Content:~p",
[StateName, Type, Content]
),
logger:log(warning, #{msg => "esockd_acceptor_unhandled_event",
state_name => StateName,
event_type => Type,
event_content => Content}),
keep_state_and_data.

terminate(normal, _StateName, #state{}) ->
ok;
terminate(shutdown, _StateName, #state{}) ->
ok;
terminate(Reason, _StateName, #state{}) ->
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
logger:log(error, #{msg => "esockd_acceptor_terminating",
reaseon => Reason}),
ok.

code_change(_OldVsn, StateName, State, _Extra) ->
Expand All @@ -224,6 +251,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->

close(Sock) ->
try
%% port-close leads to a TPC reset which cuts out the tcp graceful close overheads
true = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
Expand All @@ -241,28 +269,42 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn})
esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1),
ok;
handle_accept_error(Reason, Msg, #state{sockname = Sockname}) ->
error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]).
logger:log(error, #{msg => Msg,
listener => esockd:format(Sockname),
cause => Reason}).

handle_socket_error(closed, State) ->
handle_socket_error(closed, State, _StateName) ->
{stop, normal, State};
%% {error, econnaborted} -> accept
%% {error, esslaccept} -> accept
%% {error, esslaccept} -> accept
handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept ->
handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept ->
{keep_state, State, {next_event, internal, accept_and_close}};
handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
%% enfile: The system limit on the total number of open files has been reached.
handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile ->
error_logger:error_msg(
"Accept error on ~s: ~s",
[esockd:format(State#state.sockname), explain_posix(Reason)]
),
{next_state, suspending, State, {state_timeout, 1000, begin_waiting}};
handle_socket_error(Reason, State) ->
handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile ->
log_system_limit(State, Reason),
{keep_state, State, {next_event, internal, accept_and_close}};
handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile ->
log_system_limit(State, Reason),
start_suspending(State, 1000);
handle_socket_error(Reason, State, _StateName) ->
{stop, Reason, State}.

explain_posix(emfile) ->
"EMFILE (Too many open files)";
explain_posix(enfile) ->
"ENFILE (File table overflow)".

log_system_limit(State, Reason) ->
logger:log(error, #{msg => "cannot_accept_more_connections",
listener => esockd:format(State#state.sockname),
cause => explain_posix(Reason)}).

start_suspending(State, Timeout) ->
Actions = [{next_event, internal, accept_and_close},
{state_timeout, Timeout, begin_waiting}],
{next_state, suspending, State, Actions}.

0 comments on commit 655e361

Please sign in to comment.