Skip to content

Commit

Permalink
refactor: accept and drop during suspend
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Oct 11, 2024
1 parent 380ea49 commit 72b0db8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 12 deletions.
72 changes: 60 additions & 12 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,26 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->

callback_mode() -> state_functions.

accepting(internal, accept, State = #state{lsock = LSock}) ->
do_async_accept(StateName, State = #state{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when Reason =:= emfile;
Reason =:= enfile ->
{next_state, suspending, State, 1000};
case StateName of
accepting ->
to_suspending(State, 10_000);
_ ->
{keep_state, State}
end;
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
{stop, Reason, State}
end;
end.

accepting(internal, accept, State) ->
do_async_accept(accepting, State);
accepting({call, From}, {set_conn_limiter, Limiter}, State) ->
{keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};

Expand Down Expand Up @@ -148,14 +155,14 @@ accepting(info, {inet_async, LSock, Ref, {ok, Sock}},
rate_limit(State, Result);
accepting(info, {inet_async, LSock, Ref, {error, closed}},
State = #state{lsock = LSock, accept_ref = Ref}) ->
{stop, normal, State};
{stop, normal, State#state{accept_ref = false}};

%% {error, econnaborted} -> accept
%% {error, esslaccept} -> accept
accepting(info, {inet_async, LSock, Ref, {error, Reason}},
#state{lsock = LSock, accept_ref = Ref})
State = #state{lsock = LSock, accept_ref = Ref})
when Reason =:= econnaborted; Reason =:= esslaccept ->
{keep_state_and_data, {next_event, internal, accept}};
{keep_state, State#state{accept_ref = false}, {next_event, internal, accept}};

%% emfile: The per-process limit of open file descriptors has been reached.
%% enfile: The system limit on the total number of open files has been reached.
Expand All @@ -164,17 +171,54 @@ accepting(info, {inet_async, LSock, Ref, {error, Reason}},
when Reason =:= emfile; Reason =:= enfile ->
error_logger:error_msg("Accept error on ~s: ~s",
[esockd:format(Sockname), esockd_utils:explain_posix(Reason)]),
{next_state, suspending, State, 1000};

to_suspending(State#state{accept_ref = false}, 10_000);
accepting(info, {inet_async, LSock, Ref, {error, Reason}},
State = #state{lsock = LSock, accept_ref = Ref}) ->
{stop, Reason, State}.
{stop, Reason, State#state{accept_ref = false}}.

suspending({call, From}, {set_conn_limiter, Limiter}, State) ->
{keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};
suspending(info, start_accepting, State) ->
Actions =
case State of
#state{accept_ref = false} ->
{next_event, internal, accept};
_ ->
[]
end,
{next_state, accepting, State, Actions};
suspending(internal, accept_and_close, State) ->
do_async_accept(suspending, State);
suspending(info, {inet_async, _LSock, Ref, {ok, Sock}},
#state{proto = Proto,
listen_on = ListenOn,
sockmod = SockMod,
accept_ref = Ref} = State) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),

suspending(timeout, _Timeout, State) ->
{next_state, accepting, State, {next_event, internal, accept}}.
%% Inc limited stats.
%% catch for hot-upgrade, the metrics is not initialized yet
_ = catch esockd_server:inc_stats({Proto, ListenOn}, limited, 1),
close(Sock),
{keep_state, State#state{accept_ref = false},
{next_event, internal, accept_and_close}};
suspending(info, {inet_async, LSock, Ref, {error, closed}},
State = #state{lsock = LSock, accept_ref = Ref}) ->
{stop, normal, State#state{accept_ref = false}};
suspending(info, {inet_async, LSock, Ref, {error, Reason}},
#state{lsock = LSock, accept_ref = Ref} = State)
when Reason =:= econnaborted; Reason =:= esslaccept ->
{keep_state, State#state{accept_ref = false}, {next_event, internal, accept_and_close}};
suspending(info, {inet_async, LSock, Ref, {error, Reason}},
State = #state{lsock = LSock, sockname = Sockname, accept_ref = Ref})
when Reason =:= emfile; Reason =:= enfile ->
error_logger:error_msg("Accept error on ~s: ~s",
[esockd:format(Sockname), esockd_utils:explain_posix(Reason)]),
{keep_state, State#state{accept_ref = false}};
suspending(info, {inet_async, LSock, Ref, {error, Reason}},
State = #state{lsock = LSock, accept_ref = Ref}) ->
{stop, Reason, State#state{accept_ref = false}}.

terminate(_Reason, _StateName, _State) ->
ok.
Expand All @@ -186,12 +230,16 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%% Internal funcs
%%--------------------------------------------------------------------

to_suspending(State, Pause) ->
_ = erlang:send_after(Pause, self(), start_accepting),
{next_state, suspending, State, {next_event, internal, accept_and_close}}.

close(Sock) -> catch port_close(Sock).

rate_limit(State = #state{conn_limiter = Limiter}, consume_limiter) ->
case esockd_limiter:consume(Limiter, 1) of
{I, Pause} when I =< 0 ->
{next_state, suspending, State, Pause};
to_suspending(State, Pause);
_ ->
{keep_state, State, {next_event, internal, accept}}
end;
Expand Down
1 change: 1 addition & 0 deletions src/esockd_listener_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ start_link(Type, Proto, ListenOn, Opts, MFA) ->

%% Start acceptor sup
ok = esockd_server:init_stats({Proto, ListenOn}, accepted),
ok = esockd_server:init_stats({Proto, ListenOn}, limited),
TuneFun = tune_socket_fun(Opts),
UpgradeFuns = upgrade_funs(Type, Opts),
Limiter = conn_rate_limiter({listener, Proto, ListenOn}, conn_rate_opt(Opts)),
Expand Down

0 comments on commit 72b0db8

Please sign in to comment.