diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index f056c32..ad82826 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -148,7 +148,7 @@ defmodule Ch.RowBinary do def decode_rows(<<>>, _types), do: [] def decode_rows(<>, types) do - decode_rows(types, data, [], [], types) + decode_rows(types, data, 1, [], [], types) end defp skip_names(<>, 0, count), do: decode_types(rest, count, _acc = []) @@ -191,7 +191,7 @@ defmodule Ch.RowBinary do defp decode_types(<>, 0, types) do types = types |> decode_types() |> :lists.reverse() - decode_rows(types, rest, _row = [], _rows = [], types) + decode_rows(types, rest, 1, _row = [], _rows = [], types) end defp decode_types(<>, count, acc) do @@ -298,23 +298,24 @@ defmodule Ch.RowBinary do raise ArgumentError, "#{type} type is not supported" end - @compile inline: [decode_string_decode_rows: 5] + @compile inline: [decode_string_decode_rows: 6] for {pattern, size} <- varints do defp decode_string_decode_rows( <>, types_rest, + count, row, rows, types ) do - decode_rows(types_rest, bin, [s | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [s | row], rows, types) end end - @compile inline: [decode_array_decode_rows: 6] - defp decode_array_decode_rows(<<0, bin::bytes>>, _type, types_rest, row, rows, types) do - decode_rows(types_rest, bin, [[] | row], rows, types) + @compile inline: [decode_array_decode_rows: 7] + defp decode_array_decode_rows(<<0, bin::bytes>>, _type, types_rest, count, row, rows, types) do + decode_rows_cont(types_rest, bin, count - 1, [[] | row], rows, types) end for {pattern, size} <- varints do @@ -322,109 +323,128 @@ defmodule Ch.RowBinary do <>, type, types_rest, + count, row, rows, types ) do - array_types = List.duplicate(type, unquote(size)) - types_rest = array_types ++ [{:array_over, row} | types_rest] - decode_rows(types_rest, bin, [], rows, types) + types_rest = [type, {:array_over, count, row} | types_rest] + decode_rows(types_rest, bin, unquote(size), [], rows, types) end end - defp decode_rows([type | types_rest], <>, row, rows, types) do + defp decode_rows([type | _] = types_rest, <>, count, row, rows, types) do case type do :u8 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :u16 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :u32 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :u64 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :u128 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :u256 -> <> = bin - decode_rows(types_rest, bin, [u | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [u | row], rows, types) :i8 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :i16 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :i32 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :i64 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :i128 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :i256 -> <> = bin - decode_rows(types_rest, bin, [i | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [i | row], rows, types) :f32 -> case bin do <> -> - decode_rows(types_rest, bin, [f | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [f | row], rows, types) <<_nan_or_inf::32, bin::bytes>> -> - decode_rows(types_rest, bin, [nil | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [nil | row], rows, types) end :f64 -> case bin do <> -> - decode_rows(types_rest, bin, [f | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [f | row], rows, types) <<_nan_or_inf::64, bin::bytes>> -> - decode_rows(types_rest, bin, [nil | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [nil | row], rows, types) end :string -> - decode_string_decode_rows(bin, types_rest, row, rows, types) + decode_string_decode_rows(bin, types_rest, count, row, rows, types) {:string, size} -> <> = bin - decode_rows(types_rest, bin, [s | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [s | row], rows, types) :boolean -> case bin do - <<0, bin::bytes>> -> decode_rows(types_rest, bin, [false | row], rows, types) - <<1, bin::bytes>> -> decode_rows(types_rest, bin, [true | row], rows, types) + <<0, bin::bytes>> -> + decode_rows_cont(types_rest, bin, count - 1, [false | row], rows, types) + + <<1, bin::bytes>> -> + decode_rows_cont(types_rest, bin, count - 1, [true | row], rows, types) end :uuid -> <> = bin uuid = <> - decode_rows(types_rest, bin, [uuid | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [uuid | row], rows, types) :date -> <> = bin - decode_rows(types_rest, bin, [Date.add(@epoch_date, d) | row], rows, types) + + decode_rows_cont( + types_rest, + bin, + count - 1, + [Date.add(@epoch_date, d) | row], + rows, + types + ) :date32 -> <> = bin - decode_rows(types_rest, bin, [Date.add(@epoch_date, d) | row], rows, types) + + decode_rows_cont( + types_rest, + bin, + count - 1, + [Date.add(@epoch_date, d) | row], + rows, + types + ) {:datetime, timezone} -> <> = bin @@ -437,26 +457,36 @@ defmodule Ch.RowBinary do _ -> s |> DateTime.from_unix!() |> DateTime.shift_zone!(timezone) end - decode_rows(types_rest, bin, [dt | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [dt | row], rows, types) {:decimal, p, s} -> size = decimal_size(p) <> = bin sign = if val < 0, do: -1, else: 1 d = Decimal.new(sign, abs(val), -s) - decode_rows(types_rest, bin, [d | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [d | row], rows, types) {:nullable, type} -> case bin do - <<1, bin::bytes>> -> decode_rows(types_rest, bin, [nil | row], rows, types) - <<0, bin::bytes>> -> decode_rows([type | types_rest], bin, row, rows, types) + <<1, bin::bytes>> -> + decode_rows_cont(types_rest, bin, count - 1, [nil | row], rows, types) + + <<0, bin::bytes>> -> + decode_rows([type | types_rest], bin, count, row, rows, types) end {:array, type} -> - decode_array_decode_rows(bin, type, types_rest, row, rows, types) - - {:array_over, original_row} -> - decode_rows(types_rest, bin, [:lists.reverse(row) | original_row], rows, types) + decode_array_decode_rows(bin, type, types_rest, count, row, rows, types) + + {:array_over, original_count, original_row} -> + decode_rows_cont( + types_rest, + bin, + original_count - 1, + [:lists.reverse(row) | original_row], + rows, + types + ) {:datetime64, time_unit, timezone} -> <> = bin @@ -475,25 +505,33 @@ defmodule Ch.RowBinary do |> DateTime.shift_zone!(timezone) end - decode_rows(types_rest, bin, [dt | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [dt | row], rows, types) {:enum8, mapping} -> <> = bin - decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [Map.fetch!(mapping, v) | row], rows, types) {:enum16, mapping} -> <> = bin - decode_rows(types_rest, bin, [Map.fetch!(mapping, v) | row], rows, types) + decode_rows_cont(types_rest, bin, count - 1, [Map.fetch!(mapping, v) | row], rows, types) end end - defp decode_rows([], <<>>, row, rows, _types) do + defp decode_rows([], <<>>, _count, row, rows, _types) do :lists.reverse([:lists.reverse(row) | rows]) end - defp decode_rows([], <>, row, rows, types) do - row = :lists.reverse(row) - decode_rows(types, bin, [], [row | rows], types) + defp decode_rows([], <>, count, row, rows, types) do + decode_rows_cont(types, bin, count, [], [:lists.reverse(row) | rows], types) + end + + @compile inline: [decode_rows_cont: 6] + defp decode_rows_cont([_type | types_rest], <>, _count = 0, row, rows, types) do + decode_rows(types_rest, bin, 1, row, rows, types) + end + + defp decode_rows_cont(types_rest, <>, count, row, rows, types) do + decode_rows(types_rest, bin, count, row, rows, types) end # TODO eval once, at type decoding diff --git a/test/ch/row_binary_test.exs b/test/ch/row_binary_test.exs index b38d68a..f95ac86 100644 --- a/test/ch/row_binary_test.exs +++ b/test/ch/row_binary_test.exs @@ -4,6 +4,9 @@ defmodule Ch.RowBinaryTest do import Ch.{RowBinary, Test} test "encode -> decode" do + Rexbug.start("Ch.RowBinary :: return", msgs: 10000) + on_exit(fn -> :timer.sleep(100) end) + spec = [ {:string, ""}, {:string, "a"},