Skip to content

Commit

Permalink
cleanup faults test
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Feb 23, 2023
1 parent abdfcfa commit ccbe45f
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 78 deletions.
138 changes: 60 additions & 78 deletions test/ch/faults_test.exs
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
defmodule Ch.FaultsTest do
use ExUnit.Case

defp intercept_packets(socket, timeout \\ 100) do
receive do
{:tcp, ^socket, packet} ->
[packet | intercept_packets(socket, timeout)]
after
timeout -> []
end
end
import Ch.Test, only: [intercept_packets: 1]

defp capture_async_log(f) do
ExUnit.CaptureLog.capture_log([async: true], f)
end

defp first_byte(binary) do
:binary.part(binary, 0, 1)
end

@socket_opts [:binary, {:active, true}, {:packet, :raw}]

setup do
Expand Down Expand Up @@ -63,157 +51,151 @@ defmodule Ch.FaultsTest do
end

describe "ping/1" do
test "disconnects on timeout", %{port: port, listen: listen, clickhouse: clickhouse} do
Ch.start_link(port: port, timeout: 100, idle_interval: 20)

{:ok, mint} = :gen_tcp.accept(listen)

test "reconnects after timeout", %{port: port, listen: listen, clickhouse: clickhouse} do
log =
capture_async_log(fn ->
Ch.start_link(port: port, timeout: 100, idle_interval: 20)

{:ok, mint} = :gen_tcp.accept(listen)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))

[first | _rest] = intercept_packets(clickhouse)
:ok = :gen_tcp.send(mint, first_byte(first))
{:ok, mint} = :gen_tcp.accept(listen)

refute_receive _anything
:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
end)

assert log =~ "disconnected: ** (Mint.TransportError) timeout"
end

test "disconnects on closed", %{port: port, listen: listen, clickhouse: clickhouse} do
Ch.start_link(port: port, idle_interval: 20)
{:ok, mint} = :gen_tcp.accept(listen)

test "reconnects after close", %{port: port, listen: listen, clickhouse: clickhouse} do
log =
capture_async_log(fn ->
:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
Ch.start_link(port: port, idle_interval: 40)

[first | _rest] = intercept_packets(clickhouse)
:ok = :gen_tcp.send(mint, first_byte(first))
{:ok, mint} = :gen_tcp.accept(listen)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))
:ok = :gen_tcp.close(mint)

refute_receive _anything
{:ok, mint} = :gen_tcp.accept(listen)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
end)

assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end
end

describe "query" do
test "timeouts on slow response", %{port: port, listen: listen, clickhouse: clickhouse} do
{:ok, conn} = Ch.start_link(port: port, timeout: 100)
{:ok, mint} = :gen_tcp.accept(listen)

test "reconnects after timeout", %{port: port, listen: listen, clickhouse: clickhouse} do
test = self()

log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(port: port, timeout: 100)
{:ok, mint} = :gen_tcp.accept(listen)

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :timeout}} =
Ch.query(conn, "select 1 + 1")

send(test, :done)
end)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))

[first | _rest] = intercept_packets(clickhouse)
:ok = :gen_tcp.send(mint, first_byte(first))
{:ok, mint} = :gen_tcp.accept(listen)

spawn_link(fn ->
assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1")
send(test, :done)
end)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
assert_receive :done
refute_receive _anything
end)

assert log =~ "disconnected: ** (Mint.TransportError) timeout"
end

test "closed when receiving response", %{port: port, listen: listen, clickhouse: clickhouse} do
{:ok, conn} = Ch.start_link(port: port)
{:ok, mint} = :gen_tcp.accept(listen)

test "reconnects after closed on response", ctx do
%{port: port, listen: listen, clickhouse: clickhouse} = ctx
test = self()

log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(port: port)
{:ok, mint} = :gen_tcp.accept(listen)

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(conn, "select 1 + 1")

send(test, :done)
end)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))

[first | _rest] = intercept_packets(clickhouse)
:ok = :gen_tcp.send(mint, first_byte(first))
:ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse)))
:ok = :gen_tcp.close(mint)

assert_receive :done
refute_receive _anything
end)

assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end

test "closed when sending data", %{port: port, listen: listen} do
{:ok, conn} = Ch.start_link(port: port)
{:ok, mint} = :gen_tcp.accept(listen)

test = self()
{:ok, mint} = :gen_tcp.accept(listen)

log =
capture_async_log(fn ->
spawn_link(fn ->
data = Ch.RowBinary.encode_rows([[1, 2], [3, 4]], [:u8, :u8])

assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(conn, "insert into table(a,b)", data, format: "RowBinary")

assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1")
send(test, :done)
end)

assert_receive {:tcp, ^mint, _packet}
:ok = :gen_tcp.close(mint)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))
assert_receive :done
refute_receive _anything
end)

assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end

test "reconnects after disconnect", %{port: port, listen: listen, clickhouse: clickhouse} do
{:ok, conn} = Ch.start_link(port: port)
{:ok, mint} = :gen_tcp.accept(listen)
test "reconnects after closed while streaming request", ctx do
%{port: port, listen: listen, clickhouse: clickhouse} = ctx

test = self()
data = Ch.RowBinary.encode_rows([[1, 2], [3, 4]], [:u8, :u8])

log =
capture_async_log(fn ->
{:ok, conn} = Ch.start_link(port: port)
{:ok, mint} = :gen_tcp.accept(listen)

spawn_link(fn ->
assert {:error, %Mint.TransportError{reason: :closed}} =
Ch.query(conn, "select 1 + 1")
Ch.query(conn, "insert into example(a,b)", data, format: "RowBinary")
end)

_ = intercept_packets(mint)
assert_receive {:tcp, ^mint, _packet}
:ok = :gen_tcp.close(mint)

{:ok, mint} = :gen_tcp.accept(listen)

spawn_link(fn ->
assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1")
assert {:error, %Ch.Error{code: 60, message: message}} =
Ch.query(conn, "insert into example(a,b)", data, format: "RowBinary")

assert message =~ ~r/UNKNOWN_TABLE/

send(test, :done)
end)

:ok = :gen_tcp.send(clickhouse, intercept_packets(mint))
:ok = :gen_tcp.send(mint, intercept_packets(clickhouse))

assert_receive :done
refute_receive _anything
end)

assert log =~ "disconnected: ** (Mint.TransportError) socket closed"
end
end

defp first_byte(binary) do
:binary.part(binary, 0, 1)
end
end
56 changes: 56 additions & 0 deletions test/support/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,60 @@ defmodule Ch.Test do
def drop_table(table) do
sql_exec("drop table `#{table}`")
end

def intercept_packets(socket, buffer \\ <<>>) do
receive do
{:tcp, ^socket, packet} ->
buffer = buffer <> packet

if complete?(buffer) do
buffer
else
intercept_packets(socket, buffer)
end
end
end

defp complete?(buffer) do
with {:ok, rest} <- eat_status(buffer),
{:ok, content_length, rest} <- eat_headers(rest) do
verify_body(content_length, rest)
else
_ -> false
end
end

defp eat_status(buffer) do
case :erlang.decode_packet(:http_bin, buffer, []) do
{:ok, _, rest} -> {:ok, rest}
{:more, _} -> {:more, buffer}
end
end

defp eat_headers(buffer, content_length \\ nil) do
case :erlang.decode_packet(:httph_bin, buffer, []) do
{:ok, {_, _, :"Content-Length", _, content_length}, rest} ->
eat_headers(rest, String.to_integer(content_length))

{:ok, {_, _, :"Transfer-Encoding", _, "chunked"}, rest} ->
eat_headers(rest, :chunked)

{:ok, :http_eoh, rest} ->
{:ok, content_length, rest}

{:ok, _, rest} ->
eat_headers(rest, content_length)

{:more, _} ->
{:more, buffer}
end
end

defp verify_body(:chunked, chunks) do
String.ends_with?(chunks, "\r\n0\r\n\r\n")
end

defp verify_body(content_length, body) do
byte_size(body) == content_length
end
end

0 comments on commit ccbe45f

Please sign in to comment.