Skip to content

Commit

Permalink
Merge pull request akira#296 from ananthakumaran/start_link_configurable
Browse files Browse the repository at this point in the history
make redis module name and start_link args configurable
  • Loading branch information
akira authored Jan 16, 2018
2 parents 1366c41 + 29aa3c9 commit 9673cb5
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 21 deletions.
8 changes: 4 additions & 4 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Exq.Redis.Connection do
require Logger

alias Exq.Support.Config
import Exq.Support.Opts, only: [redis_worker_module: 0]

def flushdb!(redis) do
{:ok, res} = q(redis, ["flushdb"])
Expand Down Expand Up @@ -155,15 +156,14 @@ defmodule Exq.Redis.Connection do
end

def q(redis, command) do
Redix.command(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().command(redis, command, [timeout: Config.get(:redis_timeout)])
end

def qp(redis, command) do
Redix.pipeline(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().pipeline(redis, command, [timeout: Config.get(:redis_timeout)])
end

def qp!(redis, command) do
Redix.pipeline!(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().pipeline!(redis, command, [timeout: Config.get(:redis_timeout)])
end

end
7 changes: 3 additions & 4 deletions lib/exq/support/mode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ defmodule Exq.Support.Mode do
Returns child list for the main Exq supervisor
"""

import Exq.Support.Opts, only: [conform_opts: 1]
import Exq.Support.Opts, only: [redis_worker_opts: 1]
import Supervisor.Spec

def children(opts) do
{redis_opts, connection_opts, opts} = conform_opts(opts)

{module, args, opts} = redis_worker_opts(opts)
# make sure redis always first(start in order)
children = [worker(Redix, [redis_opts, connection_opts])]
children = [worker(module, args)]
children = children ++ children(opts[:mode], opts)
children
end
Expand Down
23 changes: 19 additions & 4 deletions lib/exq/support/opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ defmodule Exq.Support.Opts do
"#{name}.Sup" |> String.to_atom
end

@doc """
Return {redis_options, redis_connection_opts, gen_server_opts}
"""
def conform_opts(opts \\ []) do
defp conform_opts(opts) do
mode = opts[:mode] || Config.get(:mode)
redis = redis_client_name(opts[:name])
opts = [{:redis, redis}|opts]
Expand Down Expand Up @@ -42,6 +39,24 @@ defmodule Exq.Support.Opts do
end
end

@doc """
Return {redis_module, redis_args, gen_server_opts}
"""
def redis_worker_opts(opts) do
{redis_opts, connection_opts, opts} = conform_opts(opts)
case Config.get(:redis_worker) do
{module, args} -> {module, args, opts}
_ -> {Redix, [redis_opts, connection_opts], opts}
end
end

def redis_worker_module() do
case Config.get(:redis_worker) do
{module, _args} -> module
_ -> Redix
end
end

def connection_opts(opts \\ []) do
reconnect_on_sleep = opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep)
timeout = opts[:redis_timeout] || Config.get(:redis_timeout)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Exq.Mixfile do
{ :redix, ">= 0.5.0"},
{ :poison, ">= 1.2.0 or ~> 2.0"},
{ :excoveralls, "~> 0.6", only: :test },
{ :redix_sentinel, "~> 0.5.0", only: :test },
{ :flaky_connection, git: "https://github.com/hamiltop/flaky_connection.git", only: :test},

# docs
Expand Down
9 changes: 6 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%{"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []},
%{
"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []},
"earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]},
Expand All @@ -13,7 +14,9 @@
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []},
"poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []},
"ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], []},
"redix": {:hex, :redix, "0.5.0", "46b81bdad24d4a330cd9f9eb26d6da6c40c8e5d1ce0c1607a82b4739f8f678c4", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}]},
"redix": {:hex, :redix, "0.6.1", "20986b0e02f02b13e6f53c79a1ae70aa83147488c408f40275ec261f5bb0a6d0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"redix_sentinel": {:hex, :redix_sentinel, "0.5.0", "cd15dc6ff0b676c974bb2694aeb57bc4c4dfc3c5335900fd7b9cee2b1208ab64", [:mix], [{:connection, "~> 1.0.3", [hex: :connection, repo: "hexpm", optional: false]}, {:redix, "~> 0.6", [hex: :redix, repo: "hexpm", optional: false]}], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []},
"uuid": {:hex, :uuid, "1.0.1", "ebb032f2b429761540ca3e67436d1eb140206f139ddb7e1f2cc24b77cb4af45b", [:mix], []}}
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"},
}
19 changes: 14 additions & 5 deletions test/config_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Exq.ConfigTest do
use ExUnit.Case
require Mix.Config
import ExqTestUtil

setup_all do
ExqTestUtil.reset_config
Expand Down Expand Up @@ -107,7 +108,7 @@ defmodule Exq.ConfigTest do
assert client_name == nil
end

test "default conform_opts" do
test "default redis_worker_opts" do
Mix.Config.persist([
exq: [
queues: ["default"],
Expand All @@ -120,7 +121,7 @@ defmodule Exq.ConfigTest do
shutdown_timeout: 7000,
]
])
{_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default])
{Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])
[scheduler_enable: scheduler_enable, namespace: namespace, scheduler_poll_timeout: scheduler_poll_timeout,
workers_sup: workers_sup, poll_timeout: poll_timeout, enqueuer: enqueuer, metadata: metadata, stats: stats,
name: name, scheduler: scheduler, queues: queues, redis: redis, concurrency: concurrency, middleware: middleware,
Expand All @@ -145,18 +146,26 @@ defmodule Exq.ConfigTest do
assert mode == :default

Mix.Config.persist([exq: [queues: [{"default", 1000}, {"test1", 2000}]]])
{_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default])
{Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])
assert server_opts[:queues] == ["default", "test1"]
assert server_opts[:concurrency] == [{"default", 1000, 0}, {"test1", 2000, 0}]
end

test "api conform_opts" do
test "api redis_worker_opts" do
Mix.Config.persist([exq: []])
{_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :api])
{Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :api])
[name: name, namespace: namespace, redis: redis, mode: mode] = server_opts
assert namespace == "test"
assert name == nil
assert redis == Exq.Redis.Client
assert mode == :api
end

test "custom redis module" do
with_application_env(:exq, :redis_worker, {RedisWorker, [1, 2]}, fn ->
{module, args, server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default])
assert module == RedisWorker
assert args == [1, 2]
end)
end
end
15 changes: 15 additions & 0 deletions test/exq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ defmodule ExqTest do
stop_process(sup)
end

test "enqueue and run job via redis sentinel" do
sentinel_args = [
[role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]],
[database: 0, password: nil],
[backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []]
]
with_application_env(:exq, :redis_worker, {RedixSentinel, sentinel_args}, fn ->
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link
{:ok, _} = Exq.enqueue(Exq, "default", ExqTest.PerformWorker, [])
assert_receive {:worked}
stop_process(sup)
end)
end

test "run jobs from backup queue on boot" do
host = elem(:inet.gethostname(), 1)
Process.register(self(), :exqtest)
Expand Down
6 changes: 6 additions & 0 deletions test/test-sentinel.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
port 6666
daemonize yes
logfile stdout
pidfile /tmp/resquex-redis-sentinel.pid

sentinel monitor exq 127.0.0.1 6555 1
14 changes: 13 additions & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ defmodule ExqTestUtil do
config = Mix.Config.read!(Path.join([Path.dirname(__DIR__), "config", "config.exs"]))
Mix.Config.persist(config)
end

def with_application_env(app, key, new, context) do
old = Application.get_env(app, key)
Application.put_env(app, key, new)
try do
context.()
after
Application.put_env(app, key, old)
end
end
end

defmodule TestRedis do
Expand All @@ -96,13 +106,15 @@ defmodule TestRedis do
def start do
unless Config.get(:test_with_local_redis) == false do
[] = :os.cmd('redis-server test/test-redis.conf')
:timer.sleep(100)
[] = :os.cmd('redis-server test/test-sentinel.conf --sentinel')
:timer.sleep(500)
end
end

def stop do
unless Config.get(:test_with_local_redis) == false do
[] = :os.cmd('redis-cli -p 6555 shutdown')
[] = :os.cmd('redis-cli -p 6666 shutdown')
end
end

Expand Down

0 comments on commit 9673cb5

Please sign in to comment.