diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index e92941d..cf4a280 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -1,23 +1,11 @@ defmodule Ch.FaultsTest do use ExUnit.Case - - defp intercept_packets(socket, timeout \\ 100) do - receive do - {:tcp, ^socket, packet} -> - [packet | intercept_packets(socket, timeout)] - after - timeout -> [] - end - end + import Ch.Test, only: [intercept_packets: 1] defp capture_async_log(f) do ExUnit.CaptureLog.capture_log([async: true], f) end - defp first_byte(binary) do - :binary.part(binary, 0, 1) - end - @socket_opts [:binary, {:active, true}, {:packet, :raw}] setup do @@ -63,37 +51,40 @@ defmodule Ch.FaultsTest do end describe "ping/1" do - test "disconnects on timeout", %{port: port, listen: listen, clickhouse: clickhouse} do - Ch.start_link(port: port, timeout: 100, idle_interval: 20) - - {:ok, mint} = :gen_tcp.accept(listen) - + test "reconnects after timeout", %{port: port, listen: listen, clickhouse: clickhouse} do log = capture_async_log(fn -> + Ch.start_link(port: port, timeout: 100, idle_interval: 20) + + {:ok, mint} = :gen_tcp.accept(listen) + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) - [first | _rest] = intercept_packets(clickhouse) - :ok = :gen_tcp.send(mint, first_byte(first)) + {:ok, mint} = :gen_tcp.accept(listen) - refute_receive _anything + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) assert log =~ "disconnected: ** (Mint.TransportError) timeout" end - test "disconnects on closed", %{port: port, listen: listen, clickhouse: clickhouse} do - Ch.start_link(port: port, idle_interval: 20) - {:ok, mint} = :gen_tcp.accept(listen) - + test "reconnects after close", %{port: port, listen: listen, clickhouse: clickhouse} do log = capture_async_log(fn -> - :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + Ch.start_link(port: port, idle_interval: 40) - [first | _rest] = intercept_packets(clickhouse) - :ok = :gen_tcp.send(mint, first_byte(first)) + {:ok, mint} = :gen_tcp.accept(listen) + + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) :ok = :gen_tcp.close(mint) - refute_receive _anything + {:ok, mint} = :gen_tcp.accept(listen) + + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" @@ -101,119 +92,110 @@ defmodule Ch.FaultsTest do end describe "query" do - test "timeouts on slow response", %{port: port, listen: listen, clickhouse: clickhouse} do - {:ok, conn} = Ch.start_link(port: port, timeout: 100) - {:ok, mint} = :gen_tcp.accept(listen) - + test "reconnects after timeout", %{port: port, listen: listen, clickhouse: clickhouse} do test = self() log = capture_async_log(fn -> + {:ok, conn} = Ch.start_link(port: port, timeout: 100) + {:ok, mint} = :gen_tcp.accept(listen) + spawn_link(fn -> assert {:error, %Mint.TransportError{reason: :timeout}} = Ch.query(conn, "select 1 + 1") - - send(test, :done) end) :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) - [first | _rest] = intercept_packets(clickhouse) - :ok = :gen_tcp.send(mint, first_byte(first)) + {:ok, mint} = :gen_tcp.accept(listen) + + spawn_link(fn -> + assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") + send(test, :done) + end) + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) assert_receive :done - refute_receive _anything end) assert log =~ "disconnected: ** (Mint.TransportError) timeout" end - test "closed when receiving response", %{port: port, listen: listen, clickhouse: clickhouse} do - {:ok, conn} = Ch.start_link(port: port) - {:ok, mint} = :gen_tcp.accept(listen) - + test "reconnects after closed on response", ctx do + %{port: port, listen: listen, clickhouse: clickhouse} = ctx test = self() log = capture_async_log(fn -> + {:ok, conn} = Ch.start_link(port: port) + {:ok, mint} = :gen_tcp.accept(listen) + spawn_link(fn -> assert {:error, %Mint.TransportError{reason: :closed}} = Ch.query(conn, "select 1 + 1") - - send(test, :done) end) :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) - - [first | _rest] = intercept_packets(clickhouse) - :ok = :gen_tcp.send(mint, first_byte(first)) + :ok = :gen_tcp.send(mint, first_byte(intercept_packets(clickhouse))) :ok = :gen_tcp.close(mint) - assert_receive :done - refute_receive _anything - end) - - assert log =~ "disconnected: ** (Mint.TransportError) socket closed" - end - - test "closed when sending data", %{port: port, listen: listen} do - {:ok, conn} = Ch.start_link(port: port) - {:ok, mint} = :gen_tcp.accept(listen) - - test = self() + {:ok, mint} = :gen_tcp.accept(listen) - log = - capture_async_log(fn -> spawn_link(fn -> - data = Ch.RowBinary.encode_rows([[1, 2], [3, 4]], [:u8, :u8]) - - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query(conn, "insert into table(a,b)", data, format: "RowBinary") - + assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") send(test, :done) end) - assert_receive {:tcp, ^mint, _packet} - :ok = :gen_tcp.close(mint) - + :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) + :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) assert_receive :done - refute_receive _anything end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end - test "reconnects after disconnect", %{port: port, listen: listen, clickhouse: clickhouse} do - {:ok, conn} = Ch.start_link(port: port) - {:ok, mint} = :gen_tcp.accept(listen) + test "reconnects after closed while streaming request", ctx do + %{port: port, listen: listen, clickhouse: clickhouse} = ctx test = self() + data = Ch.RowBinary.encode_rows([[1, 2], [3, 4]], [:u8, :u8]) log = capture_async_log(fn -> + {:ok, conn} = Ch.start_link(port: port) + {:ok, mint} = :gen_tcp.accept(listen) + spawn_link(fn -> assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query(conn, "select 1 + 1") + Ch.query(conn, "insert into example(a,b)", data, format: "RowBinary") end) - _ = intercept_packets(mint) + assert_receive {:tcp, ^mint, _packet} :ok = :gen_tcp.close(mint) {:ok, mint} = :gen_tcp.accept(listen) spawn_link(fn -> - assert {:ok, %{num_rows: 1, rows: [[2]]}} = Ch.query(conn, "select 1 + 1") + assert {:error, %Ch.Error{code: 60, message: message}} = + Ch.query(conn, "insert into example(a,b)", data, format: "RowBinary") + + assert message =~ ~r/UNKNOWN_TABLE/ + send(test, :done) end) :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done - refute_receive _anything end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" end end + + defp first_byte(binary) do + :binary.part(binary, 0, 1) + end end diff --git a/test/support/test.ex b/test/support/test.ex index 89f47bc..3e7c5f6 100644 --- a/test/support/test.ex +++ b/test/support/test.ex @@ -50,4 +50,60 @@ defmodule Ch.Test do def drop_table(table) do sql_exec("drop table `#{table}`") end + + def intercept_packets(socket, buffer \\ <<>>) do + receive do + {:tcp, ^socket, packet} -> + buffer = buffer <> packet + + if complete?(buffer) do + buffer + else + intercept_packets(socket, buffer) + end + end + end + + defp complete?(buffer) do + with {:ok, rest} <- eat_status(buffer), + {:ok, content_length, rest} <- eat_headers(rest) do + verify_body(content_length, rest) + else + _ -> false + end + end + + defp eat_status(buffer) do + case :erlang.decode_packet(:http_bin, buffer, []) do + {:ok, _, rest} -> {:ok, rest} + {:more, _} -> {:more, buffer} + end + end + + defp eat_headers(buffer, content_length \\ nil) do + case :erlang.decode_packet(:httph_bin, buffer, []) do + {:ok, {_, _, :"Content-Length", _, content_length}, rest} -> + eat_headers(rest, String.to_integer(content_length)) + + {:ok, {_, _, :"Transfer-Encoding", _, "chunked"}, rest} -> + eat_headers(rest, :chunked) + + {:ok, :http_eoh, rest} -> + {:ok, content_length, rest} + + {:ok, _, rest} -> + eat_headers(rest, content_length) + + {:more, _} -> + {:more, buffer} + end + end + + defp verify_body(:chunked, chunks) do + String.ends_with?(chunks, "\r\n0\r\n\r\n") + end + + defp verify_body(content_length, body) do + byte_size(body) == content_length + end end