From 375c25d664b3002051cbe4a31fff8c8c358aca29 Mon Sep 17 00:00:00 2001 From: Henry Popp Date: Fri, 4 May 2018 15:13:06 -0500 Subject: [PATCH] refactor: minor changes --- lib/connection/flow_control.ex | 4 +- lib/stream.ex | 10 +++-- lib/stream/flow_control.ex | 72 +++++++++++++++++++--------------- 3 files changed, 49 insertions(+), 37 deletions(-) diff --git a/lib/connection/flow_control.ex b/lib/connection/flow_control.ex index f4ac237..d6eed28 100644 --- a/lib/connection/flow_control.ex +++ b/lib/connection/flow_control.ex @@ -1,11 +1,13 @@ defmodule Kadabra.Connection.FlowControl do @moduledoc false + @default_window_size 65_535 + defstruct queue: :queue.new(), stream_id: 1, active_stream_count: 0, active_streams: MapSet.new(), - window: 65_535, + window: @default_window_size, settings: %Kadabra.Connection.Settings{} alias Kadabra.{Config, Connection, StreamSupervisor} diff --git a/lib/stream.ex b/lib/stream.ex index 6cefe3d..d9c7e49 100644 --- a/lib/stream.ex +++ b/lib/stream.ex @@ -51,6 +51,7 @@ defmodule Kadabra.Stream do def new(%Config{} = config, %Settings{} = settings, stream_id) do flow_opts = [ stream_id: stream_id, + socket: config.socket, window: settings.initial_window_size, max_frame_size: settings.max_frame_size ] @@ -123,7 +124,7 @@ defmodule Kadabra.Stream do flow = stream.flow |> Stream.FlowControl.increment_window(inc) - |> Stream.FlowControl.process(stream.config.socket) + |> Stream.FlowControl.process() {:keep_state, %{stream | flow: flow}} end @@ -180,8 +181,9 @@ defmodule Kadabra.Stream do {:next_state, @reserved_remote, stream} end - def recv(from, %Continuation{} = frame, _state, stream) do - {:ok, headers} = Hpack.decode(stream.decoder, frame.header_block_fragment) + def recv(from, %Continuation{} = frame, _state, %{config: config} = stream) do + {:ok, headers} = Hpack.decode(config.ref, frame.header_block_fragment) + :gen_statem.reply(from, :ok) stream = %Stream{stream | headers: stream.headers ++ headers} @@ -257,7 +259,7 @@ defmodule Kadabra.Stream do if payload do stream.flow |> Stream.FlowControl.add(payload) - |> Stream.FlowControl.process(stream.config.socket) + |> Stream.FlowControl.process() else stream.flow end diff --git a/lib/stream/flow_control.ex b/lib/stream/flow_control.ex index 84d210b..6d9df03 100644 --- a/lib/stream/flow_control.ex +++ b/lib/stream/flow_control.ex @@ -1,9 +1,12 @@ defmodule Kadabra.Stream.FlowControl do @moduledoc false + @default_window_size 65_535 + defstruct queue: :queue.new(), - window: 56_536, + window: @default_window_size, max_frame_size: 16_384, + socket: nil, stream_id: nil alias Kadabra.{Encodable, Frame} @@ -12,6 +15,7 @@ defmodule Kadabra.Stream.FlowControl do @type t :: %__MODULE__{ max_frame_size: non_neg_integer, queue: :queue.queue(binary), + socket: pid, window: integer } @@ -34,8 +38,9 @@ defmodule Kadabra.Stream.FlowControl do @spec new(Keyword.t()) :: t def new(opts \\ []) do %__MODULE__{ - stream_id: opts[:stream_id], - window: Keyword.get(opts, :window, 56_536), + stream_id: Keyword.get(opts, :stream_id), + socket: Keyword.get(opts, :socket), + window: Keyword.get(opts, :window, @default_window_size), max_frame_size: Keyword.get(opts, :max_frame_size, 16_384) } end @@ -60,63 +65,66 @@ defmodule Kadabra.Stream.FlowControl do ## Examples - iex> process(%Kadabra.Stream.FlowControl{queue: :queue.new()}, self()) + iex> process(%Kadabra.Stream.FlowControl{queue: :queue.new()}) %Kadabra.Stream.FlowControl{queue: {[], []}} iex> queue = :queue.in({:send, "test"}, :queue.new()) - iex> process(%Kadabra.Stream.FlowControl{queue: queue, - ...> window: -20}, self()) + iex> process(%Kadabra.Stream.FlowControl{queue: queue, window: -20}) %Kadabra.Stream.FlowControl{queue: {[send: "test"], []}, window: -20} """ - @spec process(t, sock) :: t - def process(%{window: window} = flow_control, _sock) when window <= 0 do + @spec process(t) :: t + def process(%{window: window} = flow_control) when window <= 0 do flow_control end - def process(%{queue: queue} = flow_control, socket) do + def process(%{queue: queue} = flow_control) do case :queue.out(queue) do {{:value, bin}, queue} -> flow_control |> Map.put(:queue, queue) - |> do_process(socket, bin) + |> do_process(bin) {:empty, _queue} -> flow_control end end - def do_process(flow_control, socket, bin) do + defp do_process(%{window: window} = flow, bin) when byte_size(bin) > window do %{ queue: queue, max_frame_size: max_size, - window: window, + socket: socket, stream_id: stream_id - } = flow_control + } = flow - size = byte_size(bin) + {chunk, rem_bin} = :erlang.split_binary(bin, window) - if size > window do - {chunk, rem_bin} = :erlang.split_binary(bin, window) + max_size + |> split_packet(chunk) + |> send_partial_data(socket, stream_id) - max_size - |> split_packet(chunk) - |> send_partial_data(socket, stream_id) + queue = :queue.in_r(rem_bin, queue) - queue = :queue.in_r(rem_bin, queue) + flow + |> Map.put(:queue, queue) + |> Map.put(:window, 0) + |> process() + end + + defp do_process(%{window: window} = flow_control, bin) do + %{ + max_frame_size: max_size, + socket: socket, + stream_id: stream_id + } = flow_control - flow_control - |> Map.put(:queue, queue) - |> Map.put(:window, 0) - |> process(socket) - else - max_size - |> split_packet(bin) - |> send_data(socket, stream_id) + max_size + |> split_packet(bin) + |> send_data(socket, stream_id) - flow_control - |> Map.put(:window, window - size) - |> process(socket) - end + flow_control + |> Map.put(:window, window - byte_size(bin)) + |> process() end def send_partial_data([], _socket, _stream_id), do: :ok