Skip to content

Commit

Permalink
Proxy info messages to the adapter (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga authored Nov 12, 2024
1 parent c6dcdf9 commit 5d088f7
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test,integration_test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples,integration_test}/**/*.{ex,exs}"]
]
17 changes: 17 additions & 0 deletions examples/tcp_connection/lib/tcp_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ defmodule TCPConnection do

case :gen_tcp.connect(host, port, socket_opts, timeout) do
{:ok, sock} ->
# Monitor the socket so we can react to it being closed. See handle_info/2.
_ref = :inet.monitor(sock)
{:ok, {sock, <<>>}}

{:error, reason} ->
Expand Down Expand Up @@ -143,6 +145,21 @@ defmodule TCPConnection do
end
end

# The handle_info callback is optional and can be removed if not needed.
# Here it is used to react to `:inet.monitor/1` messages which arrive
# when socket gets closed while the connection is idle.
def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer}) do
{:disconnect, TCPConnection.Error.exception({:idle, :closed})}
end

def handle_info(msg, state) do
Logger.info(fn ->
["#{__MODULE__} (", inspect(self()), ") missed message: ", inspect(msg)]
end)

:ok
end

@impl true
def handle_close(_, _, s) do
{:ok, nil, s}
Expand Down
100 changes: 100 additions & 0 deletions integration_test/cases/info_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
defmodule InfoTest do
use ExUnit.Case, async: true

alias TestPool, as: P
alias TestAgent, as: A
alias TestQuery, as: Q

test "handle_info handles harmless message and moves on" do
stack = [
fn opts ->
send(opts[:parent], {:connected, self()})
{:ok, :state}
end,
:ok,
{:idle, :state},
{:idle, :state}
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self())

assert_receive {:connected, conn}
send(conn, "some harmless message")
assert P.run(pool, fn _ -> :result end) == :result

assert [
connect: _,
handle_info: _,
handle_status: _,
handle_status: _
] = A.record(agent)
end

test "handle_info can force disconnect" do
stack = [
fn opts ->
send(opts[:parent], {:connected, self()})
{:ok, :state}
end,
{:disconnect, RuntimeError.exception("TCP connection just closed")},
:ok,
fn opts ->
send(opts[:parent], :reconnected)
{:ok, :state}
end
]

{:ok, agent} = A.start_link(stack)
P.start_link(agent: agent, parent: self())

assert_receive {:connected, conn}
send(conn, "monitor says TCP connection just closed")
assert_receive :reconnected

assert [
connect: _,
handle_info: _,
disconnect: _,
connect: _
] = A.record(agent)
end

test "handle_info's disconnect while checked out client crashes is no-op" do
stack = [
fn _opts ->
{:ok, %{conn_pid: self()}}
end,
fn _query, _params, _opts, %{conn_pid: conn_pid} ->
send(conn_pid, "monitor says TCP connection just closed")

# This waits for the info message to be processed.
:sys.get_state(conn_pid)

{:disconnect, RuntimeError.exception("TCP connection is closed"), :new_state}
end,
{:disconnect, RuntimeError.exception("TCP connection just closed")},
:ok,
fn opts ->
send(opts[:parent], :reconnected)
{:ok, :state}
end
]

{:ok, agent} = A.start_link(stack)
{:ok, pool} = P.start_link(agent: agent, parent: self())

assert {:error, %RuntimeError{message: "TCP connection is closed"}} =
P.execute(pool, %Q{}, [])

assert_receive :reconnected

assert [
connect: _,
handle_execute: _,
handle_info: _,
disconnect: _,
connect: _
] = A.record(agent)
end
end
20 changes: 15 additions & 5 deletions lib/db_connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,22 @@ defmodule DBConnection.Connection do
handle_timeout(s)
end

def handle_event(:info, msg, :no_state, %{mod: mod} = s) do
Logger.info(fn ->
[inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
end)
def handle_event(:info, msg, :no_state, %{mod: mod, state: state} = s) do
if function_exported?(mod, :handle_info, 2) do
case apply(mod, :handle_info, [msg, state]) do
:ok ->
handle_timeout(s)

{:disconnect, err} ->
{:keep_state, s, {:next_event, :internal, {:disconnect, {:log, err}}}}
end
else
Logger.info(fn ->
[inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
end)

handle_timeout(s)
handle_timeout(s)
end
end

@doc false
Expand Down
4 changes: 4 additions & 0 deletions test/test_support.exs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ defmodule TestConnection do
TestAgent.eval(:handle_deallocate, [query, cursor, opts, state])
end

def handle_info(message, state) do
TestAgent.eval(:handle_info, [message, state])
end

defp put_agent_from_opts(opts) do
Process.get(:agent) || Process.put(:agent, agent_from_opts(opts))
end
Expand Down

0 comments on commit 5d088f7

Please sign in to comment.