diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index b2c44b46..a3ada43a 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -1,9 +1,7 @@ defmodule Exq.Heartbeat.Server do use GenServer - alias Exq.Redis.JobQueue alias Exq.Redis.Connection alias Exq.Support.Config - alias Exq.Support.Time alias Exq.Redis.JobStat defmodule State do @@ -23,33 +21,32 @@ defmodule Exq.Heartbeat.Server do queues: opts[:queues], poll_timeout: opts[:poll_timeout] } - schedule_work(state) + schedule_work(state, true) {:ok, state} end - def handle_cast({:heartbeat, master_state}, state) do + def handle_cast({:heartbeat, master_state, status}, state) do schedule_work(state) - current_state = struct(State, Map.from_struct(master_state)) + master_data = Map.from_struct(master_state) + current_state = %{struct(State, master_data) | name: state.name} + init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] + data = init_data ++ JobStat.get_redis_commands( + current_state.namespace, + current_state.node_id, + current_state.started_at, + current_state.pid, + current_state.queues, + current_state.work_table, + current_state.poll_timeout + ) Connection.qp!( current_state.redis, - JobStat.get_redis_commands( - current_state.namespace, - current_state.node_id, - current_state.started_at, - current_state.pid, - current_state.queues, - current_state.work_table, - current_state.poll_timeout - ) - ) + data + ) {:noreply, current_state} end - defp schedule_work(state) do - Process.send_after(state.name, {:get_state, self()}, 1000) - end - - defp redis_worker_name(state) do - JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + defp schedule_work(state, status \\ false) do + Process.send_after(state.name, {:get_state, self(), status}, 1000) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 8c2de42b..15c05baa 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -170,11 +170,6 @@ defmodule Exq.Manager.Server do {:ok, state, 0} end - def handle_info({:get_state, pid}, state) do - GenServer.cast(pid, {:heartbeat, state}) - {:noreply, state} - end - def handle_call({:enqueue, queue, worker, args, options}, from, state) do Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options) {:noreply, state, 10} @@ -227,6 +222,11 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end + def handle_info({:get_state, pid, status}, state) do + GenServer.cast(pid, {:heartbeat, state, status}) + {:noreply, state} + end + def handle_info(_info, state) do {:noreply, state, state.poll_timeout} end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 9f125a7e..81174143 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -190,7 +190,6 @@ defmodule Exq.Redis.JobStat do def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do name = redis_worker_name(namespace, node_id) [ - ["DEL", "#{name}:workers"], ["SADD", JobQueue.full_key(namespace, "processes"), name], ["HSET", name, "quiet", "false"], ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})],