Skip to content

Commit

Permalink
Merge pull request #190 from lafirest/fix/proxy_release
Browse files Browse the repository at this point in the history
fix: refactor the udp proxy
  • Loading branch information
lafirest authored Jul 13, 2024
2 parents 4ec0382 + 8478e68 commit 7e9e98f
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 94 deletions.
1 change: 1 addition & 0 deletions include/esockd_proxy.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
-type get_connection_id_result() ::
%% send decoded packet
{ok, connection_id(), connection_packet(), connection_state()}
| {error, binary()}
| invalid.

-type connection_options() :: #{
Expand Down
10 changes: 10 additions & 0 deletions src/esockd_udp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
, code_change/3
]).

-export([proxy_request/1]).

-type(maybe(T) :: undefined | T).

-record(state, {
Expand Down Expand Up @@ -106,6 +108,10 @@ count_peers(Pid) ->
-spec(stop(pid()) -> ok).
stop(Pid) -> gen_server:stop(Pid).

proxy_request(Fun) ->
Parent = gen:get_parent(),
gen_server:call(Parent, {?FUNCTION_NAME, Fun}, infinity).

%%--------------------------------------------------------------------
%% GET/SET APIs
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -233,6 +239,10 @@ handle_call(which_children, _From, State = #state{peers = Peers, mfa = {Mod, _Fu
{reply, [{undefined, Pid, worker, [Mod]}
|| Pid <- maps:keys(Peers), is_pid(Pid), erlang:is_process_alive(Pid)], State};

handle_call({proxy_request, Fun}, _From, State) ->
Result = Fun(),
{reply, Result, State};

handle_call(Req, _From, State) ->
?ERROR_MSG("Unexpected call: ~p", [Req]),
{reply, ignore, State}.
Expand Down
129 changes: 79 additions & 50 deletions src/udp_proxy/esockd_udp_proxy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-include("include/esockd_proxy.hrl").

%% API
-export([start_link/3, send/2, close/1]).
-export([start_link/3, send/2, close/1, takeover/2]).

%% gen_server callbacks
-export([
Expand Down Expand Up @@ -50,6 +50,8 @@
connection_mod := connection_module(),
connection_id := connection_id() | undefined,
connection_state := connection_state(),
connection_pid := pid() | undefined,
connection_ref := reference() | undefined,
connection_options := connection_options(),
%% last source's connection active time
last_time := pos_integer(),
Expand All @@ -76,6 +78,10 @@ close(ProxyId) ->
ok
end.

takeover(ProxyId, CId) ->
_ = gen_server:cast(ProxyId, {?FUNCTION_NAME, CId}),
ok.

%%--------------------------------------------------------------------
%%- gen_server callbacks
%%--------------------------------------------------------------------
Expand All @@ -88,7 +94,9 @@ init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) ->
connection_mod => Mod,
connection_options => COpts,
connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts),
connection_id => undefined
connection_id => undefined,
connection_pid => undefined,
connection_ref => undefined
}).

handle_call(close, _From, State) ->
Expand All @@ -105,14 +113,18 @@ handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) ->
?ERROR_MSG("Send failed, Reason: ~0p", [Reason]),
{stop, {sock_error, Reason}, State}
end;
handle_cast({takeover, CId}, #{connection_id := CId} = State) ->
{stop, {shutdown, takeover}, State};
handle_cast({takeover, _CId}, State) ->
{noreply, State};
handle_cast(Request, State) ->
?ERROR_MSG("Unexpected cast: ~p", [Request]),
{noreply, State}.

handle_info({datagram, _SockPid, Data}, State) ->
{noreply, handle_incoming(Data, State)};
handle_incoming(Data, State);
handle_info({ssl, _Socket, Data}, State) ->
{noreply, handle_incoming(Data, State)};
handle_incoming(Data, State);
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
Now = ?NOW,
case Now - LastTime > Span of
Expand All @@ -127,7 +139,7 @@ handle_info({ssl_error, _Sock, Reason}, State) ->
handle_info({ssl_closed, _Sock}, State) ->
{stop, ssl_closed, socket_exit(State)};
handle_info(
{'DOWN', _, process, _, _Reason},
{'DOWN', _, process, _Pid, _Reason},
State
) ->
{stop, {shutdown, connection_closed}, State};
Expand All @@ -143,6 +155,8 @@ terminate(Reason, #{transport := Transport} = State) ->
false;
connection_closed ->
false;
takeover ->
false;
_ ->
true
end,
Expand All @@ -151,7 +165,7 @@ terminate(Reason, #{transport := Transport} = State) ->
%%--------------------------------------------------------------------
%%- Internal functions
%%--------------------------------------------------------------------
-spec handle_incoming(socket_packet(), state()) -> state().
-spec handle_incoming(socket_packet(), state()) -> _.
handle_incoming(
Data,
#{transport := Transport, peer := Peer, connection_mod := Mod, connection_state := CState} =
Expand All @@ -161,11 +175,17 @@ handle_incoming(
case esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, CState, Data) of
{ok, CId, Packet, CState2} ->
dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2});
{error, Reply} ->
?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
Transport, Peer, Mod
]),
_ = send(Transport, Peer, Reply),
{stop, {shutdown, no_clientid}, State2};
invalid ->
?ERROR_MSG("Can't get connection id, Transport:~0p, Peer:~0p, Mod:~0p", [
Transport, Peer, Mod
]),
State2
{stop, {shutdown, no_clientid}, State2}
end.

-spec dispatch(
Expand All @@ -174,52 +194,62 @@ handle_incoming(
connection_id(),
connection_packet(),
state()
) ->
state().
) -> _.
dispatch(
Mod,
CId,
Data,
Packet,
#{
transport := Transport,
peer := Peer,
connection_state := CState,
connection_options := Opts
connection_state := CState
} =
State
) ->
case lookup(Mod, Transport, Peer, CId, Opts) of
case lookup(CId, State) of
{ok, Pid} ->
Result = attach(CId, State, Pid),
esockd_udp_proxy_connection:dispatch(
Mod, Pid, CState, {Transport, Data, Packet}
),
attach(CId, State);
{noreply, Result};
{error, Reason} ->
?ERROR_MSG("Dispatch failed, Reason:~0p", [Reason]),
State
{noreply, State}
end.

-spec attach(connection_id(), state()) -> state().
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State) ->
-spec attach(connection_id(), state(), pid()) -> state().
attach(CId, #{connection_mod := Mod, connection_id := undefined} = State, Pid) ->
esockd_udp_proxy_db:attach(Mod, CId),
State#{connection_id := CId};
attach(CId, #{connection_id := OldId} = State) when CId =/= OldId ->
State2 = detach(State),
attach(CId, State2);
attach(_CId, State) ->
Ref = erlang:monitor(process, Pid),
State#{connection_id := CId, connection_pid := Pid, connection_ref := Ref};
attach(CId, #{connection_id := OldId} = State, Pid) when CId =/= OldId ->
State2 = detach(State, false),
attach(CId, State2, Pid);
attach(_CId, State, _Pid) ->
State.

-spec detach(state()) -> state().
detach(State) ->
detach(State, true).

-spec detach(state(), boolean()) -> state().
-spec detach(state()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
State;
detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) ->
case esockd_udp_proxy_db:detach(Mod, CId) of
{Clear, Pid} ->
detach(
#{
connection_id := CId,
connection_pid := Pid,
connection_ref := Ref,
connection_mod := Mod,
connection_state := CState
} = State,
Clear
) ->
erlang:demonitor(Ref),

Result = esockd_udp_proxy_db:detach(Mod, CId),
case Clear andalso Result of
true ->
case erlang:is_process_alive(Pid) of
true ->
esockd_udp_proxy_connection:close(Mod, Pid, CState);
Expand All @@ -229,7 +259,7 @@ detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState
_ ->
ok
end,
State#{connection_id := undefined}.
State#{connection_id := undefined, connection_pid := undefined, connection_ref := undefined}.

-spec socket_exit(state()) -> state().
socket_exit(State) ->
Expand All @@ -240,28 +270,27 @@ heartbeat(Span) ->
erlang:send_after(timer:seconds(Span), self(), {?FUNCTION_NAME, Span}),
ok.

-spec lookup(
connection_module(),
proxy_transport(),
peer(),
connection_id(),
connection_options()
) -> {ok, pid()} | {error, Reason :: term()}.
lookup(Mod, Transport, Peer, CId, Opts) ->
case esockd_udp_proxy_db:lookup(Mod, CId) of
{ok, _} = Ok ->
Ok;
undefined ->
case esockd_udp_proxy_connection:create(Mod, Transport, Peer, Opts) of
{ok, Pid} ->
esockd_udp_proxy_db:insert(Mod, CId, Pid),
_ = erlang:monitor(process, Pid),
{ok, Pid};
ignore ->
{error, ignore};
Error ->
Error
end
-spec lookup(connection_id(), state()) -> {ok, pid()} | {error, Reason :: term()}.
lookup(_CId, #{connection_pid := Pid}) when is_pid(Pid) ->
{ok, Pid};
lookup(CId, #{
connection_pid := undefined,
connection_mod := Mod,
transport := Transport,
peer := Peer,
connection_options := Opts
}) ->
%% TODO: use proc_lib:start_link to instead of this call
Fun = fun() ->
esockd_udp_proxy_connection:find_or_create(Mod, CId, Transport, Peer, Opts)
end,
case esockd_udp:proxy_request(Fun) of
{ok, Pid} ->
{ok, Pid};
ignore ->
{error, ignore};
Error ->
Error
end.

-spec send(proxy_transport(), peer(), binary()) -> _.
Expand Down
9 changes: 5 additions & 4 deletions src/udp_proxy/esockd_udp_proxy_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

-export([
initialize/2,
create/4,
find_or_create/5,
get_connection_id/5,
dispatch/4,
close/3
Expand All @@ -34,7 +34,8 @@
-callback initialize(connection_options()) -> connection_state().

%% Create new connection
-callback create(proxy_transport(), peer(), connection_options()) -> gen_server:start_ret().
-callback find_or_create(connection_id(), proxy_transport(), peer(), connection_options()) ->
gen_server:start_ret().

%% Find routing information
-callback get_connection_id(
Expand All @@ -54,8 +55,8 @@
initialize(Mod, Opts) ->
Mod:initialize(Opts).

create(Mod, Transport, Peer, Opts) ->
Mod:create(Transport, Peer, Opts).
find_or_create(Mod, CId, Transport, Peer, Opts) ->
Mod:find_or_create(CId, Transport, Peer, Opts).

get_connection_id(Mod, Transport, Peer, State, Data) ->
Mod:get_connection_id(Transport, Peer, State, Data).
Expand Down
Loading

0 comments on commit 7e9e98f

Please sign in to comment.