Skip to content

Commit

Permalink
move data to sql
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Dec 29, 2023
1 parent 705bb31 commit fe4ed23
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 416 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

- move rows payload (RowBinary, CSV, etc.) to SQL statement
- remove pseudo-positional binds, make param names explicit

## 0.2.2 (2023-12-23)

- fix query encoding for datetimes with zeroed microseconds `~U[****-**-** **:**:**.000000]` https://github.com/plausible/ch/pull/138
Expand Down
92 changes: 54 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")
Ch.query!(pid, "INSERT INTO ch_demo(id) SELECT number FROM system.numbers LIMIT {limit:UInt8}", %{"limit" => 2})
```

#### Insert rows as [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary) (efficient)
#### Insert [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary)

```elixir
{:ok, pid} = Ch.start_link()
Expand All @@ -98,20 +98,26 @@ types = [Ch.Types.u64()]
# or
types = [:u64]

rows = [[0], [1]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", [[0], [1]], types: types)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT RowBinary\n" | row_binary])
```

Note that RowBinary format encoding requires `:types` option to be provided.

Similarly, you can use [`RowBinaryWithNamesAndTypes`](https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes) which would additionally do something like a type check.

```elixir
sql = "INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes"
opts = [names: ["id"], types: ["UInt64"]]
rows = [[0], [1]]
names = ["id"]
types = ["UInt64"]

header = Ch.RowBinary.encode_names_and_types(names, types)
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 2} = Ch.query!(pid, sql, rows, opts)
%Ch.Result{num_rows: 3} =
Ch.query!(pid, ["INSERT INTO ch_demo FORMAT RowBinaryWithNamesAndTypes\n", header | row_binary])
```

#### Insert rows in custom [format](https://clickhouse.com/docs/en/interfaces/formats)
Expand All @@ -121,29 +127,42 @@ rows = [[0], [1]]

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

csv = [0, 1] |> Enum.map(&to_string/1) |> Enum.intersperse(?\n)
csv = "0\n1"

%Ch.Result{num_rows: 2} =
Ch.query!(pid, "INSERT INTO ch_demo(id) FORMAT CSV", csv, encode: false)
Ch.query!(pid, ["INSERT INTO ch_demo(id) FORMAT CSV\n" | csv])
```

#### Insert rows as chunked RowBinary stream
#### Insert rows as [chunked](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) RowBinary stream

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

stream = Stream.repeatedly(fn -> [:rand.uniform(100)] end)
chunked = Stream.chunk_every(stream, 100)
encoded = Stream.map(chunked, fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
ten_encoded_chunks = Stream.take(encoded, 10)
row_binary =
Stream.repeatedly(fn -> [:rand.uniform(100)] end)
|> Stream.chunk_every(100_000)
|> Stream.map(fn chunk -> Ch.RowBinary.encode_rows(chunk, _types = ["UInt64"]) end)
|> Stream.take(10)

%Ch.Result{num_rows: 1000} =
Ch.query(pid, "INSERT INTO ch_demo(id) FORMAT RowBinary", ten_encoded_chunks, encode: false)
%Ch.Result{num_rows: 1_000_000} =
Ch.query(pid, Stream.concat(["INSERT INTO ch_demo(id) FORMAT RowBinary\n"], row_binary))
```

This query makes a [`transfer-encoding: chunked`](https://en.wikipedia.org/wiki/Chunked_transfer_encoding) HTTP request while unfolding the stream resulting in lower memory usage.
#### Insert rows via [input](https://clickhouse.com/docs/en/sql-reference/table-functions/input) function

```elixir
{:ok, pid} = Ch.start_link()

Ch.query!(pid, "CREATE TABLE IF NOT EXISTS ch_demo(id UInt64) ENGINE Null")

sql = "INSERT INTO ch_demo SELECT id + {ego:Int64} FROM input('id UInt64') FORMAT RowBinary\n"
row_binary = Ch.RowBinary.encode_rows([[1], [2], [3]], ["UInt64"])

%Ch.Result{num_rows: 3} =
Ch.query!(pid, [sql | row_binary], %{"ego" => -1})
```

#### Query with custom [settings](https://clickhouse.com/docs/en/operations/settings/settings)

Expand All @@ -156,7 +175,7 @@ settings = [async_insert: 1]
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'")

%Ch.Result{rows: [["async_insert", "Bool", "1"]]} =
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", [], settings: settings)
Ch.query!(pid, "SHOW SETTINGS LIKE 'async_insert'", _params = [], settings: settings)
```

## Caveats
Expand All @@ -179,13 +198,13 @@ CREATE TABLE ch_nulls (
""")

types = ["Nullable(UInt8)", "UInt8", "UInt8"]
inserted_rows = [[nil, nil, nil]]
selected_rows = [[nil, 0, 0]]
rows = [[nil, nil, nil]]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary", inserted_rows, types: types)
Ch.query!(pid, ["INSERT INTO ch_nulls(a, b, c) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: ^selected_rows} =
%Ch.Result{rows: [[nil, 0, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls")
```

Expand All @@ -197,13 +216,17 @@ However, [`input()`](https://clickhouse.com/docs/en/sql-reference/table-function
sql = """
INSERT INTO ch_nulls
SELECT * FROM input('a Nullable(UInt8), b Nullable(UInt8), c UInt8')
FORMAT RowBinary\
FORMAT RowBinary
"""

Ch.query!(pid, sql, inserted_rows, types: ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"])
types = ["Nullable(UInt8)", "Nullable(UInt8)", "UInt8"]
row_binary = Ch.RowBinary.encode_rows(rows, types)

%Ch.Result{num_rows: 1} =
Ch.query!(pid, [sql | row_binary])

%Ch.Result{rows: [[0], [10]]} =
Ch.query!(pid, "SELECT b FROM ch_nulls ORDER BY b")
%Ch.Result{rows: [_before = [nil, 0, 0], _after = [nil, 10, 0]]} =
Ch.query!(pid, "SELECT * FROM ch_nulls ORDER BY b")
```

#### UTF-8 in RowBinary
Expand All @@ -215,26 +238,19 @@ When decoding [`String`](https://clickhouse.com/docs/en/sql-reference/data-types

Ch.query!(pid, "CREATE TABLE ch_utf8(str String) ENGINE Memory")

bin = "\x61\xF0\x80\x80\x80b"
utf8 = "a�b"
# "\x61\xF0\x80\x80\x80b" will become "a�b" on SELECT
row_binary = Ch.RowBinary.encode(:string, "\x61\xF0\x80\x80\x80b")

%Ch.Result{num_rows: 1} =
Ch.query!(pid, "INSERT INTO ch_utf8(str) FORMAT RowBinary", [[bin]], types: ["String"])
Ch.query!(pid, ["INSERT INTO ch_utf8(str) FORMAT RowBinary\n" | row_binary])

%Ch.Result{rows: [[^utf8]]} =
%Ch.Result{rows: [["a�b"]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8")

%Ch.Result{rows: %{"data" => [[^utf8]]}} =
%Ch.Result{rows: %{"data" => [["a�b"]]}} =
pid |> Ch.query!("SELECT * FROM ch_utf8 FORMAT JSONCompact") |> Map.update!(:rows, &Jason.decode!/1)
```

To get raw binary from `String` columns use `:binary` type that skips UTF-8 checks.

```elixir
%Ch.Result{rows: [[^bin]]} =
Ch.query!(pid, "SELECT * FROM ch_utf8", [], types: [:binary])
```

#### Timezones in RowBinary

Decoding non-UTC datetimes like `DateTime('Asia/Taipei')` requires a [timezone database.](https://hexdocs.pm/elixir/DateTime.html#module-time-zone-database)
Expand Down Expand Up @@ -268,7 +284,7 @@ utc = DateTime.utc_now()
taipei = DateTime.shift_zone!(utc, "Asia/Taipei")

# ** (ArgumentError) non-UTC timezones are not supported for encoding: 2023-04-26 01:49:43.044569+08:00 CST Asia/Taipei
Ch.query!(pid, "INSERT INTO ch_datetimes(datetime) FORMAT RowBinary", [[naive], [utc], [taipei]], types: ["DateTime"])
Ch.RowBinary.encode_rows([[naive], [utc], [taipei]], ["DateTime"])
```

## Benchmarks
Expand Down
59 changes: 44 additions & 15 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,85 @@ defmodule Ch do
@moduledoc "Minimal HTTP ClickHouse client."
alias Ch.{Connection, Query, Result}

@type common_option ::
{:database, String.t()}
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
| {:timeout, timeout}

@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, :gen_tcp.connect_option()}
| DBConnection.start_option()

@doc """
Start the connection process and connect to ClickHouse.
## Options
* `:scheme` - HTTP scheme, defaults to `"http"`
* `:hostname` - server hostname, defaults to `"localhost"`
* `:port` - HTTP port, defualts to `8123`
* `:scheme` - HTTP scheme, defaults to `"http"`
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
"""
@spec start_link([start_option]) :: GenServer.on_start()
def start_link(opts \\ []) do
DBConnection.start_link(Connection, opts)
end

@doc """
Returns a supervisor child specification for a DBConnection pool.
See `start_link/1` for supported options.
"""
@spec child_spec([start_option]) :: :supervisor.child_spec()
def child_spec(opts) do
DBConnection.child_spec(Connection, opts)
end

# TODO move streaming to Ch.stream/4
@type statement :: iodata | Enumerable.t()
@type params :: %{String.t() => term} | [{String.t(), term}]

@type query_option ::
common_option
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
| {:format, String.t()}
| {:decode, boolean}
| DBConnection.connection_option()

@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
## Options
* `:timeout` - Query request timeout
* `:settings` - Keyword list of settings
* `:database` - Database
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings
* `:timeout` - Query request timeout
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
* `:decode` - Whether to automatically decode the response
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
"""
@spec query(DBConnection.conn(), iodata, params, Keyword.t()) ::
@spec query(DBConnection.conn(), statement, params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)

Expand All @@ -57,26 +93,19 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
@spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t()
when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
@spec query!(DBConnection.conn(), statement, params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end

@doc false
@spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t()
@spec stream(DBConnection.t(), statement, params, [query_option]) :: DBConnection.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.stream(conn, query, params, opts)
end

@doc false
@spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
def run(conn, f, opts \\ []) when is_function(f, 1) do
DBConnection.run(conn, f, opts)
end

if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType

Expand Down
39 changes: 21 additions & 18 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,35 +127,24 @@ defmodule Ch.Connection do
end

@impl true
def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
{query_params, extra_headers, body} = params
def handle_execute(%Query{statement: statement} = query, params, opts, conn) do
{query_params, extra_headers} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

result =
if is_function(body, 2) do
request_chunked(conn, "POST", path, headers, body, opts)
if is_list(statement) or is_binary(statement) do
request(conn, "POST", path, headers, statement, opts)
else
request(conn, "POST", path, headers, body, opts)
request_chunked(conn, "POST", path, headers, statement, opts)
end

with {:ok, conn, responses} <- result do
{:ok, query, responses, conn}
end
end

def handle_execute(query, params, opts, conn) do
{query_params, extra_headers, body} = params

path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
{:ok, query, responses, conn}
end
end

@impl true
def disconnect(_error, conn) do
{:ok = ok, _conn} = HTTP.close(conn)
Expand All @@ -164,7 +153,14 @@ defmodule Ch.Connection do

@typep response :: Mint.Types.status() | Mint.Types.headers() | binary

@spec request(conn, binary, binary, Mint.Types.headers(), iodata, Keyword.t()) ::
@spec request(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: iodata,
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
Expand All @@ -174,7 +170,14 @@ defmodule Ch.Connection do
end
end

@spec request_chunked(conn, binary, binary, Mint.Types.headers(), Enumerable.t(), Keyword.t()) ::
@spec request_chunked(
conn,
method :: String.t(),
path :: String.t(),
Mint.Types.headers(),
body :: Enumerable.t(),
[Ch.query_option()]
) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
Expand Down
Loading

0 comments on commit fe4ed23

Please sign in to comment.