From fe4d5889e5469a7164a1069f44f8f63fababbc28 Mon Sep 17 00:00:00 2001 From: Michael Guarino Date: Tue, 7 Aug 2018 20:38:43 -0400 Subject: [PATCH] Implement exponential backoffs for critical functions --- lib/exlasticsearch/repo.ex | 16 +++++++- lib/exlasticsearch/retry/decorator.ex | 18 +++++++++ .../retry/exponential_backoff.ex | 32 ++++++++++++++++ lib/exlasticsearch/retry_strategy.ex | 8 ++++ mix.exs | 1 + mix.lock | 1 + test/exlasticsearch/repo_test.exs | 38 +++++++++++++++++++ test/support/test_model.ex | 12 ++++++ 8 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 lib/exlasticsearch/retry/decorator.ex create mode 100644 lib/exlasticsearch/retry/exponential_backoff.ex create mode 100644 lib/exlasticsearch/retry_strategy.ex create mode 100644 test/exlasticsearch/repo_test.exs diff --git a/lib/exlasticsearch/repo.ex b/lib/exlasticsearch/repo.ex index 7a28d08..2f1edaa 100644 --- a/lib/exlasticsearch/repo.ex +++ b/lib/exlasticsearch/repo.ex @@ -10,6 +10,7 @@ defmodule ExlasticSearch.Repo do url: "https://elasticsearch.url.io:9200" """ use Scrivener + use ExlasticSearch.Retry.Decorator alias ExlasticSearch.{Indexable, Query, Response} alias Elastix.{Index, Mapping, Document, Bulk, Search, HTTP} require Logger @@ -93,14 +94,18 @@ defmodule ExlasticSearch.Repo do protocol prior to insertion """ @spec index(struct) :: response + @decorate retry() def index(%{__struct__: model} = struct) do id = Indexable.id(struct) document = Indexable.document(struct) - es_url() |> Document.index(model.__es_index__(), model.__doc_type__(), id, document) + + es_url() + |> Document.index(model.__es_index__(), model.__doc_type__(), id, document) + |> mark_failure() end @doc """ - Gets an ES document by id + Gets an ES document by _id """ @spec get(struct) :: response def get(%{__struct__: model} = struct) do @@ -130,9 +135,11 @@ defmodule ExlasticSearch.Repo do Removes `struct` from the index of its model """ @spec delete(struct) :: response + @decorate retry() def delete(%{__struct__: model} = struct) do es_url() |> Document.delete(model.__es_index__(), model.__doc_type__(), Indexable.id(struct)) + |> mark_failure() end @@ -157,6 +164,7 @@ defmodule ExlasticSearch.Repo do es_url() |> Bulk.post(bulk_request, [], opts) + |> mark_failure() end def index_stream(stream, parallelism \\ 10, demand \\ 10) do @@ -192,4 +200,8 @@ defmodule ExlasticSearch.Repo do end end defp decode(response, _, _), do: response + + defp mark_failure({:ok, %HTTPoison.Response{body: %{"_shards" => %{"successful" => 0}}} = result}), do: {:error, result} + defp mark_failure({:ok, %HTTPoison.Response{body: %{"errors" => true}} = result}), do: {:error, result} + defp mark_failure(result), do: result end diff --git a/lib/exlasticsearch/retry/decorator.ex b/lib/exlasticsearch/retry/decorator.ex new file mode 100644 index 0000000..c48cd2c --- /dev/null +++ b/lib/exlasticsearch/retry/decorator.ex @@ -0,0 +1,18 @@ +defmodule ExlasticSearch.Retry.Decorator do + @moduledoc """ + Decorator for applying retry strategies to a function. Configure with + + ``` + config :exlasticsearch, :retry, strategy: MyStrategy, additional_opts + ``` + """ + use Decorator.Define, retry: 0 + @config Application.get_env(:exlasticsearch, :retry, []) + + def retry(body, _ctx) do + {strategy, config} = Keyword.pop(@config, :strategy, ExlasticSearch.Retry.ExponentialBackoff) + quote do + unquote(strategy).retry(fn -> unquote(body) end, unquote(config)) + end + end +end \ No newline at end of file diff --git a/lib/exlasticsearch/retry/exponential_backoff.ex b/lib/exlasticsearch/retry/exponential_backoff.ex new file mode 100644 index 0000000..1774f06 --- /dev/null +++ b/lib/exlasticsearch/retry/exponential_backoff.ex @@ -0,0 +1,32 @@ +defmodule ExlasticSearch.Retry.ExponentialBackoff do + @moduledoc """ + Retry Strategy implementation utilizing exponential backoffs + """ + @behaviour ExlasticSearch.RetryStrategy + + def retry(fun, opts) do + initial = Keyword.get(opts, :initial, 1) + max = Keyword.get(opts, :max, 3) + jitter = Keyword.get(opts, :jitter, 4) + + do_retry(fun, max, initial, jitter, 0) + end + + defp do_retry(fun, max, _, _, max), do: fun.() + defp do_retry(fun, max, initial, jitter, retry) do + case fun.() do + {:ok, result} -> {:ok, result} + {:error, _} -> + sleep(initial, retry, jitter) + |> :timer.sleep() + + do_retry(fun, max, initial, jitter, retry + 1) + end + end + + defp sleep(initial, retry, jitter) do + jitter = :rand.uniform(jitter) + exp = :math.pow(2, retry) |> round() + jitter + (initial * exp) + end +end \ No newline at end of file diff --git a/lib/exlasticsearch/retry_strategy.ex b/lib/exlasticsearch/retry_strategy.ex new file mode 100644 index 0000000..b4fe580 --- /dev/null +++ b/lib/exlasticsearch/retry_strategy.ex @@ -0,0 +1,8 @@ +defmodule ExlasticSearch.RetryStrategy do + @moduledoc """ + Behavior for retrying a 0-arity function according to some strategy + """ + @type response :: {:ok, any} | {:error, any} + @type callable :: (-> {:ok, any} | {:error, any}) + @callback retry(fnc :: callable, opts :: list) :: response +end \ No newline at end of file diff --git a/mix.exs b/mix.exs index 82053c2..498b820 100644 --- a/mix.exs +++ b/mix.exs @@ -35,6 +35,7 @@ defmodule Exlasticsearch.MixProject do {:elastix, "~> 0.5.0"}, {:ecto, "~> 2.1.0"}, {:scrivener_ecto, "~> 1.0"}, + {:decorator, "~> 1.2"}, {:ex_doc, ">= 0.0.0", only: :dev} ] end diff --git a/mix.lock b/mix.lock index 44bec18..9f39081 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,7 @@ %{ "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, "decimal": {:hex, :decimal, "1.5.0", "b0433a36d0e2430e3d50291b1c65f53c37d56f83665b43d79963684865beab68", [:mix], [], "hexpm"}, + "decorator": {:hex, :decorator, "1.2.3", "258681ae943e57bd92d821ea995e3994b4e0b62ae8404b5d892cb8b23b55b050", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"}, "ecto": {:hex, :ecto, "2.1.6", "29b45f393c2ecd99f83e418ea9b0a2af6078ecb30f401481abac8a473c490f84", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"}, "elastix": {:hex, :elastix, "0.5.0", "ff7b9b88c8bf3ba473f7f4bca76c658b2ca9fa59c7988aaea411ed5ed4c4cbd1", [:mix], [{:httpoison, ">= 0.7.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 3.1", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/exlasticsearch/repo_test.exs b/test/exlasticsearch/repo_test.exs new file mode 100644 index 0000000..b9270ee --- /dev/null +++ b/test/exlasticsearch/repo_test.exs @@ -0,0 +1,38 @@ +defmodule ExlasticSearch.RepoTest do + use ExUnit.Case, async: true + alias ExlasticSearch.{ + Repo, + TestModel + } + + setup_all do + Repo.create_index(TestModel) + Repo.create_mapping(TestModel) + :ok + end + + describe "#index" do + test "It will index an element in es" do + model = %ExlasticSearch.TestModel{id: Ecto.UUID.generate()} + {:ok, _} = Repo.index(model) + + assert exists?(model) + end + end + + describe "#bulk" do + test "It will bulk index/delete from es" do + model = %ExlasticSearch.TestModel{id: Ecto.UUID.generate()} + {:ok, _} = Repo.bulk([{:index, model}]) + + assert exists?(model) + end + end + + defp exists?(model) do + case Repo.get(model) do + {:ok, %{found: true}} -> true + _ -> false + end + end +end \ No newline at end of file diff --git a/test/support/test_model.ex b/test/support/test_model.ex index f4d72d2..58a247c 100644 --- a/test/support/test_model.ex +++ b/test/support/test_model.ex @@ -14,4 +14,16 @@ defmodule ExlasticSearch.TestModel do mapping :user, properties: %{ext_name: %{type: :text}} end +end + +defimpl ExlasticSearch.Indexable, for: ExlasticSearch.TestModel do + def id(%{id: id}), do: id + + def document(struct) do + struct + |> Map.from_struct() + |> Map.take(@for.__mappings__()) + end + + def preload(struct), do: struct end \ No newline at end of file