From 70b6ac869b7e7b9dcc023ee0352ee3c1b0f074fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E9=B9=8F=E4=B8=BE?= Date: Wed, 3 Aug 2022 12:08:19 +0800 Subject: [PATCH] Support application hot release --- src/esockd.erl | 25 ++++++++++++++++++++++++ src/esockd_listener.erl | 39 ++++++++++++++++++++++++++++++++++++++ src/esockd_sup.erl | 42 ++++++++++++++++++++++++++++++++++++++--- test/esockd_SUITE.erl | 18 ++++++++++++++++++ 4 files changed, 121 insertions(+), 3 deletions(-) diff --git a/src/esockd.erl b/src/esockd.erl index af01dcc2..f518cbbb 100644 --- a/src/esockd.erl +++ b/src/esockd.erl @@ -28,6 +28,13 @@ , close/1 ]). +%% Support Hot Release +-export([ close_port/1 + , close_port/2 + , resume_port/1 + , resume_port/2 + ]). + -export([ reopen/1 , reopen/2 ]). @@ -159,6 +166,24 @@ close({Proto, ListenOn}) when is_atom(Proto) -> close(Proto, ListenOn) when is_atom(Proto) -> esockd_sup:stop_listener(Proto, fixaddr(ListenOn)). +%% @doc just close port, don't kill esockd supervisor process made connection close +-spec(close_port({atom(), listen_on()}) -> ok | {error, term()}). +close_port({Proto, ListenOn}) when is_atom(Proto) -> + close_port(Proto, ListenOn). + +-spec close_port(atom(), listen_on()) -> ok | {error, term()}. +close_port(Proto, ListenOn) when is_atom(Proto) -> + esockd_sup:close_port(Proto, fixaddr(ListenOn)). + +%% @doc resume port when use close port +-spec(resume_port({atom(), listen_on()}) -> ok | {error, term()}). +resume_port({Proto, ListenOn}) when is_atom(Proto) -> + resume_port(Proto, ListenOn). + +-spec resume_port(atom(), listen_on()) -> ok | {error, term()}. +resume_port(Proto, ListenOn) when is_atom(Proto) -> + esockd_sup:resume_port(Proto, fixaddr(ListenOn)). + %% @doc Reopen the listener -spec(reopen({atom(), listen_on()}) -> {ok, pid()} | {error, term()}). reopen({Proto, ListenOn}) when is_atom(Proto) -> diff --git a/src/esockd_listener.erl b/src/esockd_listener.erl index 60f1ec38..3902bd58 100644 --- a/src/esockd_listener.erl +++ b/src/esockd_listener.erl @@ -24,6 +24,10 @@ -export([ options/1 , get_port/1 + , get_sock/1 + ]). + +-export([ resume_sock/2 ]). %% gen_server callbacks @@ -65,6 +69,16 @@ options(Listener) -> get_port(Listener) -> gen_server:call(Listener, get_port). +-spec(get_sock(pid()) -> inet:socket()). +get_sock(Listener) -> + gen_server:call(Listener, get_sock). + +-spec(resume_sock(pid(), pid()) -> ok | {error, term()}). +resume_sock(Listener, AcceptorSup) -> + case gen_server:call(Listener, {resume_sock, AcceptorSup}) of + ok -> ok; + {error, Reason} -> {error, Reason} + end. %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -109,6 +123,31 @@ handle_call(options, _From, State = #state{options = Opts}) -> handle_call(get_port, _From, State = #state{lport = LPort}) -> {reply, LPort, State}; +handle_call(get_sock, _From, State = #state{lsock = LSock}) -> + {reply, LSock, State}; + +handle_call({resume_sock, AcceptorSup}, _From, + State = #state{proto = Proto, + listen_on = ListenOn, + options = Opts}) -> + Port = port(ListenOn), + SockOpts = merge_addr(ListenOn, sockopts(Opts)), + case esockd_transport:listen(Port, [{active, false} | proplists:delete(active, SockOpts)]) of + {ok, LSock} -> + AcceptorNum = proplists:get_value(acceptors, Opts, ?ACCEPTOR_POOL), + lists:foreach(fun (_) -> + {ok, _APid} = esockd_acceptor_sup:start_acceptor(AcceptorSup, LSock) + end, lists:seq(1, AcceptorNum)), + {ok, {LAddr, LPort}} = inet:sockname(LSock), + %%error_logger:info_msg("~s listen on ~s:~p with ~p acceptors.~n", + %% [Proto, inet:ntoa(LAddr), LPort, AcceptorNum]), + {reply, ok, #state{lsock = LSock, laddr = LAddr, lport = LPort}}; + {error, Reason} -> + error_logger:error_msg("~s failed to resume port ~p - ~p (~s)", + [Proto, Port, Reason, inet:format_error(Reason)]), + {reply, Reason, State} + end; + handle_call(Req, _From, State) -> error_logger:error_msg("[~s] Unexpected call: ~p", [?MODULE, Req]), {noreply, State}. diff --git a/src/esockd_sup.erl b/src/esockd_sup.erl index 46eaf1f0..275188de 100644 --- a/src/esockd_sup.erl +++ b/src/esockd_sup.erl @@ -27,6 +27,10 @@ , restart_listener/2 ]). +-export([ close_port/2 + , resume_port/2 + ]). + -export([ listeners/0 , listener/1 , listener_and_module/1 @@ -93,7 +97,23 @@ stop_listener(Proto, ListenOn) -> case match_listeners(Proto, ListenOn) of [] -> {error, not_found}; Listeners -> - return_ok_or_error([terminate_and_delete(ChildId) || ChildId <- Listeners]) + return_ok_or_error([terminate_and_delete(ChildId) || {ChildId, _} <- Listeners]) + end. + +-spec(close_port(atom(), esockd:listen_on()) -> ok | {error, term()}). +close_port(Proto, ListenOn) -> + case match_listeners(Proto, ListenOn) of + [] -> {error, not_found}; + Listeners -> + return_ok_or_error([terminate_port(SupPid) || {_, SupPid} <- Listeners]) + end. + +-spec(resume_port(atom(), esockd:listen_on()) -> ok | {error, term()}). +resume_port(Proto, ListenOn) -> + case match_listeners(Proto, ListenOn) of + [] -> {error, not_found}; + Listeners -> + return_ok_or_error([resume(SupPid) || {_, SupPid} <- Listeners]) end. terminate_and_delete(ChildId) -> @@ -102,6 +122,22 @@ terminate_and_delete(ChildId) -> Error -> Error end. +-spec(terminate_port(pid()) -> ok | {error, tcp_close}). +terminate_port(ListenerSup) -> + try + Listener = esockd_listener_sup:listener(ListenerSup), + LSock = esockd_listener:get_sock(Listener), + gen_tcp:close(LSock) + catch _:_ -> + {error, tcp_close} + end. + +-spec resume(pid()) -> ok | {error, term()}. +resume(ListenerSup) -> + Listener = esockd_listener_sup:listener(ListenerSup), + AcceptorSup = esockd_listener_sup:acceptor_sup(ListenerSup), + esockd_listener:resume_sock(Listener, AcceptorSup). + -spec(listeners() -> [{term(), pid()}]). listeners() -> [{Id, Pid} || {{listener_sup, Id}, Pid, _Type, _} <- supervisor:which_children(?MODULE)]. @@ -129,7 +165,7 @@ restart_listener(Proto, ListenOn) -> case match_listeners(Proto, ListenOn) of [] -> {error, not_found}; Listeners -> - return_ok_or_error([terminate_and_restart(ChildId) || ChildId <- Listeners]) + return_ok_or_error([terminate_and_restart(ChildId) || {ChildId, _} <- Listeners]) end. terminate_and_restart(ChildId) -> @@ -139,7 +175,7 @@ terminate_and_restart(ChildId) -> end. match_listeners(Proto, ListenOn) -> - [ChildId || {ChildId, _Pid, _Type, _} <- supervisor:which_children(?MODULE), + [{ChildId, Pid} || {ChildId, Pid, _Type, _} <- supervisor:which_children(?MODULE), match_listener(Proto, ListenOn, ChildId)]. match_listener(Proto, ListenOn, {listener_sup, {Proto, ListenOn}}) -> diff --git a/test/esockd_SUITE.erl b/test/esockd_SUITE.erl index 39786404..1e98a7f4 100644 --- a/test/esockd_SUITE.erl +++ b/test/esockd_SUITE.erl @@ -347,6 +347,24 @@ t_allow_deny(_) -> ], esockd:get_access_rules({udp_echo, 7001})), ok = esockd:close(udp_echo, 7001). +t_close_port_and_resume_port(_) -> + {ok, _LSup} = esockd:open(echo, {"127.0.0.1", 5000}, [binary, {packet, raw}], + {echo_server, start_link, []}), + {ok, Sock} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]), + ok = gen_tcp:send(Sock, <<"Hello">>), + {ok, <<"Hello">>} = gen_tcp:recv(Sock, 0), + ok = esockd:close_port(echo, {"127.0.0.1", 5000}), + ok = gen_tcp:send(Sock, <<"Hello">>), + {ok, <<"Hello">>} = gen_tcp:recv(Sock, 0), + {error, econnrefused} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]), + esockd:resume_port(echo, {"127.0.0.1", 5000}), + {ok, Sock1} = gen_tcp:connect("127.0.0.1", 5000, [binary, {active, false}]), + ok = gen_tcp:send(Sock1, <<"Hello">>), + {ok, <<"Hello">>} = gen_tcp:recv(Sock1, 0), + ok = gen_tcp:send(Sock, <<"Hello">>), + {ok, <<"Hello">>} = gen_tcp:recv(Sock, 0), + ok = esockd:close(echo, {"127.0.0.1", 5000}). + t_ulimit(_) -> ?assert(is_integer(esockd:ulimit())).