Skip to content

Commit

Permalink
Update Manager and stats to correctly store information about process…
Browse files Browse the repository at this point in the history
…ing jobs
  • Loading branch information
ondrejbartas authored and TondaHack committed Jan 24, 2018
1 parent 55d8b74 commit 5df7dcf
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 40 deletions.
50 changes: 45 additions & 5 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,16 @@ defmodule Exq.Manager.Server do
use GenServer
alias Exq.Enqueuer
alias Exq.Support.Config
alias Exq.Support.Time
alias Exq.Redis.JobQueue
alias Exq.Redis.Connection

@backoff_mult 10

defmodule State do
defstruct redis: nil, stats: nil, enqueuer: nil, pid: nil, node_id: nil, namespace: nil, work_table: nil,
queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, workers_sup: nil,
middleware: nil, metadata: nil
middleware: nil, metadata: nil, started_at: nil
end

def start_link(opts\\[]) do
Expand All @@ -135,7 +137,6 @@ defmodule Exq.Manager.Server do
def server_name(nil), do: Config.get(:name)
def server_name(name), do: name


##===========================================================
## gen server callbacks
##===========================================================
Expand All @@ -160,13 +161,37 @@ defmodule Exq.Manager.Server do
queues: opts[:queues],
pid: self(),
poll_timeout: opts[:poll_timeout],
scheduler_poll_timeout: opts[:scheduler_poll_timeout]
scheduler_poll_timeout: opts[:scheduler_poll_timeout],
started_at: Time.unix_seconds,
}

check_redis_connection(opts)

name = redis_worker_name(state)
worker_init = [
["DEL", "#{name}:workers"], # remove old working processes
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)],
]
Connection.qp!(state.redis, worker_init)

{:ok, state, 0}
end

def cocurency_count(state) do
Enum.map(state.queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(state.work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
end

def handle_call({:enqueue, queue, worker, args, options}, from, state) do
Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options)
{:noreply, state, 10}
Expand Down Expand Up @@ -214,7 +239,7 @@ defmodule Exq.Manager.Server do
"""
def handle_cast(:cleanup_host_stats, state) do
rescue_timeout(fn ->
Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id)
Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid)
end)
{:noreply, state, 0}
end
Expand All @@ -240,6 +265,11 @@ defmodule Exq.Manager.Server do
##===========================================================
## Internal Functions
##===========================================================

defp redis_worker_name(state) do
JobQueue.full_key(state.namespace, "#{state.node_id}:elixir")
end

@doc """
Dequeue jobs and dispatch to workers
"""
Expand All @@ -251,6 +281,17 @@ defmodule Exq.Manager.Server do

job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end)

# Update worker info in redis that it is alive
name = redis_worker_name(state)
worker_init = [
["SADD", JobQueue.full_key(state.namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s
]
Connection.qp!(state.redis, worker_init)

cond do
Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) ->
{state, 0}
Expand Down Expand Up @@ -297,7 +338,6 @@ defmodule Exq.Manager.Server do
update_worker_count(state.work_table, queue, 1)
end


# Setup queues from options / configs.

# The following is done:
Expand Down
15 changes: 15 additions & 0 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ defmodule Exq.Redis.Connection do
val
end

def hget!(redis, key, field) do
{:ok, val} = q(redis, ["HGET", key, field])
val
end

def hvals!(redis, key) do
{:ok, val} = q(redis, ["HVALS", key])
val
end

def hlen!(redis, key) do
{:ok, val} = q(redis, ["HLEN", key])
val
end

def set!(redis, key, val \\ 0) do
q(redis, ["SET", key, val])
end
Expand Down
69 changes: 55 additions & 14 deletions lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,78 @@ defmodule Exq.Redis.JobStat do
{:ok, count}
end

def add_process_commands(namespace, process_info, serialized_process \\ nil) do
serialized = serialized_process || Exq.Support.Process.encode(process_info)
[["SADD", JobQueue.full_key(namespace, "processes"), serialized]]
def add_process_commands(namespace, process_info, _) do
name = supervisor_worker_name(namespace, process_info)
string_pid = :erlang.pid_to_list(process_info.pid)
[
["SADD", JobQueue.full_key(namespace, "processes"), name], # ensure supervisor worker is added to list
["HINCRBY", name, "busy", "1"],
["HSET", "#{name}:workers", string_pid, Poison.encode!(%{
run_at: process_info.started_at,
pid: string_pid,
payload: serialize_processing_payload(process_info.job),
hostname: process_info.hostname,
queue: process_info.job && process_info.job.queue
})]
]
end
def add_process(redis, namespace, process_info, serialized_process \\ nil) do
instr = add_process_commands(namespace, process_info, serialized_process)
Connection.qp!(redis, instr)
:ok
end

def remove_process_commands(namespace, process_info, serialized_process \\ nil) do
serialized = serialized_process || Exq.Support.Process.encode(process_info)
[["SREM", JobQueue.full_key(namespace, "processes"), serialized]]
defp serialize_processing_payload(nil) do
%{}
end
defp serialize_processing_payload(job) do
%{
queue: job.queue,
class: job.class,
args: job.args,
jid: job.jid,
created_at: job.enqueued_at,
enqueued_at: job.enqueued_at
}
end

def remove_process_commands(namespace, process_info, _) do
name = supervisor_worker_name(namespace, process_info)
[
["HINCRBY", name, "busy", "-1"],
["HDEL", "#{name}:workers", :erlang.pid_to_list(process_info.pid)],
]
end
def remove_process(redis, namespace, process_info, serialized_process \\ nil) do
instr = remove_process_commands(namespace, process_info, serialized_process)
Connection.qp!(redis, instr)
:ok
end

def cleanup_processes(redis, namespace, host) do
Connection.smembers!(redis, JobQueue.full_key(namespace, "processes"))
|> Enum.map(fn(serialized) -> {Process.decode(serialized), serialized} end)
|> Enum.filter(fn({process, _}) -> process.host == host end)
|> Enum.each(fn({process, serialized}) -> remove_process(redis, namespace, process, serialized) end)
def cleanup_processes(redis, namespace, hostname, master_pid) do
processes = JobQueue.full_key(namespace, "processes")
master_pid_string = "#{:erlang.pid_to_list(master_pid)}"
instr = Connection.smembers!(redis, processes)
|> Enum.filter(fn(key) -> key =~ "#{hostname}:" end)
|> Enum.filter(fn(key) -> ((Connection.hget!(redis, key, "info") || '{}') |> Poison.decode!)["pid"] != master_pid_string end)
|> Enum.flat_map(fn(key) -> [["SREM", processes, key], ["DEL", "#{processes}:workers"]] end)

if Enum.count(instr) > 0 do
Connection.qp!(redis, instr)
end
:ok
end

def busy(redis, namespace) do
Connection.scard!(redis, JobQueue.full_key(namespace, "processes"))
(Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [])
|> Enum.map(fn(key) -> Connection.hlen!(redis, "#{key}:workers") end)
|> Enum.sum
end

def processes(redis, namespace) do
list = Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []
Enum.map(list, &Process.decode/1)
(Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [])
|> Enum.flat_map(fn(key) -> Connection.hvals!(redis, "#{key}:workers") end)
|> Enum.map(&Process.decode(&1))
end

def find_failed(redis, namespace, jid) do
Expand Down Expand Up @@ -120,6 +157,10 @@ defmodule Exq.Redis.JobStat do
{:ok, failures, successes}
end

defp supervisor_worker_name(namespace, process_info) do
JobQueue.full_key(namespace, "#{process_info.hostname}:elixir")
end

defp realtime_stats_formatter(redis, namespace) do
fn(keys, ns) ->
if Enum.empty?(keys) do
Expand Down
8 changes: 6 additions & 2 deletions lib/exq/serializers/json_serializer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ defmodule Exq.Serializers.JsonSerializer do
deserialized = decode!(serialized)
%Exq.Support.Process{
pid: Map.get(deserialized, "pid"),
host: Map.get(deserialized, "host"),
hostname: Map.get(deserialized, "hostname"),
queues: Map.get(deserialized, "queues"),
concurrency: Map.get(deserialized, "concurrency"),
job: Map.get(deserialized, "job"),
started_at: Map.get(deserialized, "started_at")
}
Expand All @@ -66,7 +68,9 @@ defmodule Exq.Serializers.JsonSerializer do
formatted_pid = to_string(:io_lib.format("~p", [process.pid]))
deserialized = Enum.into([
pid: formatted_pid,
host: process.host,
hostname: process.hostname,
queues: process.queues,
concurrency: process.concurrency,
job: process.job,
started_at: process.started_at], Map.new)

Expand Down
14 changes: 6 additions & 8 deletions lib/exq/stats/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ defmodule Exq.Stats.Server do
@doc """
Add in progress worker process
"""
def add_process(stats, namespace, worker, host, job_serialized) do
def add_process(stats, namespace, worker, hostname, job_serialized) do
process_info = %Process{pid: worker,
host: host,
hostname: hostname,
job: Exq.Support.Config.serializer.decode_job(job_serialized),
started_at: Time.unix_seconds}
serialized = Exq.Support.Process.encode(process_info)
Expand Down Expand Up @@ -62,8 +62,8 @@ defmodule Exq.Stats.Server do
@doc """
Cleanup stats on boot. This includes cleaning up busy workers.
"""
def cleanup_host_stats(stats, namespace, host) do
GenServer.call(stats, {:cleanup_host_stats, namespace, host})
def cleanup_host_stats(stats, namespace, host, master_pid) do
GenServer.call(stats, {:cleanup_host_stats, namespace, host, master_pid})
:ok
end

Expand All @@ -76,7 +76,6 @@ defmodule Exq.Stats.Server do
GenServer.call(stats, :force_flush)
end


##===========================================================
## gen server callbacks
##===========================================================
Expand All @@ -102,9 +101,9 @@ defmodule Exq.Stats.Server do
{:reply, :ok, state}
end

def handle_call({:cleanup_host_stats, namespace, host}, _from, state) do
def handle_call({:cleanup_host_stats, namespace, host, master_pid}, _from, state) do
try do
JobStat.cleanup_processes(state.redis, namespace, host)
JobStat.cleanup_processes(state.redis, namespace, host, master_pid)
rescue
e -> Logger.error("Error cleaning up processes - #{Kernel.inspect e}")
end
Expand All @@ -124,7 +123,6 @@ defmodule Exq.Stats.Server do
:ok
end


##===========================================================
## Methods
##===========================================================
Expand Down
2 changes: 1 addition & 1 deletion lib/exq/support/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Exq.Support.Process do
@moduledoc """
Struct for in progress worker
"""
defstruct pid: nil, host: nil, job: nil, started_at: nil
defstruct pid: nil, hostname: nil, job: nil, started_at: nil, concurrency: nil, job: nil, queues: nil

alias Exq.Support.Config

Expand Down
1 change: 0 additions & 1 deletion lib/exq/worker/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ defmodule Exq.Worker.Server do
{:noreply, state}
end


@doc """
Dispatch work to the target module (call :perform method of target)
"""
Expand Down
4 changes: 2 additions & 2 deletions test/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ defmodule ApiTest do
test "processes with data" do
JobStat.add_process(:testredis, "test", %Process{pid: self()})
assert {:ok, [processes]} = Exq.Api.processes(Exq.Api)
my_pid_str = to_string(:erlang.pid_to_list(self()))
assert %Process{pid: ^my_pid_str} = processes
pid = to_string(:erlang.pid_to_list(self()))
assert pid = processes.pid
end

test "jobs when empty" do
Expand Down
3 changes: 1 addition & 2 deletions test/exq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ defmodule ExqTest do

# Clear processes for this node
host = Exq.NodeIdentifier.HostnameIdentifier.node_id()
Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host)
Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host, self())

# Check that process has been cleared
processes = Exq.Redis.JobStat.processes(state.redis, "test")
Expand Down Expand Up @@ -320,7 +320,6 @@ defmodule ExqTest do
{:ok, count} = TestStats.failed_count(state.redis, "test")
assert count == "2"


{:ok, jid} = Exq.enqueue(Exq, "default", "ExqTest.FailWorker/failure_perform", [])

# if we kill Exq too fast we dont record the failure because exq is gone
Expand Down
9 changes: 4 additions & 5 deletions test/job_stat_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ defmodule JobStatTest do
{:ok, jid}
end

def create_process_info(host) do
def create_process_info(hostname) do
process_info = %Process{pid: self(),
host: host,
hostname: hostname,
job: %Job{},
started_at: Time.unix_seconds}
serialized = Exq.Support.Process.encode(process_info)
Expand Down Expand Up @@ -122,11 +122,10 @@ defmodule JobStatTest do
assert Enum.count(Exq.Redis.JobStat.processes(:testredis, namespace)) == 2

# Should cleanup only the host that is passed in
JobStat.cleanup_processes(:testredis, namespace, "host123")
JobStat.cleanup_processes(:testredis, namespace, "host123", self())
processes = Exq.Redis.JobStat.processes(:testredis, namespace)
assert Enum.count(processes) == 1
assert Enum.find(processes, fn(process) -> process.host == "host456" end) != nil
assert Enum.find(processes, fn(process) -> process.hostname == "host456" end) != nil
end


end

0 comments on commit 5df7dcf

Please sign in to comment.