From 29aa3c936c786f101e45d3a4ea2ec76ed3da521d Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Tue, 19 Dec 2017 15:33:19 +0700 Subject: [PATCH] make redis module name and start_link args configurable --- lib/exq/redis/connection.ex | 8 ++++---- lib/exq/support/mode.ex | 7 +++---- lib/exq/support/opts.ex | 23 +++++++++++++++++++---- mix.exs | 1 + mix.lock | 9 ++++++--- test/config_test.exs | 19 ++++++++++++++----- test/exq_test.exs | 15 +++++++++++++++ test/test-sentinel.conf | 6 ++++++ test/test_helper.exs | 14 +++++++++++++- 9 files changed, 81 insertions(+), 21 deletions(-) create mode 100644 test/test-sentinel.conf diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 42667132..d3006b24 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -6,6 +6,7 @@ defmodule Exq.Redis.Connection do require Logger alias Exq.Support.Config + import Exq.Support.Opts, only: [redis_worker_module: 0] def flushdb!(redis) do {:ok, res} = q(redis, ["flushdb"]) @@ -155,15 +156,14 @@ defmodule Exq.Redis.Connection do end def q(redis, command) do - Redix.command(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().command(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp(redis, command) do - Redix.pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp!(redis, command) do - Redix.pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) end - end diff --git a/lib/exq/support/mode.ex b/lib/exq/support/mode.ex index 4c0342f5..dccb9f26 100644 --- a/lib/exq/support/mode.ex +++ b/lib/exq/support/mode.ex @@ -11,14 +11,13 @@ defmodule Exq.Support.Mode do Returns child list for the main Exq supervisor """ - import Exq.Support.Opts, only: [conform_opts: 1] + import Exq.Support.Opts, only: [redis_worker_opts: 1] import Supervisor.Spec def children(opts) do - {redis_opts, connection_opts, opts} = conform_opts(opts) - + {module, args, opts} = redis_worker_opts(opts) # make sure redis always first(start in order) - children = [worker(Redix, [redis_opts, connection_opts])] + children = [worker(module, args)] children = children ++ children(opts[:mode], opts) children end diff --git a/lib/exq/support/opts.ex b/lib/exq/support/opts.ex index 4ea04229..7c030fd2 100644 --- a/lib/exq/support/opts.ex +++ b/lib/exq/support/opts.ex @@ -11,10 +11,7 @@ defmodule Exq.Support.Opts do "#{name}.Sup" |> String.to_atom end - @doc """ - Return {redis_options, redis_connection_opts, gen_server_opts} - """ - def conform_opts(opts \\ []) do + defp conform_opts(opts) do mode = opts[:mode] || Config.get(:mode) redis = redis_client_name(opts[:name]) opts = [{:redis, redis}|opts] @@ -42,6 +39,24 @@ defmodule Exq.Support.Opts do end end + @doc """ + Return {redis_module, redis_args, gen_server_opts} + """ + def redis_worker_opts(opts) do + {redis_opts, connection_opts, opts} = conform_opts(opts) + case Config.get(:redis_worker) do + {module, args} -> {module, args, opts} + _ -> {Redix, [redis_opts, connection_opts], opts} + end + end + + def redis_worker_module() do + case Config.get(:redis_worker) do + {module, _args} -> module + _ -> Redix + end + end + def connection_opts(opts \\ []) do reconnect_on_sleep = opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep) timeout = opts[:redis_timeout] || Config.get(:redis_timeout) diff --git a/mix.exs b/mix.exs index 401497ba..8d6997f1 100644 --- a/mix.exs +++ b/mix.exs @@ -38,6 +38,7 @@ defmodule Exq.Mixfile do { :redix, ">= 0.5.0"}, { :poison, ">= 1.2.0 or ~> 2.0"}, { :excoveralls, "~> 0.6", only: :test }, + { :redix_sentinel, "~> 0.5.0", only: :test }, { :flaky_connection, git: "https://github.com/hamiltop/flaky_connection.git", only: :test}, # docs diff --git a/mix.lock b/mix.lock index 8d7efa58..bd492b4b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ -%{"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, +%{ + "certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, @@ -13,7 +14,9 @@ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []}, "ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], []}, - "redix": {:hex, :redix, "0.5.0", "46b81bdad24d4a330cd9f9eb26d6da6c40c8e5d1ce0c1607a82b4739f8f678c4", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}]}, + "redix": {:hex, :redix, "0.6.1", "20986b0e02f02b13e6f53c79a1ae70aa83147488c408f40275ec261f5bb0a6d0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, + "redix_sentinel": {:hex, :redix_sentinel, "0.5.0", "cd15dc6ff0b676c974bb2694aeb57bc4c4dfc3c5335900fd7b9cee2b1208ab64", [:mix], [{:connection, "~> 1.0.3", [hex: :connection, repo: "hexpm", optional: false]}, {:redix, "~> 0.6", [hex: :redix, repo: "hexpm", optional: false]}], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []}, - "uuid": {:hex, :uuid, "1.0.1", "ebb032f2b429761540ca3e67436d1eb140206f139ddb7e1f2cc24b77cb4af45b", [:mix], []}} + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}, +} diff --git a/test/config_test.exs b/test/config_test.exs index a67940c7..3fa7c8aa 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -1,6 +1,7 @@ defmodule Exq.ConfigTest do use ExUnit.Case require Mix.Config + import ExqTestUtil setup_all do ExqTestUtil.reset_config @@ -107,7 +108,7 @@ defmodule Exq.ConfigTest do assert client_name == nil end - test "default conform_opts" do + test "default redis_worker_opts" do Mix.Config.persist([ exq: [ queues: ["default"], @@ -120,7 +121,7 @@ defmodule Exq.ConfigTest do shutdown_timeout: 7000, ] ]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) [scheduler_enable: scheduler_enable, namespace: namespace, scheduler_poll_timeout: scheduler_poll_timeout, workers_sup: workers_sup, poll_timeout: poll_timeout, enqueuer: enqueuer, metadata: metadata, stats: stats, name: name, scheduler: scheduler, queues: queues, redis: redis, concurrency: concurrency, middleware: middleware, @@ -145,18 +146,26 @@ defmodule Exq.ConfigTest do assert mode == :default Mix.Config.persist([exq: [queues: [{"default", 1000}, {"test1", 2000}]]]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) assert server_opts[:queues] == ["default", "test1"] assert server_opts[:concurrency] == [{"default", 1000, 0}, {"test1", 2000, 0}] end - test "api conform_opts" do + test "api redis_worker_opts" do Mix.Config.persist([exq: []]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :api]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :api]) [name: name, namespace: namespace, redis: redis, mode: mode] = server_opts assert namespace == "test" assert name == nil assert redis == Exq.Redis.Client assert mode == :api end + + test "custom redis module" do + with_application_env(:exq, :redis_worker, {RedisWorker, [1, 2]}, fn -> + {module, args, server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) + assert module == RedisWorker + assert args == [1, 2] + end) + end end diff --git a/test/exq_test.exs b/test/exq_test.exs index 3799554a..d5713b8e 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -78,6 +78,21 @@ defmodule ExqTest do stop_process(sup) end + test "enqueue and run job via redis sentinel" do + sentinel_args = [ + [role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]], + [database: 0, password: nil], + [backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []] + ] + with_application_env(:exq, :redis_worker, {RedixSentinel, sentinel_args}, fn -> + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link + {:ok, _} = Exq.enqueue(Exq, "default", ExqTest.PerformWorker, []) + assert_receive {:worked} + stop_process(sup) + end) + end + test "run jobs from backup queue on boot" do host = elem(:inet.gethostname(), 1) Process.register(self(), :exqtest) diff --git a/test/test-sentinel.conf b/test/test-sentinel.conf new file mode 100644 index 00000000..250c50c5 --- /dev/null +++ b/test/test-sentinel.conf @@ -0,0 +1,6 @@ +port 6666 +daemonize yes +logfile stdout +pidfile /tmp/resquex-redis-sentinel.pid + +sentinel monitor exq 127.0.0.1 6555 1 diff --git a/test/test_helper.exs b/test/test_helper.exs index a6b1ac85..39650a9f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -85,6 +85,16 @@ defmodule ExqTestUtil do config = Mix.Config.read!(Path.join([Path.dirname(__DIR__), "config", "config.exs"])) Mix.Config.persist(config) end + + def with_application_env(app, key, new, context) do + old = Application.get_env(app, key) + Application.put_env(app, key, new) + try do + context.() + after + Application.put_env(app, key, old) + end + end end defmodule TestRedis do @@ -96,13 +106,15 @@ defmodule TestRedis do def start do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-server test/test-redis.conf') - :timer.sleep(100) + [] = :os.cmd('redis-server test/test-sentinel.conf --sentinel') + :timer.sleep(500) end end def stop do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-cli -p 6555 shutdown') + [] = :os.cmd('redis-cli -p 6666 shutdown') end end