diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index a672947..b309e4d 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -60,7 +60,7 @@ upgrade_funs :: [esockd:sock_fun()], conn_limiter :: undefined | esockd_generic_limiter:limiter(), conn_sup :: pid(), - accept_ref :: term() + accept_ref = no_ref :: term() }). %% @doc Start an acceptor @@ -121,7 +121,10 @@ init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) -> }, {next_event, internal, begin_waiting}}. -handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> +handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Ref =/= no_ref -> + %% already waiting + keep_state_and_data; +handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {keep_state, State#state{accept_ref = Ref}}; @@ -129,13 +132,28 @@ handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock}) -> Reason =:= emfile; Reason =:= enfile -> - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; + start_suspending(State, 1000); {error, econnaborted} -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; {error, closed} -> {stop, normal, State}; {error, Reason} -> - error_logger:error_msg("~p async_accept error: ~p", [?MODULE, Reason]), + {stop, Reason, State} + end; +handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> + {keep_state, State#state{accept_ref = Ref}}; + {error, Reason} when + Reason =:= emfile; + Reason =:= enfile + -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, econnaborted} -> + {keep_state_and_data, {next_event, internal, accept_and_close}}; + {error, closed} -> + {stop, normal, State}; + {error, Reason} -> {stop, Reason, State} end; handle_event( @@ -144,17 +162,27 @@ handle_event( waiting, State = #state{lsock = LSock, accept_ref = Ref} ) -> - {next_state, token_request, State, {next_event, internal, {token_request, Sock}}}; + NextEvent = {next_event, internal, {token_request, Sock}}, + {next_state, token_request, State#state{accept_ref = no_ref}, NextEvent}; +handle_event( + info, + {inet_async, LSock, Ref, {ok, Sock}}, + suspending, + State = #state{lsock = LSock, accept_ref = Ref} +) -> + _ = close(Sock), + NextEvent = {next_event, internal, accept_and_close}, + {keep_state, State#state{accept_ref = no_ref}, NextEvent}; handle_event( - internal, {token_request, Sock} = Content, token_request, State = #state{conn_limiter = Limiter} + internal, {token_request, Sock}, token_request, State = #state{conn_limiter = Limiter} ) -> case esockd_generic_limiter:consume(1, Limiter) of {ok, Limiter2} -> {next_state, accepting, State#state{conn_limiter = Limiter2}, {next_event, internal, {accept, Sock}}}; {pause, PauseTime, Limiter2} -> - {next_state, suspending, State#state{conn_limiter = Limiter2}, - {state_timeout, PauseTime, Content}} + _ = close(Sock), + start_suspending(State#state{conn_limiter = Limiter2}, PauseTime) end; handle_event( internal, @@ -181,30 +209,28 @@ handle_event( {ok, _Pid} -> ok; {error, Reason} -> - handle_accept_error(Reason, "Failed to start connection on ~s: ~p", State), + handle_accept_error(Reason, "failed_to_start_connection_process", State), close(Sock) end; {error, Reason} -> - handle_accept_error(Reason, "Tune buffer failed on ~s: ~s", State), + handle_accept_error(Reason, "failed_to_apply_tune_funcs", State), close(Sock) end, {next_state, waiting, State, {next_event, internal, begin_waiting}}; -handle_event(state_timeout, {token_request, _} = Content, suspending, State) -> - {next_state, token_request, State, {next_event, internal, Content}}; handle_event(state_timeout, begin_waiting, suspending, State) -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; handle_event( info, {inet_async, LSock, Ref, {error, Reason}}, - _, + StateName, State = #state{lsock = LSock, accept_ref = Ref} ) -> - handle_socket_error(Reason, State); + handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName); handle_event(Type, Content, StateName, _) -> - error_logger:warning_msg( - "Unhandled message, State:~p, Type:~p Content:~p", - [StateName, Type, Content] - ), + logger:log(warning, #{msg => "esockd_acceptor_unhandled_event", + state_name => StateName, + event_type => Type, + event_content => Content}), keep_state_and_data. terminate(normal, _StateName, #state{}) -> @@ -212,7 +238,8 @@ terminate(normal, _StateName, #state{}) -> terminate(shutdown, _StateName, #state{}) -> ok; terminate(Reason, _StateName, #state{}) -> - error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]), + logger:log(error, #{msg => "esockd_acceptor_terminating", + reaseon => Reason}), ok. code_change(_OldVsn, StateName, State, _Extra) -> @@ -224,6 +251,7 @@ code_change(_OldVsn, StateName, State, _Extra) -> close(Sock) -> try + %% port-close leads to a TPC reset which cuts out the tcp graceful close overheads true = port_close(Sock), receive {'EXIT', Sock, _} -> ok after 1 -> ok end catch @@ -241,28 +269,42 @@ handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1), ok; handle_accept_error(Reason, Msg, #state{sockname = Sockname}) -> - error_logger:error_msg(Msg, [esockd:format(Sockname), Reason]). + logger:log(error, #{msg => Msg, + listener => esockd:format(Sockname), + cause => Reason}). -handle_socket_error(closed, State) -> +handle_socket_error(closed, State, _StateName) -> {stop, normal, State}; %% {error, econnaborted} -> accept %% {error, esslaccept} -> accept %% {error, esslaccept} -> accept -handle_socket_error(Reason, State) when Reason =:= econnaborted; Reason =:= esslaccept -> +handle_socket_error(Reason, State, suspending) when Reason =:= econnaborted; Reason =:= esslaccept -> + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= econnaborted; Reason =:= esslaccept -> {next_state, waiting, State, {next_event, internal, begin_waiting}}; %% emfile: The per-process limit of open file descriptors has been reached. %% enfile: The system limit on the total number of open files has been reached. %% enfile: The system limit on the total number of open files has been reached. -handle_socket_error(Reason, State) when Reason =:= emfile; Reason =:= enfile -> - error_logger:error_msg( - "Accept error on ~s: ~s", - [esockd:format(State#state.sockname), explain_posix(Reason)] - ), - {next_state, suspending, State, {state_timeout, 1000, begin_waiting}}; -handle_socket_error(Reason, State) -> +handle_socket_error(Reason, State, suspending) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + {keep_state, State, {next_event, internal, accept_and_close}}; +handle_socket_error(Reason, State, _StateName) when Reason =:= emfile; Reason =:= enfile -> + log_system_limit(State, Reason), + start_suspending(State, 1000); +handle_socket_error(Reason, State, _StateName) -> {stop, Reason, State}. explain_posix(emfile) -> "EMFILE (Too many open files)"; explain_posix(enfile) -> "ENFILE (File table overflow)". + +log_system_limit(State, Reason) -> + logger:log(error, #{msg => "cannot_accept_more_connections", + listener => esockd:format(State#state.sockname), + cause => explain_posix(Reason)}). + +start_suspending(State, Timeout) -> + Actions = [{next_event, internal, accept_and_close}, + {state_timeout, Timeout, begin_waiting}], + {next_state, suspending, State, Actions}.