From 28c7c5ba83bf6994636e3250df3dcefb555fdc93 Mon Sep 17 00:00:00 2001 From: Zack Siri Date: Mon, 4 Nov 2024 23:22:49 +0700 Subject: [PATCH] Clean up pipeline supervisor test --- config/test.exs | 3 + lib/uplink/application.ex | 11 +++- test/support/assert_async.ex | 79 +++++++++++++++++++++++++++ test/support/time_helper.ex | 15 ----- test/uplink/metrics/pipeline_test.exs | 6 +- 5 files changed, 95 insertions(+), 19 deletions(-) create mode 100644 test/support/assert_async.ex delete mode 100644 test/support/time_helper.ex diff --git a/config/test.exs b/config/test.exs index 9fd9fcc..f705649 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,6 +8,8 @@ config :uplink, Uplink.Metrics.Pipeline, producer_module: Broadway.DummyProducer, producer_options: [] +config :uplink, Uplink.PipelineSupervisor, sync_interval: 100 + config :uplink, Uplink.Repo, username: System.get_env("UPLINK_DB_USERNAME") || System.get_env("POSTGRES_USERNAME"), @@ -44,4 +46,5 @@ config :uplink, :drivers, aws_s3: Uplink.Drivers.Bucket.AwsMock # config :plug, :validate_header_keys_during_test, false # Print only warnings and errors during test +# Disable logging in tests config :logger, level: :warn diff --git a/lib/uplink/application.ex b/lib/uplink/application.ex index dac725e..4e9da69 100644 --- a/lib/uplink/application.ex +++ b/lib/uplink/application.ex @@ -5,8 +5,17 @@ defmodule Uplink.Application do alias Uplink.Web + @pipeline_supervisor Uplink.PipelineSupervisor + def start(_type, _args) do %{key: key, cert: cert} = Web.Certificate.generate() + + pipeline_supervisor_config = + Application.get_env(:uplink, @pipeline_supervisor, []) + + sync_interval = + Keyword.get(pipeline_supervisor_config, :sync_interval, 5_000) + router_config = Application.get_env(:uplink, Uplink.Router, port: 4040) internal_router_config = @@ -23,7 +32,7 @@ defmodule Uplink.Application do {Task.Supervisor, name: Uplink.TaskSupervisor}, {Plug.Cowboy, plug: Uplink.Internal, scheme: :http, port: internal_port}, {Pogo.DynamicSupervisor, - [name: Uplink.PipelineSupervisor, scope: :uplink]}, + name: @pipeline_supervisor, scope: :uplink, sync_interval: sync_interval}, {Uplink.Monitors, []}, { Plug.Cowboy, diff --git a/test/support/assert_async.ex b/test/support/assert_async.ex new file mode 100644 index 0000000..93054a5 --- /dev/null +++ b/test/support/assert_async.ex @@ -0,0 +1,79 @@ +defmodule AssertAsync do + @moduledoc """ + Helper macro for making assertions on async actions. This is particularly + useful for testing GenServers and other processes that may be synchronously + processing messages. The macro will retry an assertion until it passes + or times out. + + ## Example + + defmodule Foo do + use GenServer + + def init(opts) do + {:ok, state, {:continue, :sleep}} + end + + def handle_continue(:sleep, state) do + Process.sleep(2_000) + {:noreply, state} + end + + def handle_call(:bar, _, state) do + Map.get(state, :bar) + end + end + + iex> import AssertAsync + iex {:ok, pid} = GenServer.start_link(Foo, %{bar: 42}) + iex> assert_async do + ...> assert GenServer.call(pid, :bar) == 42 + ...> end + + ## Configuration + + * `sleep` - Time in milliseconds to wait before next retry. Defaults to `200`. + * `max_tries` - Number of attempts to make before flunking assertion. Defaults to `10`. + * `debug` - Boolean for producing `DEBUG` messages on failing iterations. Defaults `false`. + """ + + defmodule Impl do + @moduledoc false + require Logger + + @defaults %{ + sleep: 200, + max_tries: 10, + debug: false + } + + def assert(function, opts) do + state = Map.merge(@defaults, Map.new(opts)) + do_assert(function, state) + end + + defp do_assert(function, %{max_tries: 1}) do + function.() + end + + defp do_assert(function, %{max_tries: max_tries} = opts) do + function.() + rescue + e in ExUnit.AssertionError -> + if opts.debug do + Logger.debug(fn -> + "AssertAsync(remaining #{max_tries - 1}): #{ExUnit.AssertionError.message(e)}" + end) + end + + Process.sleep(opts.sleep) + do_assert(function, %{opts | max_tries: max_tries - 1}) + end + end + + defmacro assert_async(opts \\ [], do: do_block) do + quote do + AssertAsync.Impl.assert(fn -> unquote(do_block) end, unquote(opts)) + end + end +end diff --git a/test/support/time_helper.ex b/test/support/time_helper.ex deleted file mode 100644 index 7238481..0000000 --- a/test/support/time_helper.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule TimeHelper do - def wait_until(fun), do: wait_until(500, fun) - - def wait_until(0, fun), do: fun.() - - def wait_until(timeout, fun) do - try do - fun.() - rescue - ExUnit.AssertionError -> - :timer.sleep(10) - wait_until(max(0, timeout - 10), fun) - end - end -end diff --git a/test/uplink/metrics/pipeline_test.exs b/test/uplink/metrics/pipeline_test.exs index 60c695d..1976443 100644 --- a/test/uplink/metrics/pipeline_test.exs +++ b/test/uplink/metrics/pipeline_test.exs @@ -2,7 +2,7 @@ defmodule Uplink.Metrics.PipelineTest do use ExUnit.Case import Uplink.Scenarios.Pipeline - import TimeHelper + import AssertAsync alias Uplink.Cache alias Uplink.Pipelines @@ -14,9 +14,9 @@ defmodule Uplink.Metrics.PipelineTest do Pipelines.start(Uplink.Metrics.Pipeline) - wait_until(5_000, fn -> + assert_async do assert Uplink.Pipelines.list() != [] - end) + end :ok end