Skip to content

Commit

Permalink
Move heartbeat to separate genserver
Browse files Browse the repository at this point in the history
  • Loading branch information
TondaHack committed Jan 24, 2018
1 parent 5df7dcf commit 3aa19a9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 36 deletions.
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ config :logger, :console,
config :exq,
name: Exq,
host: System.get_env("REDIS_HOST") || "127.0.0.1",
port: System.get_env("REDIS_PORT") || 6555,
port: System.get_env("REDIS_PORT") || 6379,
url: nil,
namespace: "test",
queues: ["default"],
Expand Down
61 changes: 61 additions & 0 deletions lib/exq/manager/heartbeat_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule HeartbeatServer do
use GenServer
alias Exq.Redis.JobQueue
alias Exq.Redis.Connection
alias Exq.Support.Time

def start_link(master_state) do
GenServer.start_link(__MODULE__, master_state)
end

def init(master_state) do
worker_init = [ ["DEL", "#{redis_worker_name(master_state)}:workers"] ]
++ getRedisCommands(master_state)
Connection.qp!(master_state.redis, worker_init)

schedule_work()

{:ok, master_state}
end

def handle_info(:heartbeat, master_state) do
schedule_work()

current_state = GenServer.call(Map.get(master_state, :pid), :get_state)
Connection.qp!(current_state.redis, getRedisCommands(current_state))

{:noreply, current_state}
end

defp schedule_work() do
Process.send_after(self(), :heartbeat, 1000)
end

defp redis_worker_name(state) do
JobQueue.full_key(state.namespace, "#{state.node_id}:elixir")
end

defp getRedisCommands(state) do
name = redis_worker_name(state)
[
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s
]
end


defp cocurency_count(state) do
Enum.map(state.queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(state.work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
end

end
39 changes: 4 additions & 35 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ defmodule Exq.Manager.Server do
alias Exq.Support.Config
alias Exq.Support.Time
alias Exq.Redis.JobQueue
alias Exq.Redis.Connection

@backoff_mult 10

Expand Down Expand Up @@ -167,29 +166,12 @@ defmodule Exq.Manager.Server do

check_redis_connection(opts)

name = redis_worker_name(state)
worker_init = [
["DEL", "#{name}:workers"], # remove old working processes
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)],
]
Connection.qp!(state.redis, worker_init)

HeartbeatServer.start_link(state)
{:ok, state, 0}
end

def cocurency_count(state) do
Enum.map(state.queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(state.work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
Expand Down Expand Up @@ -266,9 +248,7 @@ defmodule Exq.Manager.Server do
## Internal Functions
##===========================================================

defp redis_worker_name(state) do
JobQueue.full_key(state.namespace, "#{state.node_id}:elixir")
end


@doc """
Dequeue jobs and dispatch to workers
Expand All @@ -281,17 +261,6 @@ defmodule Exq.Manager.Server do

job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end)

# Update worker info in redis that it is alive
name = redis_worker_name(state)
worker_init = [
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s
]
Connection.qp!(state.redis, worker_init)

cond do
Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) ->
{state, 0}
Expand Down

0 comments on commit 3aa19a9

Please sign in to comment.