Skip to content

Commit

Permalink
feat: add content-sensitive proxy behaviour for UDP
Browse files Browse the repository at this point in the history
  • Loading branch information
lafirest committed Jul 8, 2024
1 parent 826f2a0 commit 4d93131
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 781 deletions.
62 changes: 62 additions & 0 deletions include/esockd_proxy.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-ifndef(ESOCKD_PROXY_HRL).
-define(ESOCKD_PROXY_HRL, true).

-define(SSL_TRANSPORT, esockd_transport).
-define(PROXY_TRANSPORT, esockd_udp_proxy).

-type proxy_id() :: pid().
-type socket_packet() :: binary().
-type socket() :: inet:socket() | ssl:sslsocket().

-type transport() :: {udp, pid(), socket()} | ?SSL_TRANSPORT.
-type address() :: {inet:ip_address(), inet:port_number()}.
-type peer() :: socket() | address().

-type connection_module() :: atom().
-type connection_state() :: term().
-type connection_packet() :: term().

-type connection_id() ::
peer()
| integer()
| string()
| binary().

-type proxy_packet() ::
{?PROXY_TRANSPORT, proxy_id(), binary(), connection_packet()}.

%% Routing information search results

%% send raw socket packet
-type get_connection_id_result() ::
%% send decoded packet
{ok, connection_id(), connection_packet(), connection_state()}
| invalid.

-type connection_options() :: #{
esockd_proxy_opts := proxy_options(),
atom() => term()
}.

-type proxy_options() :: #{
connection_mod := connection_module(),
heartbeat => non_neg_integer()
}.

-endif.
File renamed without changes.
File renamed without changes
223 changes: 223 additions & 0 deletions src/udp_proxy/esockd_udp_proxy.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_udp_proxy).

-behaviour(gen_server).

-include("include/esockd_proxy.hrl").

%% API
-export([start_link/3, send/2]).

%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).

-export_type([connection_options/0]).

-define(NOW, erlang:system_time(second)).
-define(ERROR_MSG(Format, Args),
error_logger:error_msg("[~s]: " ++ Format, [?MODULE | Args])
).
-define(DEF_HEARTBEAT, 60).

-type timespan() :: non_neg_integer().

%%--------------------------------------------------------------------
%% Definitions
%%--------------------------------------------------------------------

-type state() :: #{
connection_mod := connection_module(),
connection_id := connection_id() | undefined,
connection_state := connection_state(),
connection_options := connection_options(),
%% last source's connection active time
last_time := pos_integer(),
transport := transport(),

peer := peer()
}.

%%--------------------------------------------------------------------
%%- API
%%--------------------------------------------------------------------

start_link(Transport, Peer, Opts) ->
gen_server:start_link(?MODULE, [Transport, Peer, Opts], []).

-spec send(proxy_id(), binary()) -> ok.
send(ProxyId, Data) ->
gen_server:cast(ProxyId, {send, Data}).

%%--------------------------------------------------------------------
%%- gen_server callbacks
%%--------------------------------------------------------------------

init([Transport, Peer, #{esockd_proxy_opts := Opts} = COpts]) ->
#{connection_mod := Mod} = Opts,
heartbeat(maps:get(heartbeat, Opts, ?DEF_HEARTBEAT)),
{ok, #{
last_time => ?NOW,
peer => Peer,
transport => Transport,
connection_mod => Mod,
connection_options => COpts,
connection_state => esockd_udp_proxy_connection:initialize(Mod, COpts),
connection_id => undefined
}}.

handle_call(Request, _From, State) ->
?ERROR_MSG("Unexpected call: ~p", [Request]),
{reply, ok, State}.

handle_cast({send, Data}, #{transport := Transport, peer := Peer} = State) ->
send(Transport, Peer, Data),
{noreply, State};
handle_cast(Request, State) ->
?ERROR_MSG("Unexpected cast: ~p", [Request]),
{noreply, State}.

handle_info({udp, Socket, IP, Port, Data}, State) ->
Transport = {udp, self(), Socket},
Peer = {IP, Port},
{noreply, handle_incoming(Transport, Peer, Socket, Data, State)};
handle_info({ssl, Socket, Data}, State) ->
{noreply, handle_incoming(?SSL_TRANSPORT, Socket, Socket, Data, State)};
handle_info({heartbeat, Span}, #{last_time := LastTime} = State) ->
Now = ?NOW,
case Now - LastTime > Span of
true ->
{stop, normal, State};
_ ->
heartbeat(Span),
{noreply, State}
end;
handle_info({ssl_error, _Sock, Reason}, State) ->
{stop, Reason, socket_exit(State)};
handle_info({ssl_closed, _Sock}, State) ->
{stop, ssl_closed, socket_exit(State)};
handle_info(
{'DOWN', _, process, _, _Reason},
State
) ->
{stop, connection_closed, State};
handle_info(Info, State) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
{noreply, State}.

terminate(Reason, State) ->
detach(State, Reason =/= connection_closed).

%%--------------------------------------------------------------------
%%- Internal functions
%%--------------------------------------------------------------------
-spec handle_incoming(transport(), peer(), socket(), socket_packet(), state()) -> state().
handle_incoming(
Transport, Peer, Socket, Data, #{connection_mod := Mod, connection_state := CState} = State
) ->
State2 = State#{peer := Peer, transport := Transport, last_time := ?NOW},
case
esockd_udp_proxy_connection:get_connection_id(Mod, Transport, Peer, Socket, CState, Data)
of
{ok, CId, Packet, CState2} ->
dispatch(Mod, CId, Data, Packet, State2#{connection_state := CState2});
invalid ->
State2
end.

-spec dispatch(
connection_module(),
connection_id(),
socket_packet(),
connection_packet(),
state()
) ->
state().
dispatch(
Mod,
CId,
Data,
Packet,
#{connection_state := CState, connection_options := Opts} = State
) ->
{ok, Pid} = lookup(Mod, CId, Opts),
esockd_udp_proxy_connection:dispatch(
Mod, Pid, CState, {?PROXY_TRANSPORT, self(), Data, Packet}
),
attach(CId, State).

-spec attach(connection_id(), state()) -> state().
attach(CId, #{connection_id := undefined} = State) ->
esockd_udp_proxy_db:attach(CId),
State#{connection_id := CId};
attach(CId, #{connection_id := OldId} = State) when CId =/= OldId ->
State2 = detach(State),
attach(CId, State2);
attach(_CId, State) ->
State.

-spec detach(state()) -> state().
detach(State) ->
detach(State, true).

-spec detach(connection_id(), state()) -> state().
detach(#{connection_id := undefined} = State, _Clear) ->
State;
detach(#{connection_id := CId, connection_mod := Mod, connection_state := CState} = State, Clear) ->
case esockd_udp_proxy_db:detach(CId) of
{Clear, Pid} ->
esockd_udp_proxy_connection:close(Mod, Pid, CState);
_ ->
ok
end,
State#{connection_id := undefined}.

-spec socket_exit(state()) -> state().
socket_exit(State) ->
detach(State).

-spec heartbeat(timespan()) -> ok.
heartbeat(Span) ->
erlang:send_after(self(), timer:seconds(Span), {?FUNCTION_NAME, Span}),
ok.

-spec lookup(
connection_module(),
connection_id(),
connection_options()
) -> {ok, pid()}.
lookup(Mod, CId, Opts) ->
case esockd_udp_proxy_db:lookup(CId) of
{ok, _} = Ok ->
Ok;
undefined ->
{ok, Pid} = esockd_udp_proxy_connection:create(Mod, ?PROXY_TRANSPORT, self(), Opts),
_ = erlang:monitor(process, Pid),
{ok, Pid}
end.

-spec send(transport(), peer(), binary()) -> _.
send({udp, _, Socket}, {IP, Port}, Data) ->
gen_udp:send(Socket, IP, Port, Data);
send(?SSL_TRANSPORT, Socket, Data) ->
socket:send(Socket, Data).
67 changes: 67 additions & 0 deletions src/udp_proxy/esockd_udp_proxy_connection.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(esockd_udp_proxy_connection).

-include("include/esockd_proxy.hrl").

-export([
initialize/2,
create/4,
get_connection_id_result/6,
dispatch/4,
close/3
]).

-export_type([connection_id/0, connection_module/0]).

%%--------------------------------------------------------------------
%%- Callbacks
%%--------------------------------------------------------------------
-callback initialize(connection_options()) -> connection_state().

%% Create new connection
-callback create(transport(), peer(), connection_options()) -> gen_server:start_ret().

%% Find routing information
-callback get_connection_id(
transport(), peer(), socket(), connection_state(), socket_packet()
) ->
get_connection_id_result().

%% Dispacth message
-callback dispatch(pid(), connection_state(), proxy_packet()) -> ok.

%% Close Connection
-callback close(pid(), connection_state()) -> ok.

%%--------------------------------------------------------------------
%%- API
%%--------------------------------------------------------------------
initialize(Mod, Opts) ->
Mod:initialize(Opts).

create(Mod, Transport, Peer, Opts) ->
Mod:create(Transport, Peer, Opts).

get_connection_id_result(Mod, Transport, Peer, Socket, State, Data) ->
Mod:get_connection_id_result(Transport, Peer, Socket, State, Data).

dispatch(Mod, Pid, State, Packet) ->
Mod:dispatch(Pid, State, Packet).

close(Mod, Pid, State) ->
Mod:close(Pid, State).
Loading

0 comments on commit 4d93131

Please sign in to comment.