diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index c11dc56..967967d 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -88,19 +88,26 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> callback_mode() -> state_functions. -accepting(internal, accept, State = #state{lsock = LSock}) -> +do_async_accept(StateName, State = #state{lsock = LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; {error, Reason} when Reason =:= emfile; Reason =:= enfile -> - {next_state, suspending, State, 1000}; + case StateName of + accepting -> + to_suspending(State, 10_000); + _ -> + {keep_state, State} + end; {error, closed} -> {stop, normal, State}; {error, Reason} -> {stop, Reason, State} - end; + end. +accepting(internal, accept, State) -> + do_async_accept(accepting, State); accepting({call, From}, {set_conn_limiter, Limiter}, State) -> {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}}; @@ -148,14 +155,14 @@ accepting(info, {inet_async, LSock, Ref, {ok, Sock}}, rate_limit(State, Result); accepting(info, {inet_async, LSock, Ref, {error, closed}}, State = #state{lsock = LSock, accept_ref = Ref}) -> - {stop, normal, State}; + {stop, normal, State#state{accept_ref = false}}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept accepting(info, {inet_async, LSock, Ref, {error, Reason}}, - #state{lsock = LSock, accept_ref = Ref}) + State = #state{lsock = LSock, accept_ref = Ref}) when Reason =:= econnaborted; Reason =:= esslaccept -> - {keep_state_and_data, {next_event, internal, accept}}; + {keep_state, State#state{accept_ref = false}, {next_event, internal, accept}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. @@ -164,17 +171,54 @@ accepting(info, {inet_async, LSock, Ref, {error, Reason}}, when Reason =:= emfile; Reason =:= enfile -> error_logger:error_msg("Accept error on ~s: ~s", [esockd:format(Sockname), esockd_utils:explain_posix(Reason)]), - {next_state, suspending, State, 1000}; - + to_suspending(State#state{accept_ref = false}, 10_000); accepting(info, {inet_async, LSock, Ref, {error, Reason}}, State = #state{lsock = LSock, accept_ref = Ref}) -> - {stop, Reason, State}. + {stop, Reason, State#state{accept_ref = false}}. suspending({call, From}, {set_conn_limiter, Limiter}, State) -> {keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}}; +suspending(info, start_accepting, State) -> + Actions = + case State of + #state{accept_ref = false} -> + {next_event, internal, accept}; + _ -> + [] + end, + {next_state, accepting, State, Actions}; +suspending(internal, accept_and_close, State) -> + do_async_accept(suspending, State); +suspending(info, {inet_async, _LSock, Ref, {ok, Sock}}, + #state{proto = Proto, + listen_on = ListenOn, + sockmod = SockMod, + accept_ref = Ref} = State) -> + %% make it look like gen_tcp:accept + inet_db:register_socket(Sock, SockMod), -suspending(timeout, _Timeout, State) -> - {next_state, accepting, State, {next_event, internal, accept}}. + %% Inc limited stats. + %% catch for hot-upgrade, the metrics is not initialized yet + _ = catch esockd_server:inc_stats({Proto, ListenOn}, limited, 1), + close(Sock), + {keep_state, State#state{accept_ref = false}, + {next_event, internal, accept_and_close}}; +suspending(info, {inet_async, LSock, Ref, {error, closed}}, + State = #state{lsock = LSock, accept_ref = Ref}) -> + {stop, normal, State#state{accept_ref = false}}; +suspending(info, {inet_async, LSock, Ref, {error, Reason}}, + #state{lsock = LSock, accept_ref = Ref} = State) + when Reason =:= econnaborted; Reason =:= esslaccept -> + {keep_state, State#state{accept_ref = false}, {next_event, internal, accept_and_close}}; +suspending(info, {inet_async, LSock, Ref, {error, Reason}}, + State = #state{lsock = LSock, sockname = Sockname, accept_ref = Ref}) + when Reason =:= emfile; Reason =:= enfile -> + error_logger:error_msg("Accept error on ~s: ~s", + [esockd:format(Sockname), esockd_utils:explain_posix(Reason)]), + {keep_state, State#state{accept_ref = false}}; +suspending(info, {inet_async, LSock, Ref, {error, Reason}}, + State = #state{lsock = LSock, accept_ref = Ref}) -> + {stop, Reason, State#state{accept_ref = false}}. terminate(_Reason, _StateName, _State) -> ok. @@ -186,12 +230,16 @@ code_change(_OldVsn, StateName, State, _Extra) -> %% Internal funcs %%-------------------------------------------------------------------- +to_suspending(State, Pause) -> + _ = erlang:send_after(Pause, self(), start_accepting), + {next_state, suspending, State, {next_event, internal, accept_and_close}}. + close(Sock) -> catch port_close(Sock). rate_limit(State = #state{conn_limiter = Limiter}, consume_limiter) -> case esockd_limiter:consume(Limiter, 1) of {I, Pause} when I =< 0 -> - {next_state, suspending, State, Pause}; + to_suspending(State, Pause); _ -> {keep_state, State, {next_event, internal, accept}} end; diff --git a/src/esockd_listener_sup.erl b/src/esockd_listener_sup.erl index 8de1906..2e90eeb 100644 --- a/src/esockd_listener_sup.erl +++ b/src/esockd_listener_sup.erl @@ -76,6 +76,7 @@ start_link(Type, Proto, ListenOn, Opts, MFA) -> %% Start acceptor sup ok = esockd_server:init_stats({Proto, ListenOn}, accepted), + ok = esockd_server:init_stats({Proto, ListenOn}, limited), TuneFun = tune_socket_fun(Opts), UpgradeFuns = upgrade_funs(Type, Opts), Limiter = conn_rate_limiter({listener, Proto, ListenOn}, conn_rate_opt(Opts)),