diff --git a/config/test.exs b/config/test.exs index 64b203ff..2f41d2d8 100644 --- a/config/test.exs +++ b/config/test.exs @@ -6,7 +6,7 @@ config :logger, :console, config :exq, name: Exq, host: System.get_env("REDIS_HOST") || "127.0.0.1", - port: System.get_env("REDIS_PORT") || 6555, + port: System.get_env("REDIS_PORT") || 6379, url: nil, namespace: "test", queues: ["default"], diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex new file mode 100644 index 00000000..9cc28963 --- /dev/null +++ b/lib/exq/manager/heartbeat_server.ex @@ -0,0 +1,61 @@ +defmodule HeartbeatServer do + use GenServer + alias Exq.Redis.JobQueue + alias Exq.Redis.Connection + alias Exq.Support.Time + + def start_link(master_state) do + GenServer.start_link(__MODULE__, master_state) + end + + def init(master_state) do + worker_init = [ ["DEL", "#{redis_worker_name(master_state)}:workers"] ] + ++ getRedisCommands(master_state) + Connection.qp!(master_state.redis, worker_init) + + schedule_work() + + {:ok, master_state} + end + + def handle_info(:heartbeat, master_state) do + schedule_work() + + current_state = GenServer.call(Map.get(master_state, :pid), :get_state) + Connection.qp!(current_state.redis, getRedisCommands(current_state)) + + {:noreply, current_state} + end + + defp schedule_work() do + Process.send_after(self(), :heartbeat, 1000) + end + + defp redis_worker_name(state) do + JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + end + + defp getRedisCommands(state) do + name = redis_worker_name(state) + [ + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s + ] + end + + + defp cocurency_count(state) do + Enum.map(state.queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(state.work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end + +end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 18c70b87..a4c7a0a0 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -115,7 +115,6 @@ defmodule Exq.Manager.Server do alias Exq.Support.Config alias Exq.Support.Time alias Exq.Redis.JobQueue - alias Exq.Redis.Connection @backoff_mult 10 @@ -167,29 +166,12 @@ defmodule Exq.Manager.Server do check_redis_connection(opts) - name = redis_worker_name(state) - worker_init = [ - ["DEL", "#{name}:workers"], # remove old working processes - ["SADD", JobQueue.full_key(state.namespace, "processes"), name], - ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], - ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], - ] - Connection.qp!(state.redis, worker_init) - + HeartbeatServer.start_link(state) {:ok, state, 0} end - def cocurency_count(state) do - Enum.map(state.queues, fn(q) -> - [{_, concurrency, _}] = :ets.lookup(state.work_table, q) - cond do - concurrency == :infinite -> 1000000 - true -> concurrency - end - end) - |> Enum.sum + def handle_call(:get_state, _from, state) do + {:reply, state, state} end def handle_call({:enqueue, queue, worker, args, options}, from, state) do @@ -266,9 +248,7 @@ defmodule Exq.Manager.Server do ## Internal Functions ##=========================================================== - defp redis_worker_name(state) do - JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") - end + @doc """ Dequeue jobs and dispatch to workers @@ -281,17 +261,6 @@ defmodule Exq.Manager.Server do job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) - # Update worker info in redis that it is alive - name = redis_worker_name(state) - worker_init = [ - ["SADD", JobQueue.full_key(state.namespace, "processes"), name], - ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], - ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s - ] - Connection.qp!(state.redis, worker_init) - cond do Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) -> {state, 0}