diff --git a/CHANGELOG.md b/CHANGELOG.md index 561fe17..558bfe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v0.4.1 +- Send exactly number of allowed bytes on initial connection WINDOW_UPDATE +- Default settings use maximum values for MAX_FRAME_SIZE and INITIAL_WINDOW_SIZE +- Incoming PING and WINDOW_UPDATE frames are now validated, closing the + connection if an error is encountered +- Fixed: can no longer accidentally send 0-size WINDOW_UPDATE frames +- Fixed: don't send WINDOW_UPDATE frames larger than remote server's available + connection flow window + ## v0.4.0 - Support for `http` URIs - Requests are now buffered with GenStage diff --git a/config/dogma.exs b/config/dogma.exs deleted file mode 100644 index 7fd46f3..0000000 --- a/config/dogma.exs +++ /dev/null @@ -1,16 +0,0 @@ -use Mix.Config -alias Dogma.Rule - -config :dogma, - # Select a set of rules as a base - rule_set: Dogma.RuleSet.All, - - # Pick paths not to lint - exclude: [ - ~r(\Alib/vendor/) - ], - - # Override an existing rule configuration - override: [ - # %Rule.LineLength{ max_length: 100 }, - ] diff --git a/lib/config.ex b/lib/config.ex index 6b21353..901dce5 100644 --- a/lib/config.ex +++ b/lib/config.ex @@ -6,6 +6,7 @@ defmodule Kadabra.Config do ref: nil, uri: nil, socket: nil, + queue: nil, opts: [] @type t :: %__MODULE__{ diff --git a/lib/connection.ex b/lib/connection.ex index eeab8ef..2060164 100644 --- a/lib/connection.ex +++ b/lib/connection.ex @@ -4,56 +4,40 @@ defmodule Kadabra.Connection do defstruct buffer: "", config: nil, flow_control: nil, + remote_window: 65_535, + remote_settings: nil, + local_settings: nil, queue: nil use GenStage require Logger + import Kernel, except: [send: 2] + alias Kadabra.{ Config, Connection, - ConnectionQueue, Encodable, Error, Frame, - FrameParser, - Hpack, - Stream, - StreamSupervisor + Socket, + Tasks } - alias Kadabra.Connection.{FlowControl, Socket} - - alias Kadabra.Frame.{ - Continuation, - Data, - Goaway, - Headers, - Ping, - PushPromise, - RstStream, - WindowUpdate - } + alias Kadabra.Frame.{Goaway, Ping} + + alias Kadabra.Connection.{FlowControl, Processor} @type t :: %__MODULE__{ buffer: binary, config: term, - flow_control: term + flow_control: term, + local_settings: Connection.Settings.t(), + queue: pid } @type sock :: {:sslsocket, any, pid | {any, any}} - @type frame :: - Data.t() - | Headers.t() - | RstStream.t() - | Frame.Settings.t() - | PushPromise.t() - | Ping.t() - | Goaway.t() - | WindowUpdate.t() - | Continuation.t() - def start_link(%Config{supervisor: sup} = config) do name = via_tuple(sup) GenStage.start_link(__MODULE__, config, name: name) @@ -63,27 +47,22 @@ defmodule Kadabra.Connection do {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} end - def init(%Config{supervisor: sup, uri: uri, opts: opts} = config) do - case Socket.connect(uri, opts) do - {:ok, socket} -> - send_preface_and_settings(socket, opts[:settings]) - config = %{config | socket: socket} - state = initial_state(config) - {:consumer, state, subscribe_to: [ConnectionQueue.via_tuple(sup)]} - - {:error, error} -> - {:stop, error} - end + def init(%Config{queue: queue} = config) do + state = initial_state(config) + Kernel.send(self(), :start) + Process.flag(:trap_exit, true) + {:consumer, state, subscribe_to: [queue]} end - defp initial_state(%Config{opts: opts} = config) do - settings = Keyword.get(opts, :settings, Connection.Settings.default()) + defp initial_state(%Config{opts: opts, queue: queue} = config) do + settings = Keyword.get(opts, :settings, Connection.Settings.fastest()) + socket = config.supervisor |> Socket.via_tuple() %__MODULE__{ - config: config, - flow_control: %Connection.FlowControl{ - settings: settings - } + config: %{config | socket: socket}, + queue: queue, + local_settings: settings, + flow_control: %FlowControl{} } end @@ -91,26 +70,12 @@ defmodule Kadabra.Connection do GenStage.call(pid, :close) end - defp send_preface_and_settings(socket, settings) do - Socket.send(socket, Frame.connection_preface()) - - bin = - %Frame.Settings{settings: settings || Connection.Settings.default()} - |> Encodable.to_bin() - - Socket.send(socket, bin) - end - def ping(pid) do GenStage.cast(pid, {:send, :ping}) end # handle_cast - def handle_cast({:recv, frame}, state) do - recv(frame, state) - end - def handle_cast({:send, type}, state) do sendf(type, state) end @@ -136,16 +101,13 @@ defmodule Kadabra.Connection do config: config } = state - bin = flow.stream_id |> Goaway.new() |> Encodable.to_bin() - :ssl.send(config.socket, bin) - - send(config.client, {:closed, config.supervisor}) + bin = flow.stream_set.stream_id |> Goaway.new() |> Encodable.to_bin() + Socket.send(config.socket, bin) - Task.Supervisor.start_child(Kadabra.Tasks, fn -> - Kadabra.Supervisor.stop(config.supervisor) - end) + Kernel.send(config.client, {:closed, config.supervisor}) + Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end) - {:stop, :normal, :ok, state} + {:reply, :ok, [], state} end # sendf @@ -161,287 +123,80 @@ defmodule Kadabra.Connection do {:noreply, [], state} end - # recv - - @spec recv(frame, t) :: {:noreply, [], t} - def recv(%Frame.RstStream{}, state) do - Logger.error("recv unstarted stream rst") - {:noreply, [], state} - end - - def recv(%Frame.Ping{ack: true}, %{config: config} = state) do - send(config.client, {:pong, self()}) - {:noreply, [], state} - end - - def recv(%Frame.Ping{ack: false}, %{client: pid} = state) do - send(pid, {:ping, self()}) - {:noreply, [], state} - end - - # nil settings means use default - def recv(%Frame.Settings{ack: false, settings: nil}, state) do - %{flow_control: flow, config: config} = state - - bin = Frame.Settings.ack() |> Encodable.to_bin() - Socket.send(config.socket, bin) - - case flow.settings.max_concurrent_streams do - :infinite -> - GenStage.ask(state.queue, 2_000_000_000) - - max -> - to_ask = max - flow.active_stream_count - GenStage.ask(state.queue, to_ask) - end + defp do_send_headers(request, %{flow_control: flow} = state) do + flow = + flow + |> FlowControl.add(request) + |> FlowControl.process(state.config) - {:noreply, [], state} + %{state | flow_control: flow} end - def recv(%Frame.Settings{ack: false, settings: settings}, state) do - %{flow_control: flow, config: config} = state - old_settings = flow.settings - flow = Connection.FlowControl.update_settings(flow, settings) - - notify_settings_change(config.ref, old_settings, flow) - - pid = Hpack.via_tuple(config.ref, :encoder) - Hpack.update_max_table_size(pid, settings.max_header_list_size) + def handle_info(:start, %{config: config} = state) do + config.supervisor + |> Socket.via_tuple() + |> Socket.set_active() - bin = Frame.Settings.ack() |> Encodable.to_bin() - Socket.send(config.socket, bin) - - to_ask = settings.max_concurrent_streams - flow.active_stream_count - GenStage.ask(state.queue, to_ask) + bin = + %Frame.Settings{settings: state.local_settings} + |> Encodable.to_bin() - {:noreply, [], %{state | flow_control: flow}} - end + config.supervisor + |> Socket.via_tuple() + |> Socket.send(bin) - def recv(%Frame.Settings{ack: true}, state) do - send_huge_window_update(state.config.socket) {:noreply, [], state} end - def recv(%Goaway{} = frame, state) do - log_goaway(frame) - - {:stop, :normal, state} - end - - def recv(%WindowUpdate{window_size_increment: inc}, state) do - flow = Connection.FlowControl.increment_window(state.flow_control, inc) - {:noreply, [], %{state | flow_control: flow}} - end - - def recv(frame, state) do - """ - Unknown RECV on connection - Frame: #{inspect(frame)} - State: #{inspect(state)} - """ - |> Logger.info() + def handle_info({:closed, _pid}, %{config: config} = state) do + Kernel.send(config.client, {:closed, config.supervisor}) + Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end) {:noreply, [], state} end - def notify_settings_change(ref, old_settings, flow) do - %{initial_window_size: old_window} = old_settings - %{settings: settings} = flow - - max_frame_size = settings.max_frame_size - new_window = settings.initial_window_size - window_diff = new_window - old_window - - for stream_id <- flow.active_streams do - pid = Stream.via_tuple(ref, stream_id) - Stream.cast_recv(pid, {:settings_change, window_diff, max_frame_size}) - end - end - - defp do_send_headers(requests, state) when is_list(requests) do - Enum.reduce(requests, state, &do_send_headers/2) - end - - defp do_send_headers(request, %{flow_control: flow} = state) do - flow = - flow - |> FlowControl.add(request) - |> FlowControl.process(state.config) - - %{state | flow_control: flow} - end - - def log_goaway(%Goaway{last_stream_id: id, error_code: c, debug_data: b}) do - error = Error.string(c) - Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}") - end + def handle_info({:EXIT, _pid, {:shutdown, {:finished, stream_id}}}, state) do + GenStage.ask(state.queue, 1) - def handle_info({:finished, stream_id}, %{flow_control: flow} = state) do flow = - flow - |> FlowControl.decrement_active_stream_count() - |> FlowControl.remove_active(stream_id) + state.flow_control + |> FlowControl.finish_stream(stream_id) |> FlowControl.process(state.config) - GenStage.ask(state.queue, 1) - {:noreply, [], %{state | flow_control: flow}} end def handle_info({:push_promise, stream}, %{config: config} = state) do - send(config.client, {:push_promise, stream}) - {:noreply, [], state} - end - - def handle_info({:tcp, _socket, bin}, state) do - do_recv_bin(bin, state) + Kernel.send(config.client, {:push_promise, stream}) {:noreply, [], state} end - def handle_info({:tcp_closed, _socket}, state) do - handle_disconnect(state) - end - - def handle_info({:ssl, _socket, bin}, state) do - do_recv_bin(bin, state) - end - - def handle_info({:ssl_closed, _socket}, state) do - handle_disconnect(state) - end + def handle_info({:recv, frame}, state) do + case Processor.process(frame, %{config: config} = state) do + {:ok, state} -> + {:noreply, [], state} - defp do_recv_bin(bin, %{config: %{socket: socket}} = state) do - bin = state.buffer <> bin + {:connection_error, error, state} -> + code = Error.code(error) - case parse_bin(socket, bin, state) do - {:unfinished, bin, state} -> - Socket.setopts(socket, [{:active, :once}]) - {:noreply, [], %{state | buffer: bin}} - end - end + bin = + state.flow_control.stream_set.stream_id + |> Goaway.new(code) + |> Encodable.to_bin() - def parse_bin(socket, bin, state) do - case FrameParser.parse(bin) do - {:ok, frame, rest} -> - state = process(frame, state) - parse_bin(socket, rest, state) + Socket.send(config.socket, bin) + Tasks.run(fn -> Kadabra.Supervisor.stop(config.supervisor) end) - {:error, bin} -> - {:unfinished, bin, state} + {:noreply, [], state} end end - @spec process(frame, t) :: :ok - def process(bin, state) when is_binary(bin) do - Logger.info("Got binary: #{inspect(bin)}") - state - end - - def process(%Data{stream_id: 0}, state) do - # This is an error - state - end - - def process(%Data{stream_id: stream_id} = frame, %{config: config} = state) do - send_window_update(config.socket, frame) - - config.ref - |> Stream.via_tuple(stream_id) - |> Stream.cast_recv(frame) - - state - end - - def process(%Headers{stream_id: stream_id} = frame, %{config: config} = state) do - config.ref - |> Stream.via_tuple(stream_id) - |> Stream.call_recv(frame) - - state - end - - def process(%RstStream{} = frame, %{config: config} = state) do - pid = Stream.via_tuple(config.ref, frame.stream_id) - Stream.cast_recv(pid, frame) - state - end - - def process(%Frame.Settings{} = frame, state) do - # Process immediately - {:noreply, [], state} = recv(frame, state) - state - end - - def process(%PushPromise{stream_id: stream_id} = frame, state) do - %{config: config, flow_control: flow_control} = state - {:ok, pid} = StreamSupervisor.start_stream(config, flow_control, stream_id) - - Stream.call_recv(pid, frame) - - flow = Connection.FlowControl.add_active(flow_control, stream_id) - - %{state | flow_control: flow} - end - - def process(%Ping{} = frame, state) do - # Process immediately - recv(frame, state) - state - end - - def process(%Goaway{} = frame, state) do - GenStage.cast(self(), {:recv, frame}) - state - end - - def process(%WindowUpdate{stream_id: 0} = frame, state) do - Stream.cast_recv(self(), frame) - state - end - - def process(%WindowUpdate{stream_id: stream_id} = frame, state) do - pid = Stream.via_tuple(state.config.ref, stream_id) - Stream.cast_recv(pid, frame) - state - end - - def process(%Continuation{stream_id: stream_id} = frame, state) do - pid = Stream.via_tuple(state.config.ref, stream_id) - Stream.call_recv(pid, frame) - state - end - - def process(_error, state), do: state - - def send_window_update(_socket, %Data{data: nil}), do: :ok - - def send_window_update(_socket, %Data{data: ""}), do: :ok - - def send_window_update(socket, %Data{stream_id: sid, data: data}) do - bin = data |> WindowUpdate.new() |> Encodable.to_bin() - Socket.send(socket, bin) - - s_bin = - sid - |> WindowUpdate.new(byte_size(data)) - |> Encodable.to_bin() - - Socket.send(socket, s_bin) - end - - def send_huge_window_update(socket) do + def send_goaway(%{config: config, flow_control: flow}, error) do bin = - 0 - |> Frame.WindowUpdate.new(2_000_000_000) + flow.stream_set.stream_id + |> Frame.Goaway.new(Error.code(error)) |> Encodable.to_bin() - Socket.send(socket, bin) - end - - def handle_disconnect(%{config: config} = state) do - send(config.client, {:closed, config.supervisor}) - Task.start(fn -> Kadabra.Supervisor.stop(config.supervisor) end) - - {:stop, :normal, state} + Socket.send(config.socket, bin) end end diff --git a/lib/connection/flow_control.ex b/lib/connection/flow_control.ex index d6eed28..3009a28 100644 --- a/lib/connection/flow_control.ex +++ b/lib/connection/flow_control.ex @@ -1,74 +1,41 @@ defmodule Kadabra.Connection.FlowControl do @moduledoc false - @default_window_size 65_535 + @default_window_size round(:math.pow(2, 16) - 1) + @max_window_size round(:math.pow(2, 31) - 1) + + @default_initial_window_size round(:math.pow(2, 16) - 1) + @default_max_frame_size round(:math.pow(2, 14)) defstruct queue: :queue.new(), - stream_id: 1, - active_stream_count: 0, - active_streams: MapSet.new(), - window: @default_window_size, - settings: %Kadabra.Connection.Settings{} + stream_set: %Kadabra.StreamSet{}, + initial_window_size: @default_initial_window_size, + max_frame_size: @default_max_frame_size, + window: @default_window_size - alias Kadabra.{Config, Connection, StreamSupervisor} + alias Kadabra.{Config, Stream, StreamSet} @type t :: %__MODULE__{ queue: :queue.queue(), - stream_id: pos_integer, - active_stream_count: non_neg_integer, - active_streams: MapSet.t(), - window: integer, - settings: Connection.Settings.t() + stream_set: StreamSet.t(), + initial_window_size: non_neg_integer, + max_frame_size: non_neg_integer, + window: integer } - @spec update_settings(t, Connection.Settings.t()) :: t - def update_settings(flow_control, nil), do: flow_control + def window_default, do: @default_window_size - def update_settings(%{settings: old_settings} = flow_control, settings) do - settings = Connection.Settings.merge(old_settings, settings) - %{flow_control | settings: settings} - end + def window_max, do: @max_window_size - @doc ~S""" - Increments current `stream_id`. + @spec update_settings(t(), integer, integer, integer) :: t() + def update_settings(flow_control, initial_window, max_frame, max_streams) do + new_set = + Map.put(flow_control.stream_set, :max_concurrent_streams, max_streams) - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{stream_id: 5} - iex> increment_stream_id(flow) - %Kadabra.Connection.FlowControl{stream_id: 7} - """ - @spec increment_stream_id(t) :: t - def increment_stream_id(flow_control) do - %{flow_control | stream_id: flow_control.stream_id + 2} - end - - @doc ~S""" - Increments open stream count. - - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{active_stream_count: 2} - iex> increment_active_stream_count(flow) - %Kadabra.Connection.FlowControl{active_stream_count: 3} - """ - @spec increment_active_stream_count(t) :: t - def increment_active_stream_count(flow_control) do - %{flow_control | active_stream_count: flow_control.active_stream_count + 1} - end - - @doc ~S""" - Decrements open stream count. - - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{active_stream_count: 2} - iex> decrement_active_stream_count(flow) - %Kadabra.Connection.FlowControl{active_stream_count: 1} - """ - @spec decrement_active_stream_count(t) :: t - def decrement_active_stream_count(flow_control) do - %{flow_control | active_stream_count: flow_control.active_stream_count - 1} + flow_control + |> Map.put(:initial_window_size, initial_window) + |> Map.put(:max_frame_size, max_frame) + |> Map.put(:stream_set, new_set) end @doc ~S""" @@ -99,33 +66,6 @@ defmodule Kadabra.Connection.FlowControl do %{flow_control | window: window - amount} end - @doc ~S""" - Marks stream_id as active. - - ## Examples - - iex> flow = add_active(%Kadabra.Connection.FlowControl{}, 1) - iex> flow.active_streams - #MapSet<[1]> - """ - def add_active(%{active_streams: active} = flow_control, stream_id) do - %{flow_control | active_streams: MapSet.put(active, stream_id)} - end - - @doc ~S""" - Marks stream_id as active. - - ## Examples - - iex> flow = remove_active(%Kadabra.Connection.FlowControl{ - ...> active_streams: MapSet.new([1, 3])}, 1) - iex> flow.active_streams - #MapSet<[3]> - """ - def remove_active(%{active_streams: active} = flow_control, stream_id) do - %{flow_control | active_streams: MapSet.delete(active, stream_id)} - end - @doc ~S""" Adds new sendable item to the queue. @@ -136,26 +76,50 @@ defmodule Kadabra.Connection.FlowControl do 1 """ @spec add(t, Kadabra.Request.t()) :: t + def add(%{queue: queue} = flow_control, requests) when is_list(requests) do + queue = Enum.reduce(requests, queue, &:queue.in(&1, &2)) + %{flow_control | queue: queue} + end + def add(%{queue: queue} = flow_control, request) do queue = :queue.in(request, queue) %{flow_control | queue: queue} end + def add_active(%{stream_set: set} = flow_control, stream_id, pid) do + new_set = StreamSet.add_active(set, stream_id, pid) + %{flow_control | stream_set: new_set} + end + @spec process(t, Config.t()) :: t - def process(%{queue: queue} = flow, config) do + def process(%{queue: queue, stream_set: stream_set} = flow, config) do with {{:value, request}, queue} <- :queue.out(queue), + {:can_send, true} <- {:can_send, StreamSet.can_send?(stream_set)}, {:can_send, true} <- {:can_send, can_send?(flow)} do - case StreamSupervisor.start_stream(config, flow) do + %{ + stream_set: %{stream_id: stream_id}, + initial_window_size: window, + max_frame_size: max_frame + } = flow + + stream = Stream.new(config, stream_id, window, max_frame) + + case Stream.start_link(stream) do {:ok, pid} -> size = byte_size(request.body || <<>>) :gen_statem.call(pid, {:send_headers, request}) + updated_set = + stream_set + |> StreamSet.add_active(stream_id, pid) + |> StreamSet.increment_active_stream_count() + |> StreamSet.increment_stream_id() + flow |> Map.put(:queue, queue) + |> Map.put(:stream_set, updated_set) |> decrement_window(size) - |> add_active(flow.stream_id) - |> increment_active_stream_count() - |> increment_stream_id() + |> process(config) other -> raise "something happened #{inspect(other)}" @@ -167,32 +131,30 @@ defmodule Kadabra.Connection.FlowControl do end end + @spec finish_stream(t, non_neg_integer) :: t + def finish_stream(%{stream_set: set} = flow_control, stream_id) do + new_set = + set + |> StreamSet.decrement_active_stream_count() + |> StreamSet.remove_active(stream_id) + + %{flow_control | stream_set: new_set} + end + @doc ~S""" - Returns true if active_streams is less than max streams and window - is positive. + Returns true if window is positive. ## Examples - iex> settings = %Kadabra.Connection.Settings{max_concurrent_streams: 100} - iex> flow = %Kadabra.Connection.FlowControl{active_stream_count: 3, - ...> window: 500, settings: settings} + iex> flow = %Kadabra.Connection.FlowControl{window: 500} iex> can_send?(flow) true - iex> settings = %Kadabra.Connection.Settings{max_concurrent_streams: 100} - iex> flow = %Kadabra.Connection.FlowControl{active_stream_count: 3, - ...> window: 0, settings: settings} - iex> can_send?(flow) - false - - iex> settings = %Kadabra.Connection.Settings{max_concurrent_streams: 1} - iex> flow = %Kadabra.Connection.FlowControl{active_stream_count: 3, - ...> window: 500, settings: settings} + iex> flow = %Kadabra.Connection.FlowControl{window: 0} iex> can_send?(flow) false """ @spec can_send?(t) :: boolean - def can_send?(%{active_stream_count: count, settings: s, window: bytes}) do - count < s.max_concurrent_streams and bytes > 0 - end + def can_send?(%{window: bytes}) when bytes > 0, do: true + def can_send?(_else), do: false end diff --git a/lib/connection/processor.ex b/lib/connection/processor.ex new file mode 100644 index 0000000..2b6045d --- /dev/null +++ b/lib/connection/processor.ex @@ -0,0 +1,261 @@ +defmodule Kadabra.Connection.Processor do + @moduledoc false + + require Logger + + alias Kadabra.{ + Connection, + Encodable, + Error, + Frame, + Hpack, + Socket, + Stream + } + + alias Kadabra.Connection.FlowControl + + alias Kadabra.Frame.{ + Continuation, + Data, + Goaway, + Headers, + Ping, + PushPromise, + RstStream, + WindowUpdate + } + + @type frame :: + Data.t() + | Headers.t() + | RstStream.t() + | Frame.Settings.t() + | PushPromise.t() + | Ping.t() + | Goaway.t() + | WindowUpdate.t() + | Continuation.t() + + @spec process(frame, Connection.t()) :: + {:ok, Connection.t()} | {:connection_error, atom, Connection.t()} + def process(bin, state) when is_binary(bin) do + Logger.info("Got binary: #{inspect(bin)}") + state + end + + def process(%Data{stream_id: 0}, state) do + # This is an error + {:ok, state} + end + + def process(%Data{stream_id: stream_id} = frame, %{config: config} = state) do + available = FlowControl.window_max() - state.remote_window + bin_size = byte_size(frame.data) + size = min(available, bin_size) + + send_window_update(config.socket, 0, size) + send_window_update(config.socket, stream_id, bin_size) + + process_on_stream(state, stream_id, frame) + + {:ok, %{state | remote_window: state.remote_window + size}} + end + + def process(%Headers{stream_id: stream_id} = frame, state) do + state + |> process_on_stream(stream_id, frame) + |> case do + :ok -> + {:ok, state} + + {:connection_error, error} -> + {:connection_error, error, state} + end + end + + def process(%RstStream{stream_id: 0}, state) do + Logger.error("recv unstarted stream rst") + {:ok, state} + end + + def process(%RstStream{stream_id: stream_id} = frame, state) do + process_on_stream(state, stream_id, frame) + + {:ok, state} + end + + # nil settings means use default + def process(%Frame.Settings{ack: false, settings: nil}, state) do + %{flow_control: flow, config: config} = state + + bin = Frame.Settings.ack() |> Encodable.to_bin() + Socket.send(config.socket, bin) + + case flow.stream_set.max_concurrent_streams do + :infinite -> + GenStage.ask(state.queue, 2_000_000_000) + + max -> + to_ask = max - flow.stream_set.active_stream_count + GenStage.ask(state.queue, to_ask) + end + + {:ok, state} + end + + def process(%Frame.Settings{ack: false, settings: settings}, state) do + %{flow_control: flow, config: config} = state + old_settings = state.remote_settings + settings = Connection.Settings.merge(old_settings, settings) + + flow = + FlowControl.update_settings( + flow, + settings.initial_window_size, + settings.max_frame_size, + settings.max_concurrent_streams + ) + + notify_settings_change(old_settings, settings, flow) + + config.ref + |> Hpack.via_tuple(:encoder) + |> Hpack.update_max_table_size(settings.max_header_list_size) + + bin = Frame.Settings.ack() |> Encodable.to_bin() + Socket.send(config.socket, bin) + + to_ask = + settings.max_concurrent_streams - flow.stream_set.active_stream_count + + GenStage.ask(state.queue, to_ask) + + {:ok, %{state | flow_control: flow, remote_settings: settings}} + end + + def process(%Frame.Settings{ack: true}, %{config: c} = state) do + send_huge_window_update(c.socket, state.remote_window) + {:ok, %{state | remote_window: FlowControl.window_max()}} + end + + def process(%PushPromise{stream_id: stream_id} = frame, state) do + %{config: config, flow_control: flow_control} = state + + %{ + initial_window_size: window, + max_frame_size: max_frame + } = flow_control + + stream = Stream.new(config, stream_id, window, max_frame) + + case Stream.start_link(stream) do + {:ok, pid} -> + Stream.call_recv(pid, frame) + + flow = FlowControl.add_active(flow_control, stream_id, pid) + + {:ok, %{state | flow_control: flow}} + + error -> + raise "#{inspect(error)}" + end + end + + def process(%Ping{stream_id: sid}, state) when sid != 0 do + {:connection_error, :PROTOCOL_ERROR, state} + end + + def process(%Ping{data: data}, state) when byte_size(data) != 8 do + {:connection_error, :FRAME_SIZE_ERROR, state} + end + + def process(%Ping{ack: false}, %{config: config} = state) do + Kernel.send(config.client, {:ping, self()}) + {:ok, state} + end + + def process(%Ping{ack: true}, %{config: config} = state) do + Kernel.send(config.client, {:pong, self()}) + {:ok, state} + end + + def process(%Goaway{} = frame, state) do + log_goaway(frame) + {:connection_error, :NO_ERROR, state} + end + + def process(%WindowUpdate{stream_id: 0, window_size_increment: inc}, state) + when inc <= 0 do + {:connection_error, :PROTOCOL_ERROR, state} + end + + def process(%WindowUpdate{stream_id: 0, window_size_increment: inc}, state) do + flow = FlowControl.increment_window(state.flow_control, inc) + {:ok, %{state | flow_control: flow}} + end + + def process(%WindowUpdate{stream_id: stream_id} = frame, state) do + process_on_stream(state, stream_id, frame) + {:ok, state} + end + + def process(%Continuation{stream_id: stream_id} = frame, state) do + process_on_stream(state, stream_id, frame) + {:ok, state} + end + + def process(frame, state) do + """ + Unknown RECV on connection + Frame: #{inspect(frame)} + State: #{inspect(state)} + """ + |> Logger.info() + + {:ok, state} + end + + def log_goaway(%Goaway{last_stream_id: id, error_code: c, debug_data: b}) do + error = Error.parse(c) + Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}") + end + + @spec send_window_update(pid, non_neg_integer, integer) :: no_return + def send_window_update(socket, stream_id, bytes) + when bytes > 0 and bytes < 2_147_483_647 do + bin = + stream_id + |> WindowUpdate.new(bytes) + |> Encodable.to_bin() + + # Logger.info("Sending WINDOW_UPDATE on Stream #{stream_id} (#{bytes})") + Socket.send(socket, bin) + end + + def send_window_update(_socket, _stream_id, _bytes), do: :ok + + def send_huge_window_update(socket, remote_window) do + available = FlowControl.window_max() - remote_window + send_window_update(socket, 0, available) + end + + def notify_settings_change(old_settings, new_settings, %{stream_set: set}) do + old_settings = old_settings || Connection.Settings.default() + %{initial_window_size: old_window} = old_settings + + max_frame_size = new_settings.max_frame_size + new_window = new_settings.initial_window_size + window_diff = new_window - old_window + + for {_stream_id, pid} <- set.active_streams do + send(pid, {:settings_change, window_diff, max_frame_size}) + end + end + + def process_on_stream(state, stream_id, frame) do + state.flow_control.stream_set.active_streams + |> Map.get(stream_id) + |> Stream.call_recv(frame) + end +end diff --git a/lib/connection/settings.ex b/lib/connection/settings.ex index 1220028..6a40c30 100644 --- a/lib/connection/settings.ex +++ b/lib/connection/settings.ex @@ -1,11 +1,15 @@ defmodule Kadabra.Connection.Settings do @moduledoc false + @default_header_table_size 4096 + @default_initial_window_size round(:math.pow(2, 16) - 1) + @default_max_frame_size round(:math.pow(2, 14)) + defstruct enable_push: true, - header_table_size: 4096, - initial_window_size: 65_535, + header_table_size: @default_header_table_size, + initial_window_size: @default_initial_window_size, max_concurrent_streams: :infinite, - max_frame_size: 16_384, + max_frame_size: @default_max_frame_size, max_header_list_size: nil alias Kadabra.Error @@ -26,10 +30,20 @@ defmodule Kadabra.Connection.Settings do @max_frame_size 0x5 @max_header_list_size 0x6 + @max_initial_window_size round(:math.pow(2, 31) - 1) + @max_max_frame_size round(:math.pow(2, 24) - 1) + def default do %__MODULE__{} end + def fastest do + %__MODULE__{ + initial_window_size: @max_initial_window_size, + max_frame_size: @max_max_frame_size + } + end + @doc ~S""" Puts setting value, returning an error if present. @@ -79,7 +93,8 @@ defmodule Kadabra.Connection.Settings do {:ok, %{settings | max_concurrent_streams: value}} end - def put(settings, @initial_window_size, value) when value > 4_294_967_295 do + def put(settings, @initial_window_size, value) + when value > @max_initial_window_size do {:error, Error.flow_control_error(), settings} end @@ -87,12 +102,13 @@ defmodule Kadabra.Connection.Settings do {:ok, %{settings | initial_window_size: value}} end + def put(settings, @max_frame_size, value) + when value < @default_max_frame_size or value > @max_max_frame_size do + {:error, Error.protocol_error(), settings} + end + def put(settings, @max_frame_size, value) do - if value < 16_384 or value > 16_777_215 do - {:error, Error.protocol_error(), settings} - else - {:ok, %{settings | max_frame_size: value}} - end + {:ok, %{settings | max_frame_size: value}} end def put(settings, @max_header_list_size, value) do @@ -101,6 +117,8 @@ defmodule Kadabra.Connection.Settings do def put(settings, _else, _value), do: {:ok, settings} + def merge(nil, new_settings), do: new_settings + def merge(old_settings, new_settings) do Map.merge(old_settings, new_settings, fn k, v1, v2 -> cond do diff --git a/lib/connection/socket.ex b/lib/connection/socket.ex deleted file mode 100644 index dcc6286..0000000 --- a/lib/connection/socket.ex +++ /dev/null @@ -1,82 +0,0 @@ -defmodule Kadabra.Connection.Socket do - @moduledoc false - - @type ssl_sock :: {:sslsocket, any, pid | {any, any}} - - @type connection_result :: - {:ok, ssl_sock} - | {:ok, pid} - | {:error, :not_implmenented} - | {:error, :bad_scheme} - - @spec connect(URI.t(), Keyword.t()) :: connection_result - def connect(uri, opts) do - case uri.scheme do - "http" -> do_connect(uri, :http, opts) - "https" -> do_connect(uri, :https, opts) - _ -> {:error, :bad_scheme} - end - end - - defp do_connect(uri, :http, opts) do - tcp_opts = - opts - |> Keyword.get(:tcp, []) - |> options(:http) - - uri.host - |> to_charlist() - |> :gen_tcp.connect(uri.port, tcp_opts) - end - - defp do_connect(uri, :https, opts) do - :ssl.start() - - ssl_opts = - opts - |> Keyword.get(:ssl, []) - |> options(:https) - - uri.host - |> to_charlist() - |> :ssl.connect(uri.port, ssl_opts) - end - - @spec options(Keyword.t(), :http | :https) :: [...] - def options(opts, :https) do - opts ++ - [ - {:active, :once}, - {:packet, :raw}, - {:reuseaddr, false}, - {:alpn_advertised_protocols, [<<"h2">>]}, - :binary - ] - end - - def options(opts, :http) do - opts ++ - [ - {:active, :once}, - {:packet, :raw}, - {:reuseaddr, false}, - :binary - ] - end - - def send({:sslsocket, _, _} = pid, bin) do - :ssl.send(pid, bin) - end - - def send(pid, bin) do - :gen_tcp.send(pid, bin) - end - - def setopts({:sslsocket, _, _} = pid, opts) do - :ssl.setopts(pid, opts) - end - - def setopts(pid, opts) do - :inet.setopts(pid, opts) - end -end diff --git a/lib/connection_queue.ex b/lib/connection_queue.ex index 4afdd00..8ebfffb 100644 --- a/lib/connection_queue.ex +++ b/lib/connection_queue.ex @@ -34,7 +34,7 @@ defmodule Kadabra.ConnectionQueue do Enum.reduce(requests, queue, &enqueue(&2, &1)) end - def enqueue(queue, %Kadabra.Request{} = request) do + def enqueue(queue, request) do :queue.in(request, queue) end diff --git a/lib/error.ex b/lib/error.ex index c2f85df..93e2169 100644 --- a/lib/error.ex +++ b/lib/error.ex @@ -1,6 +1,22 @@ defmodule Kadabra.Error do @moduledoc false + @type error :: + :NO_ERROR + | :PROTOCOL_ERROR + | :INTERNAL_ERROR + | :FLOW_CONTROL_ERROR + | :SETTINGS_TIMEOUT + | :STREAM_CLOSED + | :FRAME_SIZE_ERROR + | :REFUSED_STREAM + | :CANCEL + | :COMPRESSION_ERROR + | :CONNECT_ERROR + | :ENHANCE_YOUR_CALM + | :INADEQUATE_SECURITY + | :HTTP_1_1_REQUIRED + @doc ~S""" 32-bit error code of type `NO_ERROR` @@ -54,56 +70,68 @@ defmodule Kadabra.Error do @spec frame_size_error :: <<_::32>> def frame_size_error, do: <<6::32>> + @doc ~S""" + 32-bit error code of type `COMPRESSION_ERROR` + + ## Examples + + iex> Kadabra.Error.compression_error + <<0, 0, 0, 9>> + """ + @spec compression_error :: <<_::32>> + def compression_error, do: <<9::32>> + @doc ~S""" Returns a string error given integer error in range 0x0 - 0xd. ## Examples - iex> Kadabra.Error.string(0x1) - "PROTOCOL_ERROR" - iex> Kadabra.Error.string(0xfff) + iex> Kadabra.Error.parse(0x1) + :PROTOCOL_ERROR + iex> Kadabra.Error.parse(0xfff) 0xfff """ - @spec string(integer) :: String.t() | integer - def string(0x0), do: "NO_ERROR" - def string(0x1), do: "PROTOCOL_ERROR" - def string(0x2), do: "INTERNAL_ERROR" - def string(0x3), do: "FLOW_CONTROL_ERROR" - def string(0x4), do: "SETTINGS_TIMEOUT" - def string(0x5), do: "STREAM_CLOSED" - def string(0x6), do: "FRAME_SIZE_ERROR" - def string(0x7), do: "REFUSED_STREAM" - def string(0x8), do: "CANCEL" - def string(0x9), do: "COMPRESSION_ERROR" - def string(0xA), do: "CONNECT_ERROR" - def string(0xB), do: "ENHANCE_YOUR_CALM" - def string(0xC), do: "INADEQUATE_SECURITY" - def string(0xD), do: "HTTP_1_1_REQUIRED" - def string(error), do: error + @spec parse(integer) :: error | integer + def parse(0x0), do: :NO_ERROR + def parse(0x1), do: :PROTOCOL_ERROR + def parse(0x2), do: :INTERNAL_ERROR + def parse(0x3), do: :FLOW_CONTROL_ERROR + def parse(0x4), do: :SETTINGS_TIMEOUT + def parse(0x5), do: :STREAM_CLOSED + def parse(0x6), do: :FRAME_SIZE_ERROR + def parse(0x7), do: :REFUSED_STREAM + def parse(0x8), do: :CANCEL + def parse(0x9), do: :COMPRESSION_ERROR + def parse(0xA), do: :CONNECT_ERROR + def parse(0xB), do: :ENHANCE_YOUR_CALM + def parse(0xC), do: :INADEQUATE_SECURITY + def parse(0xD), do: :HTTP_1_1_REQUIRED + def parse(error), do: error @doc ~S""" Returns integer error code given string error. ## Examples - iex> Kadabra.Error.code("PROTOCOL_ERROR") + iex> Kadabra.Error.code(:PROTOCOL_ERROR) 0x1 - iex> Kadabra.Error.code("NOT_AN_ERROR") - "NOT_AN_ERROR" + iex> Kadabra.Error.code(:NOT_AN_ERROR) + :NOT_AN_ERROR """ - def code("NO_ERROR"), do: 0x0 - def code("PROTOCOL_ERROR"), do: 0x1 - def code("INTERNAL_ERROR"), do: 0x2 - def code("FLOW_CONTROL_ERROR"), do: 0x3 - def code("SETTINGS_TIMEOUT"), do: 0x4 - def code("STREAM_CLOSED"), do: 0x5 - def code("FRAME_SIZE_ERROR"), do: 0x6 - def code("REFUSED_STREAM"), do: 0x7 - def code("CANCEL"), do: 0x8 - def code("COMPRESSION_ERROR"), do: 0x9 - def code("CONNECT_ERROR"), do: 0xA - def code("ENHANCE_YOUR_CALM"), do: 0xB - def code("INADEQUATE_SECURITY"), do: 0xC - def code("HTTP_1_1_REQUIRED"), do: 0xD + @spec code(error) :: integer + def code(:NO_ERROR), do: 0x0 + def code(:PROTOCOL_ERROR), do: 0x1 + def code(:INTERNAL_ERROR), do: 0x2 + def code(:FLOW_CONTROL_ERROR), do: 0x3 + def code(:SETTINGS_TIMEOUT), do: 0x4 + def code(:STREAM_CLOSED), do: 0x5 + def code(:FRAME_SIZE_ERROR), do: 0x6 + def code(:REFUSED_STREAM), do: 0x7 + def code(:CANCEL), do: 0x8 + def code(:COMPRESSION_ERROR), do: 0x9 + def code(:CONNECT_ERROR), do: 0xA + def code(:ENHANCE_YOUR_CALM), do: 0xB + def code(:INADEQUATE_SECURITY), do: 0xC + def code(:HTTP_1_1_REQUIRED), do: 0xD def code(error), do: error end diff --git a/lib/frame.ex b/lib/frame.ex index 9ac44ed..7acdff2 100644 --- a/lib/frame.ex +++ b/lib/frame.ex @@ -11,9 +11,6 @@ defmodule Kadabra.Frame do payload: bitstring } - @spec connection_preface() :: String.t() - def connection_preface, do: "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" - @spec new(binary) :: {:ok, t, binary} | {:error, binary} def new(<> = bin) do size = p_size * 8 @@ -54,3 +51,11 @@ defmodule Kadabra.Frame do <> end end + +defimpl Kadabra.Encodable, for: Kadabra.Frame do + alias Kadabra.Frame + + def to_bin(%{type: type, flags: flags, stream_id: sid, payload: p}) do + Frame.binary_frame(type, flags, sid, p) + end +end diff --git a/lib/frame/data.ex b/lib/frame/data.ex index 0ae4e3c..3a6f1c5 100644 --- a/lib/frame/data.ex +++ b/lib/frame/data.ex @@ -27,8 +27,8 @@ defimpl Kadabra.Encodable, for: Kadabra.Frame.Data do @data 0x0 - def to_bin(frame) do - end_stream = if frame.end_stream, do: 0x1, else: 0x0 - Frame.binary_frame(@data, end_stream, frame.stream_id, frame.data) + def to_bin(%{end_stream: end_stream, stream_id: stream_id, data: data}) do + flags = if end_stream, do: 0x1, else: 0x0 + Frame.binary_frame(@data, flags, stream_id, data) end end diff --git a/lib/frame/goaway.ex b/lib/frame/goaway.ex index f21d6fe..445b2cc 100644 --- a/lib/frame/goaway.ex +++ b/lib/frame/goaway.ex @@ -43,6 +43,13 @@ defmodule Kadabra.Frame.Goaway do debug_data: debug_data } end + + def new(stream_id, error_code) when is_integer(stream_id) do + %__MODULE__{ + last_stream_id: stream_id, + error_code: error_code + } + end end defimpl Kadabra.Encodable, for: Kadabra.Frame.Goaway do diff --git a/lib/frame/headers.ex b/lib/frame/headers.ex index 23d48b8..c06e3f4 100644 --- a/lib/frame/headers.ex +++ b/lib/frame/headers.ex @@ -64,3 +64,17 @@ defmodule Kadabra.Frame.Headers do end end end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Headers do + alias Kadabra.Frame + + @headers 0x1 + + def to_bin(%{header_block_fragment: block, stream_id: sid} = frame) do + flags = flags(frame) + Frame.binary_frame(@headers, flags, sid, block) + end + + defp flags(%{end_headers: true, end_stream: false}), do: 0x4 + defp flags(%{end_headers: true, end_stream: true}), do: 0x5 +end diff --git a/lib/frame/ping.ex b/lib/frame/ping.ex index 3d00f03..28fef67 100644 --- a/lib/frame/ping.ex +++ b/lib/frame/ping.ex @@ -1,14 +1,15 @@ defmodule Kadabra.Frame.Ping do @moduledoc false - defstruct [:data, ack: false] + defstruct [:data, stream_id: 0, ack: false] alias Kadabra.Frame alias Kadabra.Frame.Flags @type t :: %__MODULE__{ ack: boolean, - data: <<_::64>> + data: <<_::64>>, + stream_id: integer } @doc ~S""" @@ -18,13 +19,14 @@ defmodule Kadabra.Frame.Ping do iex> Kadabra.Frame.Ping.new %Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>, - ack: false} + ack: false, stream_id: 0} """ @spec new() :: t def new do %__MODULE__{ ack: false, - data: <<0, 0, 0, 0, 0, 0, 0, 0>> + data: <<0, 0, 0, 0, 0, 0, 0, 0>>, + stream_id: 0 } end @@ -34,15 +36,17 @@ defmodule Kadabra.Frame.Ping do ## Examples iex> frame = %Kadabra.Frame{payload: <<0, 0, 0, 0, 0, 0, 0, 0>>, - ...> flags: 0x1, type: 0x6} + ...> flags: 0x1, type: 0x6, stream_id: 0} iex> Kadabra.Frame.Ping.new(frame) - %Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>, ack: true} + %Kadabra.Frame.Ping{data: <<0, 0, 0, 0, 0, 0, 0, 0>>, ack: true, + stream_id: 0} """ @spec new(Frame.t()) :: t - def new(%Frame{type: 0x6, payload: <>, flags: flags}) do + def new(%Frame{type: 0x6, payload: <>, flags: flags, stream_id: sid}) do %__MODULE__{ ack: Flags.ack?(flags), - data: <> + data: <>, + stream_id: sid } end end diff --git a/lib/frame/window_update.ex b/lib/frame/window_update.ex index f9b5c9f..b395a2c 100644 --- a/lib/frame/window_update.ex +++ b/lib/frame/window_update.ex @@ -10,7 +10,7 @@ defmodule Kadabra.Frame.WindowUpdate do window_size_increment: non_neg_integer } - @spec new(Frame.t() | binary) :: t + @spec new(Frame.t()) :: t def new(%Frame{payload: <>, stream_id: stream_id}) do %__MODULE__{ window_size_increment: inc, @@ -18,10 +18,6 @@ defmodule Kadabra.Frame.WindowUpdate do } end - def new(bin) do - new(0x0, bin) - end - @spec new(non_neg_integer, pos_integer | binary) :: t def new(stream_id, increment) when is_integer(increment) do %__MODULE__{ @@ -29,10 +25,6 @@ defmodule Kadabra.Frame.WindowUpdate do window_size_increment: increment } end - - def new(stream_id, bin) when is_binary(bin) do - new(stream_id, byte_size(bin)) - end end defimpl Kadabra.Encodable, for: Kadabra.Frame.WindowUpdate do diff --git a/lib/hpack.ex b/lib/hpack.ex index 9b9900f..f86d4b1 100644 --- a/lib/hpack.ex +++ b/lib/hpack.ex @@ -28,15 +28,6 @@ defmodule Kadabra.Hpack do GenServer.call(pid, {:new_max_table_size, size}) end - def reset(ref) do - GenServer.call(via_tuple(ref, :encoder), :reset) - GenServer.call(via_tuple(ref, :decoder), :reset) - end - - def handle_call(:reset, _pid, _state) do - {:reply, :ok, :hpack.new_context()} - end - def handle_call({:encode, headers}, _pid, state) do {:ok, {bin, new_state}} = :hpack.encode(headers, state) {:reply, {:ok, bin}, new_state} diff --git a/lib/socket.ex b/lib/socket.ex new file mode 100644 index 0000000..98ef9db --- /dev/null +++ b/lib/socket.ex @@ -0,0 +1,186 @@ +defmodule Kadabra.Socket do + @moduledoc false + + defstruct socket: nil, buffer: "", active_user: nil + + alias Kadabra.{Config, FrameParser} + + import Kernel, except: [send: 2] + + use GenServer + + @type ssl_sock :: {:sslsocket, any, pid | {any, any}} + + @type connection_result :: + {:ok, ssl_sock} + | {:ok, pid} + | {:error, :not_implmenented} + | {:error, :bad_scheme} + + @spec connection_preface() :: String.t() + def connection_preface, do: "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + + def send(pid, bin) do + GenServer.call(pid, {:send, bin}) + end + + def set_active(pid) do + GenServer.call(pid, {:set_active, self()}) + end + + def start_link(%Config{supervisor: sup} = config) do + name = via_tuple(sup) + GenServer.start_link(__MODULE__, config, name: name) + end + + def init(%{uri: uri, opts: opts}) do + case connect(uri, opts) do + {:ok, socket} -> + socket_send(socket, connection_preface()) + {:ok, %__MODULE__{socket: socket}} + + error -> + error + end + end + + def via_tuple(ref) do + {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} + end + + @spec connect(URI.t(), Keyword.t()) :: connection_result + defp connect(uri, opts) do + case uri.scheme do + "http" -> do_connect(uri, :http, opts) + "https" -> do_connect(uri, :https, opts) + _ -> {:error, :bad_scheme} + end + end + + defp do_connect(uri, :http, opts) do + tcp_opts = + opts + |> Keyword.get(:tcp, []) + |> options(:http) + + uri.host + |> to_charlist() + |> :gen_tcp.connect(uri.port, tcp_opts) + end + + defp do_connect(uri, :https, opts) do + :ssl.start() + + ssl_opts = + opts + |> Keyword.get(:ssl, []) + |> options(:https) + + uri.host + |> to_charlist() + |> :ssl.connect(uri.port, ssl_opts) + end + + @spec options(Keyword.t(), :http | :https) :: [...] + defp options(opts, :https) do + opts ++ + [ + {:active, :once}, + {:packet, :raw}, + {:reuseaddr, false}, + {:alpn_advertised_protocols, [<<"h2">>]}, + :binary + ] + end + + defp options(opts, :http) do + opts ++ + [ + {:active, :once}, + {:packet, :raw}, + {:reuseaddr, false}, + :binary + ] + end + + # Frame recv and parsing + + defp do_recv_bin(bin, %{socket: socket} = state) do + bin = state.buffer <> bin + + case parse_bin(socket, bin, state) do + {:unfinished, bin, state} -> + setopts(state.socket, [{:active, :once}]) + {:noreply, %{state | buffer: bin}} + end + end + + def parse_bin(_socket, bin, %{active_user: nil} = state) do + {:unfinished, bin, state} + end + + def parse_bin(socket, bin, state) do + case FrameParser.parse(bin) do + {:ok, frame, rest} -> + Kernel.send(state.active_user, {:recv, frame}) + parse_bin(socket, rest, state) + + {:error, bin} -> + {:unfinished, bin, state} + end + end + + # Internal socket helpers + + defp socket_send({:sslsocket, _, _} = pid, bin) do + :ssl.send(pid, bin) + end + + defp socket_send(pid, bin) do + :gen_tcp.send(pid, bin) + end + + defp setopts({:sslsocket, _, _} = pid, opts) do + :ssl.setopts(pid, opts) + end + + defp setopts(pid, opts) do + :inet.setopts(pid, opts) + end + + # handle_call + + def handle_call({:set_active, pid}, _from, state) do + {:reply, :ok, %{state | active_user: pid}} + end + + def handle_call({:send, bin}, _from, state) do + resp = socket_send(state.socket, bin) + {:reply, resp, state} + end + + # handle_info + + def handle_info(:send_preface, state) do + socket_send(state.socket, connection_preface()) + {:noreply, state} + end + + def handle_info({:tcp, _socket, bin}, state) do + do_recv_bin(bin, state) + end + + def handle_info({:tcp_closed, _socket}, state) do + Kernel.send(state.active_user, {:closed, self()}) + {:noreply, state} + end + + def handle_info({:ssl, _socket, bin}, state) do + do_recv_bin(bin, state) + end + + def handle_info({:ssl_closed, _socket}, state) do + Kernel.send(state.active_user, {:closed, self()}) + {:noreply, state} + end +end diff --git a/lib/stream.ex b/lib/stream.ex index d9c7e49..b6878c9 100644 --- a/lib/stream.ex +++ b/lib/stream.ex @@ -3,18 +3,18 @@ defmodule Kadabra.Stream do defstruct id: nil, body: "", - config: nil, + client: nil, connection: nil, - settings: nil, + socket: nil, + ref: nil, flow: nil, + uri: nil, headers: [], - on_push_promise: nil, on_response: nil require Logger - alias Kadabra.{Config, Encodable, Frame, Hpack, Stream, Tasks} - alias Kadabra.Connection.{Settings, Socket} + alias Kadabra.{Encodable, Frame, Hpack, Socket, Stream, Tasks} alias Kadabra.Frame.{ Continuation, @@ -29,17 +29,15 @@ defmodule Kadabra.Stream do @type t :: %__MODULE__{ id: pos_integer, - config: term, + client: pid, connection: pid, - settings: pid, + ref: term, + uri: URI.t(), flow: Kadabra.Stream.FlowControl.t(), headers: [...], body: binary } - # @data 0x0 - @headers 0x1 - @closed :closed @hc_local :half_closed_local @hc_remote :half_closed_remote @@ -48,38 +46,29 @@ defmodule Kadabra.Stream do # @reserved_local :reserved_local @reserved_remote :reserved_remote - def new(%Config{} = config, %Settings{} = settings, stream_id) do + def new(config, stream_id, initial_window_size, max_frame_size) do flow_opts = [ - stream_id: stream_id, - socket: config.socket, - window: settings.initial_window_size, - max_frame_size: settings.max_frame_size + window: initial_window_size, + max_frame_size: max_frame_size ] %__MODULE__{ id: stream_id, - config: config, + client: config.client, + ref: config.ref, + uri: config.uri, + socket: config.socket, connection: self(), flow: Stream.FlowControl.new(flow_opts) } end - def start_link(%{id: id, config: config} = stream) do - config.ref - |> via_tuple(id) - |> :gen_statem.start_link(__MODULE__, stream, []) - end - - def via_tuple(ref, stream_id) do - {:via, Registry, {Registry.Kadabra, {ref, stream_id}}} + def start_link(%Stream{} = stream) do + :gen_statem.start_link(__MODULE__, stream, []) end - def close(ref, stream_id) do - ref |> via_tuple(stream_id) |> cast_recv(:close) - end - - def cast_recv(pid, frame) do - :gen_statem.cast(pid, {:recv, frame}) + def close(pid) do + call_recv(pid, :close) end def call_recv(pid, frame) do @@ -90,87 +79,67 @@ defmodule Kadabra.Stream do :gen_statem.cast(pid, {:send, frame}) end - def recv(:close, _state, _stream) do - {:stop, :normal} + # recv + + def recv(from, :close, _state, _stream) do + {:stop, :normal, [{:reply, from, :ok}]} end - # For SETTINGS initial_window_size and max_frame_size changes - def recv({:settings_change, window, new_max_frame}, _state, stream) do + def recv(from, %WindowUpdate{window_size_increment: inc}, _state, stream) do flow = stream.flow - |> Stream.FlowControl.increment_window(window) - |> Stream.FlowControl.set_max_frame_size(new_max_frame) + |> Stream.FlowControl.increment_window(inc) + |> Stream.FlowControl.process() + |> send_data_frames(stream.socket, stream.id) - {:keep_state, %{stream | flow: flow}} + {:keep_state, %{stream | flow: flow}, [{:reply, from, :ok}]} end - def recv(%Data{end_stream: true, data: data}, state, stream) + def recv(from, %Data{end_stream: true, data: data}, state, stream) when state in [@hc_local] do + :gen_statem.reply(from, :ok) stream = %Stream{stream | body: stream.body <> data} {:next_state, @closed, stream} end - def recv(%Data{end_stream: true, data: data}, _state, stream) do + def recv(from, %Data{end_stream: true, data: data}, _state, stream) do + :gen_statem.reply(from, :ok) stream = %Stream{stream | body: stream.body <> data} {:next_state, @hc_remote, stream} end - def recv(%Data{end_stream: false, data: data}, _state, stream) do + def recv(from, %Data{end_stream: false, data: data}, _state, stream) do + :gen_statem.reply(from, :ok) stream = %Stream{stream | body: stream.body <> data} {:keep_state, stream} end - def recv(%WindowUpdate{window_size_increment: inc}, _state, stream) do - flow = - stream.flow - |> Stream.FlowControl.increment_window(inc) - |> Stream.FlowControl.process() - - {:keep_state, %{stream | flow: flow}} - end + def recv(from, %Headers{end_stream: end_stream?} = frame, _state, stream) do + case Hpack.decode(stream.ref, frame.header_block_fragment) do + {:ok, headers} -> + :gen_statem.reply(from, :ok) - def recv(%RstStream{} = _frame, state, stream) - when state in [@open, @hc_local, @hc_remote, @closed] do - # IO.inspect(frame, label: "Got RST_STREAM") - {:next_state, :closed, stream} - end + stream = %Stream{stream | headers: stream.headers ++ headers} - def recv(frame, state, stream) do - """ - Unknown RECV on stream #{stream.id} - Frame: #{inspect(frame)} - State: #{inspect(state)} - """ - |> Logger.info() + if end_stream?, + do: {:next_state, @hc_remote, stream}, + else: {:keep_state, stream} - {:keep_state, stream} + _error -> + :gen_statem.reply(from, {:connection_error, :COMPRESSION_ERROR}) + {:stop, :normal} + end end - # Headers, PushPromise and Continuation frames must be calls - - def recv(from, %Headers{end_stream: true} = frame, _state, stream) do - {:ok, headers} = - Hpack.decode(stream.config.ref, frame.header_block_fragment) - - :gen_statem.reply(from, :ok) - - stream = %Stream{stream | headers: stream.headers ++ headers} - {:next_state, @hc_remote, stream} - end - - def recv(from, %Headers{end_stream: false} = frame, _state, stream) do - {:ok, headers} = - Hpack.decode(stream.config.ref, frame.header_block_fragment) - - :gen_statem.reply(from, :ok) - - stream = %Stream{stream | headers: stream.headers ++ headers} - {:keep_state, stream} + def recv(from, %RstStream{} = _frame, state, stream) + when state in [@open, @hc_local, @hc_remote, @closed] do + # IO.inspect(frame, label: "Got RST_STREAM") + {:next_state, :closed, stream, [{:reply, from, :ok}]} end - def recv(from, %PushPromise{} = frame, state, %{config: config} = stream) + def recv(from, %PushPromise{} = frame, state, %{ref: ref} = stream) when state in [@idle] do - {:ok, headers} = Hpack.decode(config.ref, frame.header_block_fragment) + {:ok, headers} = Hpack.decode(ref, frame.header_block_fragment) stream = %Stream{stream | headers: stream.headers ++ headers} @@ -181,8 +150,8 @@ defmodule Kadabra.Stream do {:next_state, @reserved_remote, stream} end - def recv(from, %Continuation{} = frame, _state, %{config: config} = stream) do - {:ok, headers} = Hpack.decode(config.ref, frame.header_block_fragment) + def recv(from, %Continuation{} = frame, _state, %{ref: ref} = stream) do + {:ok, headers} = Hpack.decode(ref, frame.header_block_fragment) :gen_statem.reply(from, :ok) @@ -190,11 +159,22 @@ defmodule Kadabra.Stream do {:keep_state, stream} end + def recv(frame, state, stream) do + """ + Unknown RECV on stream #{stream.id} + Frame: #{inspect(frame)} + State: #{inspect(state)} + """ + |> Logger.info() + + {:keep_state, stream} + end + # Enter Events - def handle_event(:enter, _old, @hc_remote, %{config: config} = stream) do + def handle_event(:enter, _old, @hc_remote, %{socket: socket} = stream) do bin = stream.id |> RstStream.new() |> Encodable.to_bin() - Socket.send(config.socket, bin) + Socket.send(socket, bin) :gen_statem.cast(self(), :close) {:keep_state, stream} @@ -203,14 +183,23 @@ defmodule Kadabra.Stream do def handle_event(:enter, _old, @closed, stream) do response = Response.new(stream.id, stream.headers, stream.body) Tasks.run(stream.on_response, response) - send(stream.connection, {:finished, stream.id}) - send(stream.config.client, {:end_stream, response}) + send(stream.client, {:end_stream, response}) - {:stop, :normal} + {:stop, {:shutdown, {:finished, stream.id}}} end def handle_event(:enter, _old, _new, stream), do: {:keep_state, stream} + # For SETTINGS initial_window_size and max_frame_size changes + def handle_event(:info, {:settings_change, window, max_frame}, _, stream) do + flow = + stream.flow + |> Stream.FlowControl.increment_window(window) + |> Stream.FlowControl.set_max_frame_size(max_frame) + + {:keep_state, %{stream | flow: flow}} + end + # Casts def handle_event(:cast, :close, _state, stream) do @@ -242,14 +231,21 @@ defmodule Kadabra.Stream do def handle_event({:call, from}, {:send_headers, request}, _state, stream) do %{headers: headers, body: payload, on_response: on_resp} = request - headers = add_headers(headers, stream.config) + headers = add_headers(headers, stream.uri) - {:ok, encoded} = Hpack.encode(stream.config.ref, headers) + {:ok, encoded} = Hpack.encode(stream.ref, headers) headers_payload = :erlang.iolist_to_binary(encoded) - flags = if payload, do: 0x4, else: 0x5 - h = Frame.binary_frame(@headers, flags, stream.id, headers_payload) - Socket.send(stream.config.socket, h) + bin = + %Frame.Headers{ + stream_id: stream.id, + header_block_fragment: headers_payload, + end_stream: is_nil(payload), + end_headers: true + } + |> Encodable.to_bin() + + Socket.send(stream.socket, bin) # Logger.info("Sending, Stream ID: #{stream.id}, size: #{byte_size(h)}") # Reply early for better performance @@ -260,16 +256,17 @@ defmodule Kadabra.Stream do stream.flow |> Stream.FlowControl.add(payload) |> Stream.FlowControl.process() + |> send_data_frames(stream.socket, stream.id) else stream.flow end stream = %{stream | flow: flow, on_response: on_resp} - {:next_state, @open, stream, []} + {:next_state, @open, stream} end - def add_headers(headers, %{uri: uri}) do + def add_headers(headers, uri) do h = headers ++ [ @@ -281,6 +278,24 @@ defmodule Kadabra.Stream do Enum.sort(h, fn {a, _b}, {c, _d} -> a < c end) end + def send_data_frames(flow_control, socket, stream_id) do + flow_control.out_queue + |> :queue.to_list() + |> Enum.each(fn {data, end_stream?} -> + send_data_frame(socket, stream_id, end_stream?, data) + end) + + %{flow_control | out_queue: :queue.new()} + end + + defp send_data_frame(socket, stream_id, end_stream?, data) do + bin = + %Frame.Data{stream_id: stream_id, end_stream: end_stream?, data: data} + |> Encodable.to_bin() + + Socket.send(socket, bin) + end + # Other Callbacks def init(stream), do: {:ok, @idle, stream} diff --git a/lib/stream/flow_control.ex b/lib/stream/flow_control.ex index 6d9df03..c0fd1cb 100644 --- a/lib/stream/flow_control.ex +++ b/lib/stream/flow_control.ex @@ -1,21 +1,18 @@ defmodule Kadabra.Stream.FlowControl do @moduledoc false - @default_window_size 65_535 + @default_max_frame round(:math.pow(2, 14)) + @default_window round(:math.pow(2, 16) - 1) defstruct queue: :queue.new(), - window: @default_window_size, - max_frame_size: 16_384, - socket: nil, - stream_id: nil - - alias Kadabra.{Encodable, Frame} - alias Kadabra.Connection.Socket + out_queue: :queue.new(), + window: @default_window, + max_frame_size: @default_max_frame @type t :: %__MODULE__{ max_frame_size: non_neg_integer, queue: :queue.queue(binary), - socket: pid, + out_queue: :queue.queue({binary, boolean}), window: integer } @@ -28,20 +25,17 @@ defmodule Kadabra.Stream.FlowControl do ## Examples - iex> new(stream_id: 1) - %Kadabra.Stream.FlowControl{stream_id: 1} + iex> new() + %Kadabra.Stream.FlowControl{} - iex> new(stream_id: 1, window: 20_000, max_frame_size: 18_000) - %Kadabra.Stream.FlowControl{stream_id: 1, window: 20_000, - max_frame_size: 18_000} + iex> new(window: 20_000, max_frame_size: 18_000) + %Kadabra.Stream.FlowControl{window: 20_000, max_frame_size: 18_000} """ @spec new(Keyword.t()) :: t def new(opts \\ []) do %__MODULE__{ - stream_id: Keyword.get(opts, :stream_id), - socket: Keyword.get(opts, :socket), - window: Keyword.get(opts, :window, @default_window_size), - max_frame_size: Keyword.get(opts, :max_frame_size, 16_384) + window: Keyword.get(opts, :window, @default_window), + max_frame_size: Keyword.get(opts, :max_frame_size, @default_max_frame) } end @@ -92,80 +86,36 @@ defmodule Kadabra.Stream.FlowControl do defp do_process(%{window: window} = flow, bin) when byte_size(bin) > window do %{ queue: queue, - max_frame_size: max_size, - socket: socket, - stream_id: stream_id + out_queue: out_queue, + max_frame_size: max_size } = flow {chunk, rem_bin} = :erlang.split_binary(bin, window) - max_size - |> split_packet(chunk) - |> send_partial_data(socket, stream_id) + payloads = split_packet(max_size, chunk) + out_queue = enqueue_partial(out_queue, payloads) queue = :queue.in_r(rem_bin, queue) flow |> Map.put(:queue, queue) + |> Map.put(:out_queue, out_queue) |> Map.put(:window, 0) |> process() end defp do_process(%{window: window} = flow_control, bin) do - %{ - max_frame_size: max_size, - socket: socket, - stream_id: stream_id - } = flow_control + %{max_frame_size: max_size, out_queue: out_queue} = flow_control - max_size - |> split_packet(bin) - |> send_data(socket, stream_id) + payloads = split_packet(max_size, bin) + out_queue = enqueue_complete(out_queue, payloads) flow_control |> Map.put(:window, window - byte_size(bin)) + |> Map.put(:out_queue, out_queue) |> process() end - def send_partial_data([], _socket, _stream_id), do: :ok - - def send_partial_data([bin | rest], socket, stream_id) do - p = - %Frame.Data{stream_id: stream_id, end_stream: false, data: bin} - |> Encodable.to_bin() - - Socket.send(socket, p) - send_partial_data(rest, socket, stream_id) - end - - def send_data([], _socket, _stream_id), do: :ok - - def send_data([bin | []], socket, stream_id) do - p = - %Frame.Data{stream_id: stream_id, end_stream: true, data: bin} - |> Encodable.to_bin() - - Socket.send(socket, p) - send_data([], socket, stream_id) - end - - def send_data([bin | rest], socket, stream_id) do - p = - %Frame.Data{stream_id: stream_id, end_stream: false, data: bin} - |> Encodable.to_bin() - - Socket.send(socket, p) - send_data(rest, socket, stream_id) - end - - def split_packet(size, p) when byte_size(p) >= size do - {chunk, rest} = :erlang.split_binary(p, size) - [chunk | split_packet(size, rest)] - end - - def split_packet(_size, <<>>), do: [] - def split_packet(_size, p), do: [p] - @doc ~S""" Increments stream window by given increment. @@ -192,4 +142,34 @@ defmodule Kadabra.Stream.FlowControl do def set_max_frame_size(flow_control, size) do %{flow_control | max_frame_size: size} end + + defp enqueue_complete(queue, []), do: queue + + defp enqueue_complete(queue, [payload | []]) do + {payload, true} + |> :queue.in(queue) + |> enqueue_complete([]) + end + + defp enqueue_complete(queue, [payload | rest]) do + {payload, false} + |> :queue.in(queue) + |> enqueue_complete(rest) + end + + defp enqueue_partial(queue, []), do: queue + + defp enqueue_partial(queue, [payload | rest]) do + {payload, false} + |> :queue.in(queue) + |> enqueue_partial(rest) + end + + defp split_packet(size, p) when byte_size(p) >= size do + {chunk, rest} = :erlang.split_binary(p, size) + [chunk | split_packet(size, rest)] + end + + defp split_packet(_size, <<>>), do: [] + defp split_packet(_size, p), do: [p] end diff --git a/lib/stream_set.ex b/lib/stream_set.ex new file mode 100644 index 0000000..f1ca18a --- /dev/null +++ b/lib/stream_set.ex @@ -0,0 +1,122 @@ +defmodule Kadabra.StreamSet do + @moduledoc false + + defstruct stream_id: 1, + active_stream_count: 0, + active_streams: %{}, + max_concurrent_streams: :infinite + + @type t :: %__MODULE__{ + stream_id: pos_integer, + active_stream_count: non_neg_integer, + active_streams: %{}, + max_concurrent_streams: non_neg_integer | :infinite + } + + @doc ~S""" + Increments current `stream_id`. + + ## Examples + + iex> set = %Kadabra.StreamSet{stream_id: 5} + iex> increment_stream_id(set) + %Kadabra.StreamSet{stream_id: 7} + """ + @spec increment_stream_id(t) :: t + def increment_stream_id(stream_set) do + %{stream_set | stream_id: stream_set.stream_id + 2} + end + + @doc ~S""" + Increments open stream count. + + ## Examples + + iex> set = %Kadabra.StreamSet{active_stream_count: 2} + iex> increment_active_stream_count(set) + %Kadabra.StreamSet{active_stream_count: 3} + """ + @spec increment_active_stream_count(t) :: t + def increment_active_stream_count(stream_set) do + %{stream_set | active_stream_count: stream_set.active_stream_count + 1} + end + + @doc ~S""" + Decrements open stream count. + + ## Examples + + iex> set = %Kadabra.StreamSet{active_stream_count: 2} + iex> decrement_active_stream_count(set) + %Kadabra.StreamSet{active_stream_count: 1} + """ + @spec decrement_active_stream_count(t) :: t + def decrement_active_stream_count(stream_set) do + %{stream_set | active_stream_count: stream_set.active_stream_count - 1} + end + + @doc ~S""" + Marks stream_id as active. + + ## Examples + + iex> set = add_active(%Kadabra.StreamSet{}, 1, :pid) + iex> set.active_streams + %{1 => :pid} + """ + def add_active(%{active_streams: active} = stream_set, stream_id, pid) do + %{stream_set | active_streams: Map.put(active, stream_id, pid)} + end + + @doc ~S""" + Removes stream_id from active set. + + ## Examples + + iex> set = remove_active(%Kadabra.StreamSet{ + ...> active_streams: %{1 => self(), 3 => self()}}, 1) + iex> set.active_streams + %{3 => self()} + + iex> set = remove_active(%Kadabra.StreamSet{ + ...> active_streams: %{1 => self()}}, self()) + iex> set.active_streams + %{} + """ + def remove_active(%{active_streams: active} = stream_set, pid) + when is_pid(pid) do + updated = + active + |> Enum.filter(fn {_, p} -> p != pid end) + |> Enum.into(%{}) + + %{stream_set | active_streams: updated} + end + + def remove_active(%{active_streams: active} = stream_set, stream_id) + when is_integer(stream_id) do + %{stream_set | active_streams: Map.delete(active, stream_id)} + end + + @doc ~S""" + Returns true if active_streams is less than max streams. + + ## Examples + + iex> set = %Kadabra.StreamSet{active_stream_count: 3, + ...> max_concurrent_streams: 100} + iex> can_send?(set) + true + + iex> set = %Kadabra.StreamSet{active_stream_count: 3, + ...> max_concurrent_streams: 1} + iex> can_send?(set) + false + """ + @spec can_send?(t) :: boolean + def can_send?(%{active_stream_count: count, max_concurrent_streams: max}) + when count < max, + do: true + + def can_send?(_else), do: false +end diff --git a/lib/stream_supervisor.ex b/lib/stream_supervisor.ex deleted file mode 100644 index af1950d..0000000 --- a/lib/stream_supervisor.ex +++ /dev/null @@ -1,31 +0,0 @@ -defmodule Kadabra.StreamSupervisor do - @moduledoc false - - use Supervisor - import Supervisor.Spec - - alias Kadabra.Stream - - def start_link(ref) do - name = via_tuple(ref) - Supervisor.start_link(__MODULE__, :ok, name: name) - end - - def via_tuple(ref) do - {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} - end - - def start_opts(id \\ :erlang.make_ref()) do - [id: id, restart: :transient] - end - - def start_stream(%{ref: ref} = config, flow, stream_id \\ nil) do - stream = Stream.new(config, flow.settings, stream_id || flow.stream_id) - Supervisor.start_child(via_tuple(ref), [stream]) - end - - def init(:ok) do - spec = worker(Stream, [], start_opts()) - supervise([spec], strategy: :simple_one_for_one) - end -end diff --git a/lib/supervisor.ex b/lib/supervisor.ex index 02449a0..7fd746c 100644 --- a/lib/supervisor.ex +++ b/lib/supervisor.ex @@ -4,7 +4,7 @@ defmodule Kadabra.Supervisor do use Supervisor import Supervisor.Spec - alias Kadabra.{Connection, ConnectionQueue, Hpack, StreamSupervisor} + alias Kadabra.{Connection, ConnectionQueue, Hpack, Socket} def start_link(uri, pid, opts) do config = %Kadabra.Config{ @@ -21,18 +21,17 @@ defmodule Kadabra.Supervisor do Supervisor.stop(pid) end - def start_opts(id \\ :erlang.make_ref()) do - [id: id, restart: :transient] - end - def init(%Kadabra.Config{ref: ref} = config) do - config = %{config | supervisor: self()} + config = + config + |> Map.put(:supervisor, self()) + |> Map.put(:queue, ConnectionQueue.via_tuple(self())) children = [ - supervisor(StreamSupervisor, [ref], id: :stream_supervisor), + worker(ConnectionQueue, [self()], id: :connection_queue), + worker(Socket, [config], id: :socket), worker(Hpack, [ref, :encoder], id: :encoder), worker(Hpack, [ref, :decoder], id: :decoder), - worker(ConnectionQueue, [self()], id: :connection_queue), worker(Connection, [config], id: :connection) ] diff --git a/lib/tasks.ex b/lib/tasks.ex index fcdc2e3..ed9917f 100644 --- a/lib/tasks.ex +++ b/lib/tasks.ex @@ -1,6 +1,10 @@ defmodule Kadabra.Tasks do @moduledoc false + def run(fun) do + Task.Supervisor.start_child(Kadabra.Tasks, fn -> fun.() end) + end + def run(nil, _response), do: :ok def run(fun, response) do diff --git a/mix.exs b/mix.exs index 7c10db5..0f9a499 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Kadabra.Mixfile do use Mix.Project - @version "0.4.0" + @version "0.4.1" def project do [ @@ -50,7 +50,6 @@ defmodule Kadabra.Mixfile do {:hpack, "~> 0.2.3", hex: :hpack_erl}, {:gen_stage, "~> 0.13.1"}, {:ex_doc, "~> 0.14", only: :dev}, - {:dogma, "~> 0.1", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, {:dialyxir, "~> 0.5", only: [:dev], runtime: false}, {:credo, "~> 0.8", only: [:dev, :test], runtime: false} diff --git a/test/connection/flow_control_test.exs b/test/connection/flow_control_test.exs index a24e60b..b32e654 100644 --- a/test/connection/flow_control_test.exs +++ b/test/connection/flow_control_test.exs @@ -1,4 +1,5 @@ defmodule Kadabra.Connection.FlowControlTest do use ExUnit.Case doctest Kadabra.Connection.FlowControl, import: true + doctest Kadabra.StreamSet, import: true end diff --git a/test/connection_test.exs b/test/connection_test.exs index 9983af6..5aaba0d 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -11,24 +11,25 @@ defmodule Kadabra.ConnectionTest do Kadabra.get(pid, "/clockstream", on_response: & &1) Kadabra.get(pid, "/clockstream", on_response: & &1) - {_, stream_sup_pid, _, _} = + {_, conn_pid, _, _} = pid |> Supervisor.which_children() - |> Enum.find(fn {name, _, _, _} -> name == :stream_supervisor end) + |> Enum.find(fn {name, _, _, _} -> name == :connection end) - {_, conn_pid, _, _} = + {_, socket_pid, _, _} = pid |> Supervisor.which_children() - |> Enum.find(fn {name, _, _, _} -> name == :connection end) + |> Enum.find(fn {name, _, _, _} -> name == :socket end) # Wait to collect some data on the streams Process.sleep(500) - assert Supervisor.count_children(stream_sup_pid).active == 2 + state = :sys.get_state(conn_pid).state + assert Enum.count(state.flow_control.stream_set.active_streams) == 2 # frame = Kadabra.Frame.Goaway.new(1) # GenServer.cast(conn_pid, {:recv, frame}) - send(conn_pid, {:ssl_closed, nil}) + send(socket_pid, {:ssl_closed, nil}) assert_receive {:closed, ^pid}, 5_000 assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 5_000 diff --git a/test/kadabra_test.exs b/test/kadabra_test.exs index 0dbcf07..7cda385 100644 --- a/test/kadabra_test.exs +++ b/test/kadabra_test.exs @@ -150,7 +150,7 @@ defmodule KadabraTest do {:end_stream, response} -> assert response.id == 1 assert response.status == 200 - assert byte_size(response.body) == 17668 + assert byte_size(response.body) == 17_668 after 5_000 -> flunk("No stream response received.") @@ -168,8 +168,7 @@ defmodule KadabraTest do assert byte_size(response.body) == 10_921_353 other -> - IO.inspect(other) - flunk("Unexpected response") + flunk("Unexpected response: #{inspect(other)}") after 45_000 -> flunk("No stream response received.") @@ -255,9 +254,9 @@ defmodule KadabraTest do |> elem(1) ref = Process.monitor(pid) - conn_pid = find_child(pid, :connection) + socket = find_child(pid, :socket) - send(conn_pid, {:ssl_closed, nil}) + send(socket, {:ssl_closed, nil}) assert_receive {:closed, ^pid}, 5_000 assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 5_000 diff --git a/test/stream_test.exs b/test/stream_test.exs index f0e0176..8853ef8 100644 --- a/test/stream_test.exs +++ b/test/stream_test.exs @@ -2,11 +2,11 @@ defmodule Kadabra.StreamTest do use ExUnit.Case doctest Kadabra.Stream - alias Kadabra.{Config, Connection, Frame, Stream} + alias Kadabra.{Config, Frame, Stream} describe "recv/3" do test "keeps state on unknown stream" do - stream = Stream.new(%Config{}, %Connection.Settings{}, 1) + stream = Stream.new(%Config{}, nil, nil, 1) # Individual streams shouldn't get pings ping = Frame.Ping.new() @@ -14,18 +14,21 @@ defmodule Kadabra.StreamTest do end test "closes stream on RST_STREAM" do - stream = Stream.new(%Config{}, %Connection.Settings{}, 1) + stream = Stream.new(%Config{}, nil, nil, 1) rst = Frame.RstStream.new(1) - assert {:next_state, :closed, ^stream} = Stream.recv(rst, :open, stream) + reply = [{:reply, self(), :ok}] + + assert {:next_state, :closed, ^stream, ^reply} = + Stream.recv(self(), rst, :open, stream) end test "closes stream on DATA with END_STREAM in hc_local state" do - stream = Stream.new(%Config{}, %Connection.Settings{}, 1) + stream = Stream.new(%Config{}, nil, nil, 1) data = %Frame.Data{stream_id: 1, data: "test", end_stream: true} assert {:next_state, :closed, _stream} = - Stream.recv(data, :half_closed_local, stream) + Stream.recv({self(), nil}, data, :half_closed_local, stream) end end end