Skip to content

Commit

Permalink
allow custom headers
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Oct 29, 2023
1 parent 972f8d5 commit 9b08f52
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
15 changes: 9 additions & 6 deletions lib/ch/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ defimpl DBConnection.Query, for: Ch.Query do
query_params: [{String.t(), String.t()}],
body: iodata | Enumerable.t()

def encode(%Query{command: :insert, encode: false, statement: statement}, data, _opts) do
def encode(%Query{command: :insert, encode: false, statement: statement}, data, opts) do
body =
case data do
_ when is_list(data) or is_binary(data) -> [statement, ?\n | data]
_ -> Stream.concat([[statement, ?\n]], data)
end

{_query_params = [], _extra_headers = [], body}
{_query_params = [], headers(opts), body}
end

def encode(%Query{command: :insert, statement: statement}, params, opts) do
Expand All @@ -91,23 +91,23 @@ defimpl DBConnection.Query, for: Ch.Query do
types = Keyword.fetch!(opts, :types)
header = RowBinary.encode_names_and_types(names, types)
data = RowBinary.encode_rows(params, types)
{_query_params = [], _extra_headers = [], [statement, ?\n, header | data]}
{_query_params = [], headers(opts), [statement, ?\n, header | data]}

format_row_binary?(statement) ->
types = Keyword.fetch!(opts, :types)
data = RowBinary.encode_rows(params, types)
{_query_params = [], _extra_headers = [], [statement, ?\n | data]}
{_query_params = [], headers(opts), [statement, ?\n | data]}

true ->
{query_params(params), _extra_headers = [], statement}
{query_params(params), headers(opts), statement}
end
end

def encode(%Query{statement: statement}, params, opts) do
types = Keyword.get(opts, :types)
default_format = if types, do: "RowBinary", else: "RowBinaryWithNamesAndTypes"
format = Keyword.get(opts, :format) || default_format
{query_params(params), [{"x-clickhouse-format", format}], statement}
{query_params(params), [{"x-clickhouse-format", format} | headers(opts)], statement}
end

defp format_row_binary?(statement) when is_binary(statement) do
Expand Down Expand Up @@ -266,6 +266,9 @@ defimpl DBConnection.Query, for: Ch.Query do
end

defp escape_param([], param), do: param

@spec headers(Keyword.t()) :: Mint.Types.headers()
defp headers(opts), do: Keyword.get(opts, :headers, [])
end

defimpl String.Chars, for: Ch.Query do
Expand Down
56 changes: 56 additions & 0 deletions test/ch/headers_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Ch.HeadersTest do
use ExUnit.Case, async: true

setup do
{:ok, conn} = Ch.start_link()
{:ok, conn: conn}
end

test "can request gzipped response through headers", %{conn: conn} do
assert {:ok, %{rows: rows, headers: headers}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "gzip"}]
)

assert :proplists.get_value("content-type", headers) == "application/octet-stream"
assert :proplists.get_value("content-encoding", headers) == "gzip"
assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"

# https://en.wikipedia.org/wiki/Gzip
assert <<0x1F, 0x8B, _rest::bytes>> = IO.iodata_to_binary(rows)
end

test "can request lz4 response through headers", %{conn: conn} do
assert {:ok, %{rows: rows, headers: headers}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "lz4"}]
)

assert :proplists.get_value("content-type", headers) == "application/octet-stream"
assert :proplists.get_value("content-encoding", headers) == "lz4"
assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"

# https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
assert <<0x04, 0x22, 0x4D, 0x18, _rest::bytes>> = IO.iodata_to_binary(rows)
end

test "can request zstd response through headers", %{conn: conn} do
assert {:ok, %{rows: rows, headers: headers}} =
Ch.query(conn, "select number from system.numbers limit 100", [],
decode: false,
settings: [enable_http_compression: 1],
headers: [{"accept-encoding", "zstd"}]
)

assert :proplists.get_value("content-type", headers) == "application/octet-stream"
assert :proplists.get_value("content-encoding", headers) == "zstd"
assert :proplists.get_value("x-clickhouse-format", headers) == "RowBinaryWithNamesAndTypes"

# https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)
assert <<0x28, 0xB5, 0x2F, 0xFD, _rest::bytes>> = IO.iodata_to_binary(rows)
end
end

0 comments on commit 9b08f52

Please sign in to comment.