Skip to content

Commit

Permalink
Merge pull request #2 from PPCBee/fix-sidekiq5-heartbeat
Browse files Browse the repository at this point in the history
Hearbeat server fixes
  • Loading branch information
TondaHack authored Feb 15, 2018
2 parents e7c2058 + 2c43784 commit 34363ce
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 54 deletions.
87 changes: 39 additions & 48 deletions lib/exq/manager/heartbeat_server.ex
Original file line number Diff line number Diff line change
@@ -1,61 +1,52 @@
defmodule HeartbeatServer do
defmodule Exq.Heartbeat.Server do
use GenServer
alias Exq.Redis.JobQueue
alias Exq.Redis.Connection
alias Exq.Support.Time
alias Exq.Support.Config
alias Exq.Redis.JobStat

def start_link(master_state) do
GenServer.start_link(__MODULE__, master_state)
defmodule State do
defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil
end

def init(master_state) do
worker_init = [ ["DEL", "#{redis_worker_name(master_state)}:workers"] ]
++ getRedisCommands(master_state)
Connection.qp!(master_state.redis, worker_init)

schedule_work()

{:ok, master_state}
end

def handle_info(:heartbeat, master_state) do
schedule_work()

current_state = GenServer.call(Map.get(master_state, :pid), :get_state)
Connection.qp!(current_state.redis, getRedisCommands(current_state))

{:noreply, current_state}
end

defp schedule_work() do
Process.send_after(self(), :heartbeat, 1000)
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

defp redis_worker_name(state) do
JobQueue.full_key(state.namespace, "#{state.node_id}:elixir")
def init(opts) do
state = %State{
name: Exq.Manager.Server.server_name(opts[:name]),
redis: opts[:redis],
node_id: Config.node_identifier.node_id(),
namespace: opts[:namespace],
queues: opts[:queues],
poll_timeout: opts[:poll_timeout]
}
schedule_work(state, true)
{:ok, state}
end

defp getRedisCommands(state) do
name = redis_worker_name(state)
[
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s
]
def handle_cast({:heartbeat, master_state, status}, state) do
schedule_work(state)
master_data = Map.from_struct(master_state)
current_state = %{struct(State, master_data) | name: state.name}
init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: []
data = init_data ++ JobStat.get_redis_commands(
current_state.namespace,
current_state.node_id,
current_state.started_at,
current_state.pid,
current_state.queues,
current_state.work_table,
current_state.poll_timeout
)
Connection.qp!(
current_state.redis,
data
)
{:noreply, current_state}
end


defp cocurency_count(state) do
Enum.map(state.queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(state.work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
defp schedule_work(state, status \\ false) do
Process.send_after(state.name, {:get_state, self(), status}, 1000)
end

end
12 changes: 6 additions & 6 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ defmodule Exq.Manager.Server do
pid: self(),
poll_timeout: opts[:poll_timeout],
scheduler_poll_timeout: opts[:scheduler_poll_timeout],
started_at: Time.unix_seconds,
started_at: Time.unix_seconds
}

check_redis_connection(opts)
Expand All @@ -167,14 +167,9 @@ defmodule Exq.Manager.Server do
Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid)
end)

HeartbeatServer.start_link(state)
{:ok, state, 0}
end

def handle_call(:get_state, _from, state) do
{:reply, state, state}
end

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
{:noreply, state, 10}
Expand Down Expand Up @@ -227,6 +222,11 @@ defmodule Exq.Manager.Server do
{:noreply, updated_state, timeout}
end

def handle_info({:get_state, pid, status}, state) do
GenServer.cast(pid, {:heartbeat, state, status})
{:noreply, state}
end

def handle_info(_info, state) do
{:noreply, state, state.poll_timeout}
end
Expand Down
26 changes: 26 additions & 0 deletions lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,30 @@ defmodule Exq.Redis.JobStat do
val
end
end

def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do
name = redis_worker_name(namespace, node_id)
[
["SADD", JobQueue.full_key(namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (poll_timeout / 1000 + 5)]
]
end

defp redis_worker_name(namespace, node_id) do
JobQueue.full_key(namespace, "#{node_id}:elixir")
end

defp cocurency_count(queues, work_table) do
Enum.map(queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
end
end
1 change: 1 addition & 0 deletions lib/exq/support/mode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Exq.Support.Mode do
worker(Exq.Stats.Server, [opts]),
supervisor(Exq.Worker.Supervisor, [opts]),
worker(Exq.Manager.Server, [opts]),
worker(Exq.Heartbeat.Server, [opts]),
worker(Exq.WorkerDrainer.Server, [opts]),
worker(Exq.Enqueuer.Server, [opts]),
worker(Exq.Api.Server, [opts])
Expand Down

0 comments on commit 34363ce

Please sign in to comment.