diff --git a/.gitignore b/.gitignore index 65622bf..a8ed8ed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,5 +16,4 @@ erl_crash.dump # Also ignore archive artifacts (built via "mix archive.build"). *.ez -mix.lock .iex.exs diff --git a/.travis.yml b/.travis.yml index cbd6619..b8a6829 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,5 +7,7 @@ matrix: otp_release: 20.3.1 - elixir: 1.6.6 otp_release: 20.1 + - elixir: 1.7.0 + otp_release: 21.0.3 script: - "MIX_ENV=test mix do deps.get, compile, coveralls.travis" diff --git a/CHANGELOG.md b/CHANGELOG.md index e6de241..0ab52f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v0.4.3 +- Fixed supervisor crash report during normal connection shutdown +- Removed `GenStage` dependency +- GOAWAY error logger messages now disabled by default. + Re-enable with `config :kadabra, debug_log?: true` + ## v0.4.2 - Fixed `{:closed, pid}` task race condition during connection cleanup - Everything is supervised under `Kadabra.Application` again, instead of diff --git a/README.md b/README.md index a336769..c7382f1 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Add kadabra to your `mix.exs`: ```elixir def deps do [ - {:kadabra, "~> 0.4.0"} + {:kadabra, "~> 0.4.3"} ] end ``` diff --git a/lib/application.ex b/lib/application.ex index 9c08740..2ba4ef5 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -4,40 +4,12 @@ defmodule Kadabra.Application do use Application import Supervisor.Spec - alias Kadabra.Connection - - @app :kadabra - def start(_type, _args) do children = [ supervisor(Registry, [:unique, Registry.Kadabra]), supervisor(Task.Supervisor, [[name: Kadabra.Tasks]]) ] - Supervisor.start_link(children, strategy: :one_for_one, name: @app) - end - - def start_connection(uri, pid, opts) do - Supervisor.start_child( - @app, - supervisor(Kadabra.Supervisor, [uri, pid, opts], spec_opts()) - ) - end - - defp spec_opts do - ref = :erlang.make_ref() - [id: ref, restart: :transient] - end - - def ping(pid) do - pid - |> Connection.via_tuple() - |> Connection.ping() - end - - def close(pid) do - pid - |> Connection.via_tuple() - |> Connection.close() + Supervisor.start_link(children, strategy: :one_for_one, name: :kadabra) end end diff --git a/lib/config.ex b/lib/config.ex index 901dce5..e206941 100644 --- a/lib/config.ex +++ b/lib/config.ex @@ -2,8 +2,9 @@ defmodule Kadabra.Config do @moduledoc false defstruct client: nil, - supervisor: nil, - ref: nil, + queue: nil, + encoder: nil, + decoder: nil, uri: nil, socket: nil, queue: nil, @@ -11,8 +12,9 @@ defmodule Kadabra.Config do @type t :: %__MODULE__{ client: pid, - supervisor: pid, - ref: term, + queue: pid, + encoder: pid, + decoder: pid, uri: URI.t(), socket: pid, opts: Keyword.t() diff --git a/lib/connection.ex b/lib/connection.ex index 2e9b2c3..4455edf 100644 --- a/lib/connection.ex +++ b/lib/connection.ex @@ -9,7 +9,7 @@ defmodule Kadabra.Connection do local_settings: nil, queue: nil - use GenStage + use GenServer require Logger import Kernel, except: [send: 2] @@ -17,16 +17,11 @@ defmodule Kadabra.Connection do alias Kadabra.{ Config, Connection, - Encodable, - Error, - Frame, - Socket, - StreamSupervisor + Hpack, + Socket } - alias Kadabra.Frame.{Goaway, Ping} - - alias Kadabra.Connection.{FlowControl, Processor} + alias Kadabra.Connection.{Egress, FlowControl, Processor} @type t :: %__MODULE__{ buffer: binary, @@ -38,29 +33,34 @@ defmodule Kadabra.Connection do @type sock :: {:sslsocket, any, pid | {any, any}} - def start_link(%Config{supervisor: sup} = config) do - name = via_tuple(sup) - GenStage.start_link(__MODULE__, config, name: name) + def start_link(%Config{} = config) do + GenServer.start_link(__MODULE__, config) end - def via_tuple(ref) do - {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} - end + def init(%Config{} = config) do + {:ok, encoder} = Hpack.start_link() + {:ok, decoder} = Hpack.start_link() + {:ok, socket} = Socket.start_link(config.uri, config.opts) + + config = + config + |> Map.put(:encoder, encoder) + |> Map.put(:decoder, decoder) + |> Map.put(:socket, socket) - def init(%Config{queue: queue} = config) do state = initial_state(config) + Kernel.send(self(), :start) Process.flag(:trap_exit, true) - {:consumer, state, subscribe_to: [queue]} + {:ok, state} end defp initial_state(%Config{opts: opts, queue: queue} = config) do settings = Keyword.get(opts, :settings, Connection.Settings.fastest()) - socket = config.supervisor |> Socket.via_tuple() %__MODULE__{ - config: %{config | socket: socket}, + config: config, queue: queue, local_settings: settings, flow_control: %FlowControl{} @@ -68,11 +68,11 @@ defmodule Kadabra.Connection do end def close(pid) do - GenStage.call(pid, :close) + GenServer.call(pid, :close) end def ping(pid) do - GenStage.cast(pid, {:send, :ping}) + GenServer.cast(pid, {:send, :ping}) end # handle_cast @@ -81,17 +81,13 @@ defmodule Kadabra.Connection do sendf(type, state) end - def handle_cast(_msg, state) do - {:noreply, [], state} - end - - def handle_events(events, _from, state) do + def handle_cast({:request, events}, state) do state = do_send_headers(events, state) - {:noreply, [], state} + {:noreply, state} end - def handle_subscribe(:producer, _opts, from, state) do - {:manual, %{state | queue: from}} + def handle_cast(_msg, state) do + {:noreply, state} end # handle_call @@ -102,25 +98,21 @@ defmodule Kadabra.Connection do config: config } = state - StreamSupervisor.stop(state.config.ref) - - bin = flow.stream_set.stream_id |> Goaway.new() |> Encodable.to_bin() - Socket.send(config.socket, bin) + Egress.send_goaway(config.socket, flow.stream_set.stream_id) {:stop, :shutdown, :ok, state} end # sendf - @spec sendf(:goaway | :ping, t) :: {:noreply, [], t} + @spec sendf(:goaway | :ping, t) :: {:noreply, t} def sendf(:ping, %Connection{config: config} = state) do - bin = Ping.new() |> Encodable.to_bin() - Socket.send(config.socket, bin) - {:noreply, [], state} + Egress.send_ping(config.socket) + {:noreply, state} end def sendf(_else, state) do - {:noreply, [], state} + {:noreply, state} end defp do_send_headers(request, %{flow_control: flow} = state) do @@ -132,66 +124,52 @@ defmodule Kadabra.Connection do %{state | flow_control: flow} end - def handle_info(:start, %{config: config} = state) do - config.supervisor - |> Socket.via_tuple() - |> Socket.set_active() - - bin = - %Frame.Settings{settings: state.local_settings} - |> Encodable.to_bin() + def handle_info(:start, %{config: %{socket: socket}} = state) do + Socket.set_active(socket) + Egress.send_local_settings(socket, state.local_settings) - config.supervisor - |> Socket.via_tuple() - |> Socket.send(bin) - - {:noreply, [], state} + {:noreply, state} end def handle_info({:closed, _pid}, state) do {:stop, :shutdown, state} end - def handle_info({:DOWN, _, _, _pid, {:shutdown, {:finished, sid}}}, state) do - GenStage.ask(state.queue, 1) + def handle_info({:EXIT, _pid, {:shutdown, {:finished, sid}}}, state) do + GenServer.cast(state.queue, {:ask, 1}) flow = state.flow_control |> FlowControl.finish_stream(sid) |> FlowControl.process(state.config) - {:noreply, [], %{state | flow_control: flow}} + {:noreply, %{state | flow_control: flow}} end def handle_info({:push_promise, stream}, %{config: config} = state) do Kernel.send(config.client, {:push_promise, stream}) - {:noreply, [], state} + {:noreply, state} end def handle_info({:recv, frame}, state) do case Processor.process(frame, state) do {:ok, state} -> - {:noreply, [], state} + {:noreply, state} {:connection_error, error, reason, state} -> - handle_connection_error(state, error, reason) + Egress.send_goaway( + state.config.socket, + state.flow_control.stream_set.stream_id, + error, + reason + ) + {:stop, {:shutdown, :connection_error}, state} end end - defp handle_connection_error(%{config: config} = state, error, reason) do - code = <> - - bin = - state.flow_control.stream_set.stream_id - |> Goaway.new(code, reason) - |> Encodable.to_bin() - - Socket.send(config.socket, bin) - end - def terminate(_reason, %{config: config}) do - Kernel.send(config.client, {:closed, config.supervisor}) + Kernel.send(config.client, {:closed, config.queue}) :ok end end diff --git a/lib/connection/egress.ex b/lib/connection/egress.ex new file mode 100644 index 0000000..770188e --- /dev/null +++ b/lib/connection/egress.ex @@ -0,0 +1,59 @@ +defmodule Kadabra.Connection.Egress do + @moduledoc false + + alias Kadabra.{Encodable, Error, Frame, Socket} + + alias Kadabra.Frame.{ + Goaway, + Ping, + WindowUpdate + } + + def send_goaway(socket, stream_id) do + bin = stream_id |> Goaway.new() |> Encodable.to_bin() + Socket.send(socket, bin) + end + + def send_goaway(socket, stream_id, error, reason) do + code = <> + + bin = + stream_id + |> Goaway.new(code, reason) + |> Encodable.to_bin() + + Socket.send(socket, bin) + end + + def send_ping(socket) do + bin = Ping.new() |> Encodable.to_bin() + Socket.send(socket, bin) + end + + def send_local_settings(socket, settings) do + bin = + %Frame.Settings{settings: settings} + |> Encodable.to_bin() + + Socket.send(socket, bin) + end + + @spec send_window_update(pid, non_neg_integer, integer) :: no_return + def send_window_update(socket, stream_id, bytes) + when bytes > 0 and bytes < 2_147_483_647 do + bin = + stream_id + |> WindowUpdate.new(bytes) + |> Encodable.to_bin() + + # Logger.info("Sending WINDOW_UPDATE on Stream #{stream_id} (#{bytes})") + Socket.send(socket, bin) + end + + def send_window_update(_socket, _stream_id, _bytes), do: :ok + + def send_settings_ack(socket) do + bin = Frame.Settings.ack() |> Encodable.to_bin() + Socket.send(socket, bin) + end +end diff --git a/lib/connection/flow_control.ex b/lib/connection/flow_control.ex index 0f3347c..6b8816c 100644 --- a/lib/connection/flow_control.ex +++ b/lib/connection/flow_control.ex @@ -13,7 +13,7 @@ defmodule Kadabra.Connection.FlowControl do max_frame_size: @default_max_frame_size, window: @default_window_size - alias Kadabra.{Config, StreamSet, StreamSupervisor} + alias Kadabra.{Config, Stream, StreamSet} @type t :: %__MODULE__{ queue: :queue.queue(), @@ -38,44 +38,17 @@ defmodule Kadabra.Connection.FlowControl do |> Map.put(:stream_set, new_set) end - @doc ~S""" - Increments available window. - - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{window: 1_000} - iex> increment_window(flow, 500) - %Kadabra.Connection.FlowControl{window: 1_500} - """ @spec increment_window(t, pos_integer) :: t def increment_window(%{window: window} = flow_control, amount) do %{flow_control | window: window + amount} end - @doc ~S""" - Decrements available window. - - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{window: 1_000} - iex> decrement_window(flow, 500) - %Kadabra.Connection.FlowControl{window: 500} - """ @spec decrement_window(t, pos_integer) :: t def decrement_window(%{window: window} = flow_control, amount) do %{flow_control | window: window - amount} end - @doc ~S""" - Adds new sendable item to the queue. - - ## Examples - - iex> flow = add(%Kadabra.Connection.FlowControl{}, %Kadabra.Request{}) - iex> :queue.len(flow.queue) - 1 - """ - @spec add(t, Kadabra.Request.t()) :: t + @spec add(t, any) :: t def add(%{queue: queue} = flow_control, requests) when is_list(requests) do queue = Enum.reduce(requests, queue, &:queue.in(&1, &2)) %{flow_control | queue: queue} @@ -86,8 +59,8 @@ defmodule Kadabra.Connection.FlowControl do %{flow_control | queue: queue} end - def add_active(%{stream_set: set} = flow_control, stream_id) do - new_set = StreamSet.add_active(set, stream_id) + def add_active(%{stream_set: set} = flow_control, stream_id, pid) do + new_set = StreamSet.add_active(set, stream_id, pid) %{flow_control | stream_set: new_set} end @@ -102,14 +75,14 @@ defmodule Kadabra.Connection.FlowControl do max_frame_size: max_frame } = flow - case StreamSupervisor.start_stream(config, stream_id, window, max_frame) do - {:ok, pid} -> - Process.monitor(pid) + stream = Stream.new(config, stream_id, window, max_frame) + case Stream.start_link(stream) do + {:ok, pid} -> size = byte_size(request.body || <<>>) :gen_statem.call(pid, {:send_headers, request}) - updated_set = add_stream(stream_set, stream_id) + updated_set = add_stream(stream_set, stream_id, pid) flow |> Map.put(:queue, queue) @@ -127,9 +100,9 @@ defmodule Kadabra.Connection.FlowControl do end end - defp add_stream(stream_set, stream_id) do + defp add_stream(stream_set, stream_id, pid) do stream_set - |> StreamSet.add_active(stream_id) + |> StreamSet.add_active(stream_id, pid) |> StreamSet.increment_active_stream_count() |> StreamSet.increment_stream_id() end @@ -144,20 +117,6 @@ defmodule Kadabra.Connection.FlowControl do %{flow_control | stream_set: new_set} end - @doc ~S""" - Returns true if window is positive. - - ## Examples - - iex> flow = %Kadabra.Connection.FlowControl{window: 500} - iex> can_send?(flow) - true - - iex> flow = %Kadabra.Connection.FlowControl{window: 0} - iex> can_send?(flow) - false - """ - @spec can_send?(t) :: boolean - def can_send?(%{window: bytes}) when bytes > 0, do: true - def can_send?(_else), do: false + defp can_send?(%{window: bytes}) when bytes > 0, do: true + defp can_send?(_else), do: false end diff --git a/lib/connection/processor.ex b/lib/connection/processor.ex index 7077fb9..76ad82a 100644 --- a/lib/connection/processor.ex +++ b/lib/connection/processor.ex @@ -5,15 +5,14 @@ defmodule Kadabra.Connection.Processor do alias Kadabra.{ Connection, - Encodable, Error, Frame, Hpack, - Socket, - StreamSupervisor + Stream, + StreamSet } - alias Kadabra.Connection.FlowControl + alias Kadabra.Connection.{Egress, FlowControl} alias Kadabra.Frame.{ Continuation, @@ -55,18 +54,19 @@ defmodule Kadabra.Connection.Processor do bin_size = byte_size(frame.data) size = min(available, bin_size) - send_window_update(config.socket, 0, size) - send_window_update(config.socket, stream_id, bin_size) + Egress.send_window_update(config.socket, 0, size) + Egress.send_window_update(config.socket, stream_id, bin_size) - StreamSupervisor.send_frame(config.ref, stream_id, frame) + pid = StreamSet.pid_for(state.flow_control.stream_set, stream_id) + Stream.call_recv(pid, frame) {:ok, %{state | remote_window: state.remote_window + size}} end def process(%Headers{stream_id: stream_id} = frame, state) do - state.config.ref - |> StreamSupervisor.send_frame(stream_id, frame) - |> case do + pid = StreamSet.pid_for(state.flow_control.stream_set, stream_id) + + case Stream.call_recv(pid, frame) do :ok -> {:ok, state} @@ -81,7 +81,8 @@ defmodule Kadabra.Connection.Processor do end def process(%RstStream{stream_id: stream_id} = frame, state) do - StreamSupervisor.send_frame(state.config.ref, stream_id, frame) + pid = StreamSet.pid_for(state.flow_control.stream_set, stream_id) + Stream.call_recv(pid, frame) {:ok, state} end @@ -90,16 +91,15 @@ defmodule Kadabra.Connection.Processor do def process(%Frame.Settings{ack: false, settings: nil}, state) do %{flow_control: flow, config: config} = state - bin = Frame.Settings.ack() |> Encodable.to_bin() - Socket.send(config.socket, bin) + Egress.send_settings_ack(config.socket) case flow.stream_set.max_concurrent_streams do :infinite -> - GenStage.ask(state.queue, 2_000_000_000) + GenServer.cast(state.queue, {:ask, 2_000_000_000}) max -> to_ask = max - flow.stream_set.active_stream_count - GenStage.ask(state.queue, to_ask) + GenServer.cast(state.queue, {:ask, to_ask}) end {:ok, state} @@ -120,31 +120,34 @@ defmodule Kadabra.Connection.Processor do notify_settings_change(old_settings, settings, flow) - config.ref - |> Hpack.via_tuple(:decoder) - |> Hpack.update_max_table_size(settings.max_header_list_size) + Hpack.update_max_table_size( + state.config.decoder, + settings.max_header_list_size + ) - bin = Frame.Settings.ack() |> Encodable.to_bin() - Socket.send(config.socket, bin) + Egress.send_settings_ack(config.socket) to_ask = settings.max_concurrent_streams - flow.stream_set.active_stream_count - GenStage.ask(state.queue, to_ask) + GenServer.cast(state.queue, {:ask, to_ask}) {:ok, %{state | flow_control: flow, remote_settings: settings}} end def process(%Frame.Settings{ack: true}, %{config: c} = state) do - c.ref - |> Hpack.via_tuple(:encoder) - |> Hpack.update_max_table_size(state.local_settings.max_header_list_size) + Hpack.update_max_table_size( + c.encoder, + state.local_settings.max_header_list_size + ) - c.ref - |> Hpack.via_tuple(:decoder) - |> Hpack.update_max_table_size(state.local_settings.max_header_list_size) + Hpack.update_max_table_size( + c.decoder, + state.local_settings.max_header_list_size + ) - send_huge_window_update(c.socket, state.remote_window) + available = FlowControl.window_max() - state.remote_window + Egress.send_window_update(c.socket, 0, available) {:ok, %{state | remote_window: FlowControl.window_max()}} end @@ -157,10 +160,12 @@ defmodule Kadabra.Connection.Processor do max_frame_size: max_frame } = flow_control - case StreamSupervisor.start_stream(config, stream_id, window, max_frame) do - {:ok, _pid} -> - StreamSupervisor.send_frame(config.ref, stream_id, frame) - state = add_active(state, stream_id) + stream = Stream.new(config, stream_id, window, max_frame) + + case Stream.start_link(stream) do + {:ok, pid} -> + Stream.call_recv(pid, frame) + state = add_active(state, stream_id, pid) {:ok, state} error -> @@ -205,53 +210,42 @@ defmodule Kadabra.Connection.Processor do end def process(%WindowUpdate{stream_id: stream_id} = frame, state) do - StreamSupervisor.send_frame(state.config.ref, stream_id, frame) + pid = StreamSet.pid_for(state.flow_control.stream_set, stream_id) + Stream.call_recv(pid, frame) + {:ok, state} end def process(%Continuation{stream_id: stream_id} = frame, state) do - StreamSupervisor.send_frame(state.config.ref, stream_id, frame) + pid = StreamSet.pid_for(state.flow_control.stream_set, stream_id) + Stream.call_recv(pid, frame) + {:ok, state} end def process(frame, state) do - """ - Unknown RECV on connection - Frame: #{inspect(frame)} - State: #{inspect(state)} - """ - |> Logger.info() + if debug_log?() do + """ + Unknown RECV on connection + Frame: #{inspect(frame)} + State: #{inspect(state)} + """ + |> Logger.info() + end {:ok, state} end - def add_active(state, stream_id) do - flow = FlowControl.add_active(state.flow_control, stream_id) + def add_active(state, stream_id, pid) do + flow = FlowControl.add_active(state.flow_control, stream_id, pid) %{state | flow_control: flow} end def log_goaway(%Goaway{last_stream_id: id, error_code: c, debug_data: b}) do - error = Error.parse(c) - Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}") - end - - @spec send_window_update(pid, non_neg_integer, integer) :: no_return - def send_window_update(socket, stream_id, bytes) - when bytes > 0 and bytes < 2_147_483_647 do - bin = - stream_id - |> WindowUpdate.new(bytes) - |> Encodable.to_bin() - - # Logger.info("Sending WINDOW_UPDATE on Stream #{stream_id} (#{bytes})") - Socket.send(socket, bin) - end - - def send_window_update(_socket, _stream_id, _bytes), do: :ok - - def send_huge_window_update(socket, remote_window) do - available = FlowControl.window_max() - remote_window - send_window_update(socket, 0, available) + if debug_log?() do + error = Error.parse(c) + Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}") + end end def notify_settings_change(old_settings, new_settings, %{stream_set: set}) do @@ -266,4 +260,6 @@ defmodule Kadabra.Connection.Processor do send(pid, {:settings_change, window_diff, max_frame_size}) end end + + defp debug_log?, do: Application.get_env(:kadabra, :debug_log?) end diff --git a/lib/connection/settings.ex b/lib/connection/settings.ex index 6a40c30..b0c7e42 100644 --- a/lib/connection/settings.ex +++ b/lib/connection/settings.ex @@ -130,30 +130,3 @@ defmodule Kadabra.Connection.Settings do end) end end - -defimpl Kadabra.Encodable, for: Kadabra.Connection.Settings do - def to_bin(settings) do - settings - |> Map.from_struct() - |> to_encoded_list() - |> Enum.join() - end - - def to_encoded_list(settings) do - Enum.reduce(settings, [], fn {k, v}, acc -> - case v do - nil -> acc - :infinite -> acc - v -> [encode_setting(k, v)] ++ acc - end - end) - end - - def encode_setting(:header_table_size, v), do: <<0x1::16, v::32>> - def encode_setting(:enable_push, true), do: <<0x2::16, 1::32>> - def encode_setting(:enable_push, false), do: <<0x2::16, 0::32>> - def encode_setting(:max_concurrent_streams, v), do: <<0x3::16, v::32>> - def encode_setting(:initial_window_size, v), do: <<0x4::16, v::32>> - def encode_setting(:max_frame_size, v), do: <<0x5::16, v::32>> - def encode_setting(:max_header_list_size, v), do: <<0x6::16, v::32>> -end diff --git a/lib/connection_pool.ex b/lib/connection_pool.ex new file mode 100644 index 0000000..a67353d --- /dev/null +++ b/lib/connection_pool.ex @@ -0,0 +1,111 @@ +defmodule Kadabra.ConnectionPool do + @moduledoc false + + defstruct connection: nil, + pending_demand: 0, + events: [] + + @type t :: %__MODULE__{ + connection: pid, + pending_demand: non_neg_integer, + events: [any, ...] + } + + alias Kadabra.Connection + + @spec start_link(URI.t(), pid, Keyword.t()) :: {:ok, pid} + def start_link(uri, pid, opts) do + config = %Kadabra.Config{ + client: pid, + uri: uri, + opts: opts + } + + GenServer.start_link(__MODULE__, config) + end + + def request(pid, requests) when is_list(requests) do + GenServer.call(pid, {:request, requests}) + end + + def request(pid, request) do + GenServer.call(pid, {:request, [request]}) + end + + def ping(pid), do: GenServer.call(pid, :ping) + + def close(pid), do: GenServer.call(pid, :close) + + ## Callbacks + + def init(config) do + config = %{config | queue: self()} + + Process.flag(:trap_exit, true) + + {:ok, connection} = Connection.start_link(config) + {:ok, %__MODULE__{connection: connection}} + end + + def handle_cast({:ask, demand}, state) do + state = + state + |> increment_demand(demand) + |> dispatch_events() + + {:noreply, state} + end + + def handle_call(:close, _from, state) do + Connection.close(state.connection) + {:stop, :shutdown, :ok, state} + end + + def handle_call(:ping, _from, state) do + Connection.ping(state.connection) + {:reply, :ok, state} + end + + def handle_call({:request, requests}, from, state) do + GenServer.reply(from, :ok) + + state = + state + |> enqueue(requests) + |> dispatch_events() + + {:noreply, state} + end + + def handle_info({:EXIT, _pid, :shutdown}, state) do + {:stop, :shutdown, state} + end + + def handle_info({:EXIT, _pid, {:shutdown, :connection_error}}, state) do + {:stop, :shutdown, state} + end + + def terminate(_reason, _state) do + :ok + end + + ## Private + + defp increment_demand(state, demand) do + state + |> Map.put(:pending_demand, state.pending_demand + demand) + end + + defp enqueue(state, requests) do + state + |> Map.put(:events, state.events ++ requests) + end + + defp dispatch_events(state) do + {to_dispatch, rest} = Enum.split(state.events, state.pending_demand) + new_pending = state.pending_demand - Enum.count(to_dispatch) + + GenServer.cast(state.connection, {:request, to_dispatch}) + %{state | pending_demand: new_pending, events: rest} + end +end diff --git a/lib/connection_queue.ex b/lib/connection_queue.ex deleted file mode 100644 index 8ebfffb..0000000 --- a/lib/connection_queue.ex +++ /dev/null @@ -1,58 +0,0 @@ -defmodule Kadabra.ConnectionQueue do - @moduledoc false - - use GenStage - - def start_link(sup) do - name = via_tuple(sup) - GenStage.start_link(__MODULE__, :ok, name: name) - end - - def via_tuple(ref) do - {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} - end - - def init(:ok) do - {:producer, {:queue.new(), 0}, dispatcher: GenStage.BroadcastDispatcher} - end - - def queue_request(pid, request) do - pid - |> via_tuple() - |> GenStage.call({:request, request}) - end - - def handle_call({:request, request}, from, {queue, pending_demand}) do - GenStage.reply(from, :ok) - - queue - |> enqueue(request) - |> dispatch_events(pending_demand, []) - end - - def enqueue(queue, requests) when is_list(requests) do - Enum.reduce(requests, queue, &enqueue(&2, &1)) - end - - def enqueue(queue, request) do - :queue.in(request, queue) - end - - def handle_demand(incoming_demand, {queue, pending_demand}) do - dispatch_events(queue, incoming_demand + pending_demand, []) - end - - defp dispatch_events(queue, 0, events) do - {:noreply, Enum.reverse(events), {queue, 0}} - end - - defp dispatch_events(queue, demand, events) do - case :queue.out(queue) do - {{:value, event}, queue} -> - dispatch_events(queue, demand - 1, [event | events]) - - {:empty, queue} -> - {:noreply, Enum.reverse(events), {queue, demand}} - end - end -end diff --git a/lib/encodable.ex b/lib/encodable.ex index 700bba1..b1db780 100644 --- a/lib/encodable.ex +++ b/lib/encodable.ex @@ -27,6 +27,136 @@ defprotocol Kadabra.Encodable do def to_bin(frame) end +defimpl Kadabra.Encodable, for: Kadabra.Frame do + alias Kadabra.Frame + + def to_bin(%{type: type, flags: flags, stream_id: sid, payload: p}) do + Frame.binary_frame(type, flags, sid, p) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Data do + alias Kadabra.Frame + + @data 0x0 + + def to_bin(%{end_stream: end_stream, stream_id: stream_id, data: data}) do + flags = if end_stream, do: 0x1, else: 0x0 + Frame.binary_frame(@data, flags, stream_id, data) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Headers do + alias Kadabra.Frame + + @headers 0x1 + + def to_bin(%{header_block_fragment: block, stream_id: sid} = frame) do + flags = flags(frame) + Frame.binary_frame(@headers, flags, sid, block) + end + + defp flags(%{end_headers: false, end_stream: true}), do: 0x1 + defp flags(%{end_headers: true, end_stream: false}), do: 0x4 + defp flags(%{end_headers: true, end_stream: true}), do: 0x5 +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.RstStream do + alias Kadabra.Frame + + @rst_stream 0x3 + + def to_bin(frame) do + Frame.binary_frame(@rst_stream, 0x0, frame.stream_id, frame.error_code) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Settings do + alias Kadabra.{Encodable, Frame} + + @settings 0x4 + + def to_bin(frame) do + ack = if frame.ack, do: 0x1, else: 0x0 + + case frame.settings do + nil -> + Frame.binary_frame(@settings, ack, 0x0, <<>>) + + settings -> + p = settings |> Encodable.to_bin() + Frame.binary_frame(@settings, ack, 0x0, p) + end + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Ping do + alias Kadabra.Frame + + def to_bin(frame) do + ack = if frame.ack, do: 0x1, else: 0x0 + Frame.binary_frame(0x6, ack, 0x0, frame.data) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Goaway do + alias Kadabra.Frame + + @goaway 0x7 + + def to_bin(%{last_stream_id: id, error_code: error}) do + payload = <<0::1, id::31>> <> error + Frame.binary_frame(@goaway, 0x0, 0, payload) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.WindowUpdate do + alias Kadabra.Frame + + @window_update 0x8 + + def to_bin(frame) do + size = <> + Frame.binary_frame(@window_update, 0x0, frame.stream_id, size) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Frame.Continuation do + alias Kadabra.Frame + + def to_bin(frame) do + f = if frame.end_headers, do: 0x4, else: 0x0 + Frame.binary_frame(0x9, f, frame.stream_id, frame.header_block_fragment) + end +end + +defimpl Kadabra.Encodable, for: Kadabra.Connection.Settings do + def to_bin(settings) do + settings + |> Map.from_struct() + |> to_encoded_list() + |> Enum.join() + end + + def to_encoded_list(settings) do + Enum.reduce(settings, [], fn {k, v}, acc -> + case v do + nil -> acc + :infinite -> acc + v -> [encode_setting(k, v)] ++ acc + end + end) + end + + def encode_setting(:header_table_size, v), do: <<0x1::16, v::32>> + def encode_setting(:enable_push, true), do: <<0x2::16, 1::32>> + def encode_setting(:enable_push, false), do: <<0x2::16, 0::32>> + def encode_setting(:max_concurrent_streams, v), do: <<0x3::16, v::32>> + def encode_setting(:initial_window_size, v), do: <<0x4::16, v::32>> + def encode_setting(:max_frame_size, v), do: <<0x5::16, v::32>> + def encode_setting(:max_header_list_size, v), do: <<0x6::16, v::32>> +end + defimpl Kadabra.Encodable, for: Any do def to_bin(_), do: :error end diff --git a/lib/frame.ex b/lib/frame.ex index 7acdff2..2f12527 100644 --- a/lib/frame.ex +++ b/lib/frame.ex @@ -51,11 +51,3 @@ defmodule Kadabra.Frame do <> end end - -defimpl Kadabra.Encodable, for: Kadabra.Frame do - alias Kadabra.Frame - - def to_bin(%{type: type, flags: flags, stream_id: sid, payload: p}) do - Frame.binary_frame(type, flags, sid, p) - end -end diff --git a/lib/frame/continuation.ex b/lib/frame/continuation.ex index 6cb3be4..f6e2195 100644 --- a/lib/frame/continuation.ex +++ b/lib/frame/continuation.ex @@ -3,6 +3,8 @@ defmodule Kadabra.Frame.Continuation do defstruct [:header_block_fragment, :stream_id, end_headers: false] + use Bitwise + @type t :: %__MODULE__{ end_headers: boolean, header_block_fragment: bitstring, @@ -10,7 +12,6 @@ defmodule Kadabra.Frame.Continuation do } alias Kadabra.Frame - alias Kadabra.Frame.Flags @doc ~S""" Initializes a new `Frame.Continuation` given a `Frame`. @@ -26,17 +27,12 @@ defmodule Kadabra.Frame.Continuation do def new(%Frame{stream_id: id, payload: payload, flags: flags}) do %__MODULE__{ header_block_fragment: payload, - end_headers: Flags.end_headers?(flags), + end_headers: end_headers?(flags), stream_id: id } end -end -defimpl Kadabra.Encodable, for: Kadabra.Frame.Continuation do - alias Kadabra.Frame - - def to_bin(frame) do - f = if frame.end_headers, do: 0x4, else: 0x0 - Frame.binary_frame(0x9, f, frame.stream_id, frame.header_block_fragment) - end + @spec end_headers?(non_neg_integer) :: boolean + defp end_headers?(flags) when (flags &&& 4) == 4, do: true + defp end_headers?(_), do: false end diff --git a/lib/frame/data.ex b/lib/frame/data.ex index 3a6f1c5..fc6d127 100644 --- a/lib/frame/data.ex +++ b/lib/frame/data.ex @@ -3,8 +3,9 @@ defmodule Kadabra.Frame.Data do defstruct [:stream_id, :data, end_stream: false] + use Bitwise + alias Kadabra.Frame - alias Kadabra.Frame.Flags @type t :: %__MODULE__{ data: binary, @@ -17,18 +18,11 @@ defmodule Kadabra.Frame.Data do %__MODULE__{ data: data, stream_id: stream_id, - end_stream: Flags.end_stream?(flags) + end_stream: end_stream?(flags) } end -end - -defimpl Kadabra.Encodable, for: Kadabra.Frame.Data do - alias Kadabra.Frame - - @data 0x0 - def to_bin(%{end_stream: end_stream, stream_id: stream_id, data: data}) do - flags = if end_stream, do: 0x1, else: 0x0 - Frame.binary_frame(@data, flags, stream_id, data) - end + @spec end_stream?(non_neg_integer) :: boolean + defp end_stream?(flags) when (flags &&& 1) == 1, do: true + defp end_stream?(_), do: false end diff --git a/lib/frame/flags.ex b/lib/frame/flags.ex deleted file mode 100644 index 5629f95..0000000 --- a/lib/frame/flags.ex +++ /dev/null @@ -1,94 +0,0 @@ -defmodule Kadabra.Frame.Flags do - @moduledoc false - - use Bitwise - - @doc ~S""" - Returns `ACK` flag. - - ## Examples - - iex> Kadabra.Frame.Flags.ack - <<0::7, 1::1>> - """ - @spec ack :: binary - def ack, do: <<0::7, 1::1>> - - @doc ~S""" - Returns `true` if bit 0 is set to 1. - - ## Examples - - iex> Kadabra.Frame.Flags.ack?(0x1) - true - - iex> Kadabra.Frame.Flags.ack?(0) - false - """ - @spec ack?(non_neg_integer) :: boolean - def ack?(flags) when (flags &&& 1) == 1, do: true - def ack?(_), do: false - - @doc ~S""" - Returns `true` if bit 5 is set to 1. - - ## Examples - - iex> Kadabra.Frame.Flags.priority?(0x20) - true - - iex> Kadabra.Frame.Flags.priority?(0x4) - false - """ - @spec priority?(non_neg_integer) :: boolean - def priority?(flags) when (flags &&& 0x20) == 0x20, do: true - def priority?(_), do: false - - @doc ~S""" - Returns `true` if bit 3 is set to 1. - - ## Examples - - iex> Kadabra.Frame.Flags.padded?(0x8) - true - - iex> Kadabra.Frame.Flags.padded?(0x7) - false - """ - @spec padded?(non_neg_integer) :: boolean - def padded?(flags) when (flags &&& 8) == 8, do: true - def padded?(_), do: false - - @doc ~S""" - Returns `true` if bit 2 is set to 1. - - ## Examples - - iex> Kadabra.Frame.Flags.end_headers?(0x4) - true - - iex> Kadabra.Frame.Flags.end_headers?(0x5) - true - - iex> Kadabra.Frame.Flags.end_headers?(0x1) - false - """ - @spec end_headers?(non_neg_integer) :: boolean - def end_headers?(flags) when (flags &&& 4) == 4, do: true - def end_headers?(_), do: false - - @doc ~S""" - Returns `true` if bit 0 is set to 1. - - ## Examples - - iex> Kadabra.Frame.Flags.end_stream?(0x1) - true - - iex> Kadabra.Frame.Flags.end_stream?(0x0) - false - """ - @spec end_stream?(non_neg_integer) :: boolean - def end_stream?(flags) when (flags &&& 1) == 1, do: true - def end_stream?(_), do: false -end diff --git a/lib/frame/goaway.ex b/lib/frame/goaway.ex index b43a54a..e94afbd 100644 --- a/lib/frame/goaway.ex +++ b/lib/frame/goaway.ex @@ -51,14 +51,3 @@ defmodule Kadabra.Frame.Goaway do } end end - -defimpl Kadabra.Encodable, for: Kadabra.Frame.Goaway do - alias Kadabra.Frame - - @goaway 0x7 - - def to_bin(%{last_stream_id: id, error_code: error}) do - payload = <<0::1, id::31>> <> error - Frame.binary_frame(@goaway, 0x0, 0, payload) - end -end diff --git a/lib/frame/headers.ex b/lib/frame/headers.ex index cf649aa..94b07e7 100644 --- a/lib/frame/headers.ex +++ b/lib/frame/headers.ex @@ -21,8 +21,9 @@ defmodule Kadabra.Frame.Headers do weight: non_neg_integer } + use Bitwise + alias Kadabra.Frame - alias Kadabra.Frame.Flags @doc ~S""" Initializes a new `Frame.Headers` from given `Frame`. @@ -49,7 +50,7 @@ defmodule Kadabra.Frame.Headers do |> put_flags(flags) |> Map.put(:stream_id, stream_id) - if Flags.priority?(flags) do + if priority?(flags) do put_priority(frame, p) else Map.put(frame, :header_block_fragment, p) @@ -58,9 +59,9 @@ defmodule Kadabra.Frame.Headers do defp put_flags(frame, flags) do frame - |> Map.put(:end_stream, Flags.end_stream?(flags)) - |> Map.put(:end_headers, Flags.end_headers?(flags)) - |> Map.put(:priority, Flags.priority?(flags)) + |> Map.put(:end_stream, end_stream?(flags)) + |> Map.put(:end_headers, end_headers?(flags)) + |> Map.put(:priority, priority?(flags)) end defp put_priority(frame, payload) do @@ -72,19 +73,16 @@ defmodule Kadabra.Frame.Headers do |> Map.put(:weight, weight + 1) |> Map.put(:header_block_fragment, headers) end -end -defimpl Kadabra.Encodable, for: Kadabra.Frame.Headers do - alias Kadabra.Frame + @spec end_stream?(non_neg_integer) :: boolean + defp end_stream?(flags) when (flags &&& 1) == 1, do: true + defp end_stream?(_), do: false - @headers 0x1 - - def to_bin(%{header_block_fragment: block, stream_id: sid} = frame) do - flags = flags(frame) - Frame.binary_frame(@headers, flags, sid, block) - end + @spec end_headers?(non_neg_integer) :: boolean + defp end_headers?(flags) when (flags &&& 4) == 4, do: true + defp end_headers?(_), do: false - defp flags(%{end_headers: false, end_stream: true}), do: 0x1 - defp flags(%{end_headers: true, end_stream: false}), do: 0x4 - defp flags(%{end_headers: true, end_stream: true}), do: 0x5 + @spec priority?(non_neg_integer) :: boolean + defp priority?(flags) when (flags &&& 0x20) == 0x20, do: true + defp priority?(_), do: false end diff --git a/lib/frame/ping.ex b/lib/frame/ping.ex index 28fef67..95f29f2 100644 --- a/lib/frame/ping.ex +++ b/lib/frame/ping.ex @@ -3,8 +3,9 @@ defmodule Kadabra.Frame.Ping do defstruct [:data, stream_id: 0, ack: false] + use Bitwise + alias Kadabra.Frame - alias Kadabra.Frame.Flags @type t :: %__MODULE__{ ack: boolean, @@ -44,19 +45,13 @@ defmodule Kadabra.Frame.Ping do @spec new(Frame.t()) :: t def new(%Frame{type: 0x6, payload: <>, flags: flags, stream_id: sid}) do %__MODULE__{ - ack: Flags.ack?(flags), + ack: ack?(flags), data: <>, stream_id: sid } end -end -defimpl Kadabra.Encodable, for: Kadabra.Frame.Ping do - alias Kadabra.Frame - alias Kadabra.Frame.Flags - - def to_bin(frame) do - ack = if frame.ack, do: Flags.ack(), else: 0x0 - Frame.binary_frame(0x6, ack, 0x0, frame.data) - end + @spec ack?(non_neg_integer) :: boolean + defp ack?(flags) when (flags &&& 1) == 1, do: true + defp ack?(_), do: false end diff --git a/lib/frame/push_promise.ex b/lib/frame/push_promise.ex index 3949f19..581aa6d 100644 --- a/lib/frame/push_promise.ex +++ b/lib/frame/push_promise.ex @@ -5,8 +5,9 @@ defmodule Kadabra.Frame.PushPromise do header_block_fragment: nil, stream_id: nil + use Bitwise + alias Kadabra.Frame - alias Kadabra.Frame.Flags @type t :: %__MODULE__{ end_headers: boolean, @@ -29,7 +30,11 @@ defmodule Kadabra.Frame.PushPromise do %__MODULE__{ stream_id: id, header_block_fragment: headers, - end_headers: Flags.end_headers?(f) + end_headers: end_headers?(f) } end + + @spec end_headers?(non_neg_integer) :: boolean + defp end_headers?(flags) when (flags &&& 4) == 4, do: true + defp end_headers?(_), do: false end diff --git a/lib/frame/rst_stream.ex b/lib/frame/rst_stream.ex index fc465cc..cf9abe9 100644 --- a/lib/frame/rst_stream.ex +++ b/lib/frame/rst_stream.ex @@ -36,13 +36,3 @@ defmodule Kadabra.Frame.RstStream do } end end - -defimpl Kadabra.Encodable, for: Kadabra.Frame.RstStream do - alias Kadabra.Frame - - @rst_stream 0x3 - - def to_bin(frame) do - Frame.binary_frame(@rst_stream, 0x0, frame.stream_id, frame.error_code) - end -end diff --git a/lib/frame/settings.ex b/lib/frame/settings.ex index 12cc27f..ec09b29 100644 --- a/lib/frame/settings.ex +++ b/lib/frame/settings.ex @@ -3,8 +3,9 @@ defmodule Kadabra.Frame.Settings do defstruct [:settings, ack: false] + use Bitwise + alias Kadabra.Connection - alias Kadabra.Frame.Flags @type t :: %__MODULE__{ ack: boolean, @@ -17,55 +18,36 @@ defmodule Kadabra.Frame.Settings do end def new(%{payload: "", flags: flags}) do - %__MODULE__{settings: nil, ack: Flags.ack?(flags)} + %__MODULE__{settings: nil, ack: ack?(flags)} end def new(%{payload: p, flags: flags}) do s_list = parse_settings(p) case put_settings(%Connection.Settings{}, s_list) do - {:ok, settings} -> %__MODULE__{settings: settings, ack: Flags.ack?(flags)} + {:ok, settings} -> %__MODULE__{settings: settings, ack: ack?(flags)} {:error, code, _settings} -> {:error, code} end end - @spec ack?(0 | 1) :: boolean - def ack?(1), do: true - def ack?(0), do: false - @spec parse_settings(bitstring) :: [{char, non_neg_integer}, ...] | [] - def parse_settings(<<>>), do: [] + defp parse_settings(<<>>), do: [] - def parse_settings(bin) do + defp parse_settings(bin) do <> = bin [{identifier, value}] ++ parse_settings(rest) end - def put_settings(settings, []), do: {:ok, settings} + defp put_settings(settings, []), do: {:ok, settings} - def put_settings(settings, [{ident, value} | rest]) do + defp put_settings(settings, [{ident, value} | rest]) do case Connection.Settings.put(settings, ident, value) do {:ok, settings} -> put_settings(settings, rest) {:error, code, settings} -> {:error, code, settings} end end -end - -defimpl Kadabra.Encodable, for: Kadabra.Frame.Settings do - alias Kadabra.{Encodable, Frame} - - @settings 0x4 - def to_bin(frame) do - ack = if frame.ack, do: 0x1, else: 0x0 - - case frame.settings do - nil -> - Frame.binary_frame(@settings, ack, 0x0, <<>>) - - settings -> - p = settings |> Encodable.to_bin() - Frame.binary_frame(@settings, ack, 0x0, p) - end - end + @spec ack?(non_neg_integer) :: boolean + defp ack?(flags) when (flags &&& 1) == 1, do: true + defp ack?(_), do: false end diff --git a/lib/frame/window_update.ex b/lib/frame/window_update.ex index b395a2c..f268496 100644 --- a/lib/frame/window_update.ex +++ b/lib/frame/window_update.ex @@ -26,14 +26,3 @@ defmodule Kadabra.Frame.WindowUpdate do } end end - -defimpl Kadabra.Encodable, for: Kadabra.Frame.WindowUpdate do - alias Kadabra.Frame - - @window_update 0x8 - - def to_bin(frame) do - size = <> - Frame.binary_frame(@window_update, 0x0, frame.stream_id, size) - end -end diff --git a/lib/hpack.ex b/lib/hpack.ex index f86d4b1..2df53e0 100644 --- a/lib/hpack.ex +++ b/lib/hpack.ex @@ -3,29 +3,26 @@ defmodule Kadabra.Hpack do use GenServer - def start_link(ref, name) do - id = via_tuple(ref, name) - GenServer.start_link(__MODULE__, :ok, name: id) + def start_link do + GenServer.start_link(__MODULE__, :ok) end - def via_tuple(ref, name) do - {:via, Registry, {Registry.Kadabra, {ref, name}}} + def encode(pid, headers) do + GenServer.call(pid, {:encode, headers}) end - def init(:ok) do - {:ok, :hpack.new_context()} + def decode(pid, headers) do + GenServer.call(pid, {:decode, headers}) end - def encode(ref, headers) do - GenServer.call(via_tuple(ref, :encoder), {:encode, headers}) + def update_max_table_size(pid, size) do + GenServer.call(pid, {:new_max_table_size, size}) end - def decode(ref, headers) do - GenServer.call(via_tuple(ref, :decoder), {:decode, headers}) - end + ## Callbacks - def update_max_table_size(pid, size) do - GenServer.call(pid, {:new_max_table_size, size}) + def init(:ok) do + {:ok, :hpack.new_context()} end def handle_call({:encode, headers}, _pid, state) do diff --git a/lib/kadabra.ex b/lib/kadabra.ex index a296b29..49cc780 100644 --- a/lib/kadabra.ex +++ b/lib/kadabra.ex @@ -66,7 +66,9 @@ defmodule Kadabra do ``` """ - alias Kadabra.{ConnectionQueue, Request, Stream} + import Supervisor.Spec + + alias Kadabra.{ConnectionPool, Request, Stream} @typedoc ~S""" Options for connections. @@ -114,7 +116,10 @@ defmodule Kadabra do def open(uri, opts) when is_binary(uri) do uri = URI.parse(uri) - Kadabra.Application.start_connection(uri, self(), opts) + spec_opts = [id: :erlang.make_ref(), restart: :transient] + spec = worker(Kadabra.ConnectionPool, [uri, self(), opts], spec_opts) + + Supervisor.start_child(:kadabra, spec) end def open(uri, opts) when is_list(uri) do @@ -135,7 +140,7 @@ defmodule Kadabra do """ @spec close(pid) :: :ok def close(pid) do - Kadabra.Application.close(pid) + Kadabra.ConnectionPool.close(pid) end @doc ~S""" @@ -152,7 +157,7 @@ defmodule Kadabra do """ @spec ping(pid) :: no_return def ping(pid) do - Kadabra.Application.ping(pid) + Kadabra.ConnectionPool.ping(pid) end @doc ~S""" @@ -177,16 +182,16 @@ defmodule Kadabra do """ @spec request(pid, Request.t() | [Request.t()] | request_opts) :: no_return def request(pid, %Request{} = request) do - ConnectionQueue.queue_request(pid, request) + ConnectionPool.request(pid, [request]) end def request(pid, [%Request{} | _rest] = requests) do - ConnectionQueue.queue_request(pid, requests) + ConnectionPool.request(pid, requests) end def request(pid, opts) when is_list(opts) do request = Request.new(opts) - ConnectionQueue.queue_request(pid, request) + ConnectionPool.request(pid, request) end @doc ~S""" diff --git a/lib/socket.ex b/lib/socket.ex index 9218b3d..0d0b90d 100644 --- a/lib/socket.ex +++ b/lib/socket.ex @@ -3,7 +3,7 @@ defmodule Kadabra.Socket do defstruct socket: nil, buffer: "", active_user: nil - alias Kadabra.{Config, FrameParser} + alias Kadabra.FrameParser import Kernel, except: [send: 2] @@ -28,12 +28,11 @@ defmodule Kadabra.Socket do GenServer.call(pid, {:set_active, self()}) end - def start_link(%Config{supervisor: sup} = config) do - name = via_tuple(sup) - GenServer.start_link(__MODULE__, config, name: name) + def start_link(uri, opts) do + GenServer.start_link(__MODULE__, {uri, opts}) end - def init(%{uri: uri, opts: opts}) do + def init({uri, opts}) do case connect(uri, opts) do {:ok, socket} -> socket_send(socket, connection_preface()) @@ -44,10 +43,6 @@ defmodule Kadabra.Socket do end end - def via_tuple(ref) do - {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} - end - @spec connect(URI.t(), Keyword.t()) :: connection_result defp connect(uri, opts) do case uri.scheme do diff --git a/lib/stream.ex b/lib/stream.ex index 58c7ed3..bc5cb92 100644 --- a/lib/stream.ex +++ b/lib/stream.ex @@ -6,7 +6,8 @@ defmodule Kadabra.Stream do client: nil, connection: nil, socket: nil, - ref: nil, + encoder: nil, + decoder: nil, flow: nil, uri: nil, headers: [], @@ -31,7 +32,6 @@ defmodule Kadabra.Stream do id: pos_integer, client: pid, connection: pid, - ref: term, uri: URI.t(), flow: Kadabra.Stream.FlowControl.t(), headers: [...], @@ -55,22 +55,17 @@ defmodule Kadabra.Stream do %__MODULE__{ id: stream_id, client: config.client, - ref: config.ref, uri: config.uri, socket: config.socket, + encoder: config.encoder, + decoder: config.decoder, connection: self(), flow: Stream.FlowControl.new(flow_opts) } end - def start_link(%Stream{id: id, ref: ref} = stream) do - ref - |> via_tuple(id) - |> :gen_statem.start_link(__MODULE__, stream, []) - end - - def via_tuple(ref, stream_id) do - {:via, Registry, {Registry.Kadabra, {ref, stream_id}}} + def start_link(%Stream{} = stream) do + :gen_statem.start_link(__MODULE__, stream, []) end def close(pid) do @@ -111,7 +106,7 @@ defmodule Kadabra.Stream do end def recv(from, %Headers{end_stream: end_stream?} = frame, _state, stream) do - case Hpack.decode(stream.ref, frame.header_block_fragment) do + case Hpack.decode(stream.decoder, frame.header_block_fragment) do {:ok, headers} -> :gen_statem.reply(from, :ok) @@ -133,9 +128,9 @@ defmodule Kadabra.Stream do {:next_state, :closed, stream, [{:reply, from, :ok}]} end - def recv(from, %PushPromise{} = frame, state, %{ref: ref} = stream) + def recv(from, %PushPromise{} = frame, state, stream) when state in [@idle] do - {:ok, headers} = Hpack.decode(ref, frame.header_block_fragment) + {:ok, headers} = Hpack.decode(stream.decoder, frame.header_block_fragment) stream = %Stream{stream | headers: stream.headers ++ headers} @@ -158,8 +153,8 @@ defmodule Kadabra.Stream do {:keep_state, %{stream | flow: flow}} end - def recv(from, %Continuation{} = frame, _state, %{ref: ref} = stream) do - {:ok, headers} = Hpack.decode(ref, frame.header_block_fragment) + def recv(from, %Continuation{} = frame, _state, stream) do + {:ok, headers} = Hpack.decode(stream.decoder, frame.header_block_fragment) :gen_statem.reply(from, :ok) @@ -239,7 +234,7 @@ defmodule Kadabra.Stream do def handle_event({:call, from}, {:send_headers, request}, _state, stream) do %{headers: headers, body: payload, on_response: on_resp} = request - headers_payload = encode_headers(stream.ref, headers, stream.uri) + headers_payload = encode_headers(stream.encoder, headers, stream.uri) max_size = stream.flow.max_frame_size send_headers(stream.socket, stream.id, headers_payload, payload, max_size) @@ -254,9 +249,9 @@ defmodule Kadabra.Stream do {:next_state, @open, stream} end - defp encode_headers(ref, headers, uri) do + defp encode_headers(pid, headers, uri) do headers = add_headers(headers, uri) - {:ok, encoded} = Hpack.encode(ref, headers) + {:ok, encoded} = Hpack.encode(pid, headers) :erlang.iolist_to_binary(encoded) end diff --git a/lib/stream_set.ex b/lib/stream_set.ex index 462c079..b9a78f1 100644 --- a/lib/stream_set.ex +++ b/lib/stream_set.ex @@ -3,7 +3,7 @@ defmodule Kadabra.StreamSet do defstruct stream_id: 1, active_stream_count: 0, - active_streams: MapSet.new(), + active_streams: %{}, max_concurrent_streams: :infinite @type t :: %__MODULE__{ @@ -13,90 +13,34 @@ defmodule Kadabra.StreamSet do max_concurrent_streams: non_neg_integer | :infinite } - @doc ~S""" - Increments current `stream_id`. - - ## Examples + def pid_for(set, stream_id) do + set.active_streams[stream_id] + end - iex> set = %Kadabra.StreamSet{stream_id: 5} - iex> increment_stream_id(set) - %Kadabra.StreamSet{stream_id: 7} - """ @spec increment_stream_id(t) :: t def increment_stream_id(stream_set) do %{stream_set | stream_id: stream_set.stream_id + 2} end - @doc ~S""" - Increments open stream count. - - ## Examples - - iex> set = %Kadabra.StreamSet{active_stream_count: 2} - iex> increment_active_stream_count(set) - %Kadabra.StreamSet{active_stream_count: 3} - """ @spec increment_active_stream_count(t) :: t def increment_active_stream_count(stream_set) do %{stream_set | active_stream_count: stream_set.active_stream_count + 1} end - @doc ~S""" - Decrements open stream count. - - ## Examples - - iex> set = %Kadabra.StreamSet{active_stream_count: 2} - iex> decrement_active_stream_count(set) - %Kadabra.StreamSet{active_stream_count: 1} - """ @spec decrement_active_stream_count(t) :: t def decrement_active_stream_count(stream_set) do %{stream_set | active_stream_count: stream_set.active_stream_count - 1} end - @doc ~S""" - Marks stream_id as active. - - ## Examples - - iex> set = add_active(%Kadabra.StreamSet{}, 1) - iex> set.active_streams - #MapSet<[1]> - """ - def add_active(%{active_streams: active} = stream_set, stream_id) do - %{stream_set | active_streams: MapSet.put(active, stream_id)} + def add_active(%{active_streams: active} = stream_set, stream_id, pid) do + %{stream_set | active_streams: Map.put(active, stream_id, pid)} end - @doc ~S""" - Removes stream_id from active set. - - ## Examples - - iex> set = %Kadabra.StreamSet{active_streams: MapSet.new([1, 3])} - iex> remove_active(set, 1).active_streams - #MapSet<[3]> - """ def remove_active(%{active_streams: active} = stream_set, stream_id) when is_integer(stream_id) do - %{stream_set | active_streams: MapSet.delete(active, stream_id)} + %{stream_set | active_streams: Map.delete(active, stream_id)} end - @doc ~S""" - Returns true if active_streams is less than max streams. - - ## Examples - - iex> set = %Kadabra.StreamSet{active_stream_count: 3, - ...> max_concurrent_streams: 100} - iex> can_send?(set) - true - - iex> set = %Kadabra.StreamSet{active_stream_count: 3, - ...> max_concurrent_streams: 1} - iex> can_send?(set) - false - """ @spec can_send?(t) :: boolean def can_send?(%{active_stream_count: count, max_concurrent_streams: max}) when count < max, diff --git a/lib/stream_supervisor.ex b/lib/stream_supervisor.ex deleted file mode 100644 index ed99834..0000000 --- a/lib/stream_supervisor.ex +++ /dev/null @@ -1,43 +0,0 @@ -defmodule Kadabra.StreamSupervisor do - @moduledoc false - - use Supervisor - import Supervisor.Spec - - alias Kadabra.Stream - - def start_link(ref) do - name = via_tuple(ref) - Supervisor.start_link(__MODULE__, :ok, name: name) - end - - def via_tuple(ref) do - {:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}} - end - - def start_opts(id \\ :erlang.make_ref()) do - [id: id, restart: :transient] - end - - def start_stream(%{ref: ref} = config, stream_id, window, max_frame) do - stream = Stream.new(config, stream_id, window, max_frame) - Supervisor.start_child(via_tuple(ref), [stream]) - end - - def send_frame(ref, stream_id, frame) do - ref - |> Stream.via_tuple(stream_id) - |> Stream.call_recv(frame) - end - - def stop(ref) do - ref - |> via_tuple() - |> Supervisor.stop() - end - - def init(:ok) do - spec = worker(Stream, [], start_opts()) - supervise([spec], strategy: :simple_one_for_one) - end -end diff --git a/lib/supervisor.ex b/lib/supervisor.ex deleted file mode 100644 index 14d3815..0000000 --- a/lib/supervisor.ex +++ /dev/null @@ -1,43 +0,0 @@ -defmodule Kadabra.Supervisor do - @moduledoc false - - use Supervisor - - alias Kadabra.{Connection, ConnectionQueue, Hpack, Socket, StreamSupervisor} - - def start_link(uri, pid, opts) do - config = %Kadabra.Config{ - ref: :erlang.make_ref(), - client: pid, - uri: uri, - opts: opts - } - - Supervisor.start_link(__MODULE__, config) - end - - def worker_opts(id) do - [id: id, restart: :permanent] - end - - def init(%Kadabra.Config{ref: ref} = config) do - Process.flag(:trap_exit, true) - - config = - config - |> Map.put(:supervisor, self()) - |> Map.put(:queue, ConnectionQueue.via_tuple(self())) - - children = [ - supervisor(StreamSupervisor, [ref], worker_opts(:stream_supervisor)), - worker(ConnectionQueue, [self()], worker_opts(:connection_queue)), - worker(Socket, [config], worker_opts(:socket)), - worker(Hpack, [ref, :encoder], worker_opts(:encoder)), - worker(Hpack, [ref, :decoder], worker_opts(:decoder)), - worker(Connection, [config], worker_opts(:connection)) - ] - - # If anything crashes, something really bad happened - supervise(children, strategy: :one_for_all, max_restarts: 0) - end -end diff --git a/mix.exs b/mix.exs index ef37d98..e9473a5 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Kadabra.Mixfile do use Mix.Project - @version "0.4.2" + @version "0.4.3" def project do [ @@ -48,7 +48,6 @@ defmodule Kadabra.Mixfile do defp deps do [ {:hpack, "~> 0.2.3", hex: :hpack_erl}, - {:gen_stage, "~> 0.13.1"}, {:ex_doc, "~> 0.14", only: :dev}, {:excoveralls, "~> 0.7", only: :test}, {:dialyxir, "~> 0.5", only: [:dev], runtime: false}, diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..d587186 --- /dev/null +++ b/mix.lock @@ -0,0 +1,31 @@ +%{ + "benchee": {:hex, :benchee, "0.11.0", "cf96e328ff5d69838dd89c21a9db22716bfcc6ef772e9d9dddf7ba622102722d", [], [{:deep_merge, "~> 0.1", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm"}, + "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"}, + "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, + "credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "deep_merge": {:hex, :deep_merge, "0.1.1", "c27866a7524a337b6a039eeb8dd4f17d458fd40fbbcb8c54661b71a22fffe846", [], [], "hexpm"}, + "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"}, + "dogma": {:hex, :dogma, "0.1.16", "3c1532e2f63ece4813fe900a16704b8e33264da35fdb0d8a1d05090a3022eef9", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"}, + "ex_doc": {:hex, :ex_doc, "0.19.0", "e22b6434373b4870ea77b24df069dbac7002c1f483615e9ebfc0c37497e1c75c", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.7", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, + "excoveralls": {:hex, :excoveralls, "0.9.1", "14fd20fac51ab98d8e79615814cc9811888d2d7b28e85aa90ff2e30dcf3191d6", [:mix], [{:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, + "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"}, + "gen_stage": {:hex, :gen_stage, "0.13.1", "edff5bca9cab22c5d03a834062515e6a1aeeb7665fb44eddae086252e39c4378", [:mix], [], "hexpm"}, + "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, + "hpack": {:hex, :hpack_erl, "0.2.3", "17670f83ff984ae6cd74b1c456edde906d27ff013740ee4d9efaa4f1bf999633", [:rebar3], [], "hexpm"}, + "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, + "inch_ex": {:hex, :inch_ex, "0.5.6", "418357418a553baa6d04eccd1b44171936817db61f4c0840112b420b8e378e67", [:mix], [{:poison, "~> 1.5 or ~> 2.0 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, + "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], [], "hexpm"}, + "makeup": {:hex, :makeup, "0.5.1", "966c5c2296da272d42f1de178c1d135e432662eca795d6dc12e5e8787514edf7", [:mix], [{:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.8.0", "1204a2f5b4f181775a0e456154830524cf2207cf4f9112215c05e0b76e4eca8b", [:mix], [{:makeup, "~> 0.5.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 0.2.2", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, + "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, + "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.2.2", "d526b23bdceb04c7ad15b33c57c4526bf5f50aaa70c7c141b4b4624555c68259", [:mix], [], "hexpm"}, + "pane": {:hex, :pane, "0.1.1", "4a9b46957a02991acbce012169ab7e8ecff74ad24886f94b142680062b10f167", [:mix], [], "hexpm"}, + "parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [:rebar3], [], "hexpm"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "scribe": {:hex, :scribe, "0.4.1", "4bf5395fb882995f705172b817b9a4ccae6ac83f9918fdedb6a2b9dafb57ec5f", [:mix], [{:pane, "~> 0.1", [hex: :pane, repo: "hexpm", optional: false]}], "hexpm"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, +} diff --git a/test/connection/flow_control_test.exs b/test/connection/flow_control_test.exs deleted file mode 100644 index b32e654..0000000 --- a/test/connection/flow_control_test.exs +++ /dev/null @@ -1,5 +0,0 @@ -defmodule Kadabra.Connection.FlowControlTest do - use ExUnit.Case - doctest Kadabra.Connection.FlowControl, import: true - doctest Kadabra.StreamSet, import: true -end diff --git a/test/connection_test.exs b/test/connection_test.exs index 6851258..30bf975 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -11,20 +11,13 @@ defmodule Kadabra.ConnectionTest do Kadabra.get(pid, "/clockstream", on_response: & &1) Kadabra.get(pid, "/clockstream", on_response: & &1) - {_, conn_pid, _, _} = - pid - |> Supervisor.which_children() - |> Enum.find(fn {name, _, _, _} -> name == :connection end) - - {_, socket_pid, _, _} = - pid - |> Supervisor.which_children() - |> Enum.find(fn {name, _, _, _} -> name == :socket end) + conn_pid = :sys.get_state(pid).connection + socket_pid = :sys.get_state(conn_pid).config.socket # Wait to collect some data on the streams Process.sleep(500) - state = :sys.get_state(conn_pid).state + state = :sys.get_state(conn_pid) assert Enum.count(state.flow_control.stream_set.active_streams) == 2 # frame = Kadabra.Frame.Goaway.new(1) diff --git a/test/frame/flags_test.exs b/test/frame/flags_test.exs deleted file mode 100644 index 1db7015..0000000 --- a/test/frame/flags_test.exs +++ /dev/null @@ -1,4 +0,0 @@ -defmodule Kadabra.Frame.FlagsTest do - use ExUnit.Case - doctest Kadabra.Frame.Flags -end diff --git a/test/kadabra_test.exs b/test/kadabra_test.exs index c53c76f..7d6f9e6 100644 --- a/test/kadabra_test.exs +++ b/test/kadabra_test.exs @@ -4,7 +4,7 @@ defmodule KadabraTest do @moduletag report: [:pid] - alias Kadabra.{Connection, Stream} + alias Kadabra.Stream @golang_uri "https://http2.golang.org" @@ -14,11 +14,6 @@ defmodule KadabraTest do |> Kadabra.open() |> elem(1) - # on_exit(fn -> - # Process.sleep(500) - # if Process.alive?(pid), do: Kadabra.close(pid) - # end) - [conn: pid] end @@ -28,12 +23,10 @@ defmodule KadabraTest do opts = [port: 443] {:ok, pid} = Kadabra.open(@golang_uri, opts) - consumer = - pid - |> Connection.via_tuple() - |> :sys.get_state() + conn_pid = :sys.get_state(pid).connection + conn = :sys.get_state(conn_pid) - assert consumer.state.config.opts[:port] == 443 + assert conn.config.opts[:port] == 443 Kadabra.close(pid) end end @@ -254,23 +247,16 @@ defmodule KadabraTest do |> elem(1) ref = Process.monitor(pid) - socket = find_child(pid, :socket) - Process.sleep(500) + conn_pid = :sys.get_state(pid).connection + socket_pid = :sys.get_state(conn_pid).config.socket - send(socket, {:ssl_closed, nil}) + send(socket_pid, {:ssl_closed, nil}) assert_receive {:closed, ^pid}, 5_000 assert_receive {:DOWN, ^ref, :process, ^pid, :shutdown}, 5_000 end - defp find_child(pid, name) do - pid - |> Supervisor.which_children() - |> Enum.find(fn {n, _, _, _} -> n == name end) - |> elem(1) - end - # @tag :golang # test "handles extremely large headers", _context do # pid =