Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: extract handling network data to separate function #505

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 112 additions & 95 deletions lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@

key = {:secrets, tenant_or_alias, user}

case auth_secrets(info, user, key, :timer.hours(24)) do

Check warning on line 278 in lib/supavisor/client_handler.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 3).
{:ok, auth_secrets} ->
Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

Expand Down Expand Up @@ -334,7 +334,7 @@
Cachex.get(Supavisor.Cache, key) == {:ok, nil} do
case auth_secrets(info, data.user, key, 15_000) do
{:ok, {method2, secrets2}} = value ->
if method != method2 or Map.delete(secrets.(), :client_key) != secrets2.() do

Check warning on line 337 in lib/supavisor/client_handler.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 4).
Logger.warning("ClientHandler: Update secrets and terminate pool")

Cachex.update(
Expand Down Expand Up @@ -506,103 +506,21 @@
{:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
end

# handle Terminate message
def handle_event(:info, {proto, _, <<?X, 4::32>>}, :idle, _)
when proto in @proto do
Logger.info("ClientHandler: Terminate received from client")
{:stop, {:shutdown, :terminate_received}}
end

# handle Sync message
def handle_event(:info, {proto, _, <<?S, 4::32, _::binary>> = msg}, :idle, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive sync")

# db_pid can be nil in transaction mode, so we will send ready_for_query
# without checking out a direct connection. If there is a linked db_pid,
# we will forward the message to it
if data.db_pid != nil,
do: :ok = sock_send_maybe_active_once(msg, data),
else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())

{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

def handle_event(:info, {proto, _, <<?S, 4::32, _::binary>> = msg}, _, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive sync while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# handle Flush message
def handle_event(:info, {proto, _, <<?H, 4::32, _::binary>> = msg}, _, data)
when proto in @proto do
Logger.debug("ClientHandler: Receive flush while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# incoming query with a single pool
def handle_event(:info, {proto, _, bin}, :idle, %{pool: pid} = data)
when is_binary(bin) and is_pid(pid) and proto in @proto do
Logger.debug("ClientHandler: Receive query #{inspect(bin)}")
db_pid = db_checkout(:both, :on_query, data)
handle_prepared_statements(db_pid, bin, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()},
{:next_event, :internal, {proto, nil, bin}}}
end

def handle_event(:info, {proto, _, bin}, _, %{mode: :proxy} = data) when proto in @proto do
{:next_state, :busy, %{data | query_start: System.monotonic_time()},
{:next_event, :internal, {proto, nil, bin}}}
end

# incoming query with read/write pools
def handle_event(:info, {proto, _, bin}, :idle, data) when proto in @proto do
query_type =
with {:ok, payload} <- Client.get_payload(bin),
{:ok, statements} <- Supavisor.PgParser.statements(payload) do
Logger.debug(
"ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
)
def handle_event(kind, {proto, socket, msg}, state, data)
when proto in @proto and is_binary(msg) do
with {:next_state, next_state, new_data, actions} <- handle_data(kind, msg, state, data) do
new_actions =
actions
|> List.wrap()
|> Enum.map(fn
{:next_event, type, bin} when is_binary(bin) ->
{:next_event, type, {proto, socket, bin}}

case statements do
# naive check for read only queries
["SelectStmt"] -> :read
_ -> :write
end
else
other ->
Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
:write
end

ts = System.monotonic_time()
db_pid = db_checkout(query_type, :on_query, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin},
{:next_event, :internal, {proto, nil, bin}}}
end

# forward query to db
def handle_event(_, {proto, _, bin}, :busy, data) when proto in @proto do
Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

case sock_send_maybe_active_once(bin, data) do
:ok ->
{:keep_state, %{data | active_count: data.active_count + 1}}

error ->
Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

HandlerHelpers.sock_send(
data.sock,
Server.error_message("XX000", "Error while sending query")
)
other ->
other
end)

{:stop, {:shutdown, :send_query_error}}
{:next_state, next_state, new_data, new_actions}
end
end

Expand Down Expand Up @@ -1048,6 +966,105 @@
end
end

@spec handle_data(kind :: atom(), data :: binary(), state, data) ::
:gen_statem.event_handler_result(data)
when state: atom() | term(),
data: term()

# handle Terminate message
defp handle_data(:info, <<?X, 4::32>>, :idle, _data) do
Logger.info("ClientHandler: Terminate received from client")
{:stop, {:shutdown, :terminate_received}}
end

defp handle_data(:info, <<?S, 4::32>> <> _ = msg, :idle, data) do
Logger.debug("ClientHandler: Receive sync")

# db_pid can be nil in transaction mode, so we will send ready_for_query
# without checking out a direct connection. If there is a linked db_pid,
# we will forward the message to it
if data.db_pid != nil,
do: :ok = sock_send_maybe_active_once(msg, data),
else: :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())

{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
Logger.debug("ClientHandler: Receive sync while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# handle Flush message
defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
Logger.debug("ClientHandler: Receive flush while not idle")
:ok = sock_send_maybe_active_once(msg, data)
{:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
end

# incoming query with a single pool
defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
Logger.debug("ClientHandler: Receive query #{inspect(bin)}")
db_pid = db_checkout(:both, :on_query, data)
handle_prepared_statements(db_pid, bin, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: System.monotonic_time()},
{:next_event, :internal, bin}}
end

defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
{:next_state, :busy, %{data | query_start: System.monotonic_time()},
{:next_event, :internal, bin}}
end

# incoming query with read/write pools
defp handle_data(:info, bin, :idle, data) do
query_type =
with {:ok, payload} <- Client.get_payload(bin),
{:ok, statements} <- Supavisor.PgParser.statements(payload) do
Logger.debug(
"ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
)

case statements do
# naive check for read only queries
["SelectStmt"] -> :read
_ -> :write
end
else
other ->
Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
:write
end

ts = System.monotonic_time()
db_pid = db_checkout(query_type, :on_query, data)

{:next_state, :busy, %{data | db_pid: db_pid, query_start: ts, last_query: bin},
{:next_event, :internal, bin}}
end

# forward query to db
defp handle_data(_, bin, :busy, data) do
Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

case sock_send_maybe_active_once(bin, data) do
:ok ->
{:keep_state, %{data | active_count: data.active_count + 1}}

error ->
Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

HandlerHelpers.sock_send(
data.sock,
Server.error_message("XX000", "Error while sending query")
)

{:stop, {:shutdown, :send_query_error}}
end
end

@spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil
defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do
with {:ok, payload} <- Client.get_payload(bin),
Expand Down
Loading