diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 8dbc6a8b..18c70b87 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -113,14 +113,16 @@ defmodule Exq.Manager.Server do use GenServer alias Exq.Enqueuer alias Exq.Support.Config + alias Exq.Support.Time alias Exq.Redis.JobQueue + alias Exq.Redis.Connection @backoff_mult 10 defmodule State do defstruct redis: nil, stats: nil, enqueuer: nil, pid: nil, node_id: nil, namespace: nil, work_table: nil, queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, workers_sup: nil, - middleware: nil, metadata: nil + middleware: nil, metadata: nil, started_at: nil end def start_link(opts\\[]) do @@ -135,7 +137,6 @@ defmodule Exq.Manager.Server do def server_name(nil), do: Config.get(:name) def server_name(name), do: name - ##=========================================================== ## gen server callbacks ##=========================================================== @@ -160,13 +161,37 @@ defmodule Exq.Manager.Server do queues: opts[:queues], pid: self(), poll_timeout: opts[:poll_timeout], - scheduler_poll_timeout: opts[:scheduler_poll_timeout] + scheduler_poll_timeout: opts[:scheduler_poll_timeout], + started_at: Time.unix_seconds, } check_redis_connection(opts) + + name = redis_worker_name(state) + worker_init = [ + ["DEL", "#{name}:workers"], # remove old working processes + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], + ] + Connection.qp!(state.redis, worker_init) + {:ok, state, 0} end + def cocurency_count(state) do + Enum.map(state.queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(state.work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end + def handle_call({:enqueue, queue, worker, args, options}, from, state) do Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options) {:noreply, state, 10} @@ -214,7 +239,7 @@ defmodule Exq.Manager.Server do """ def handle_cast(:cleanup_host_stats, state) do rescue_timeout(fn -> - Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id) + Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) end) {:noreply, state, 0} end @@ -240,6 +265,11 @@ defmodule Exq.Manager.Server do ##=========================================================== ## Internal Functions ##=========================================================== + + defp redis_worker_name(state) do + JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + end + @doc """ Dequeue jobs and dispatch to workers """ @@ -251,6 +281,17 @@ defmodule Exq.Manager.Server do job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) + # Update worker info in redis that it is alive + name = redis_worker_name(state) + worker_init = [ + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s + ] + Connection.qp!(state.redis, worker_init) + cond do Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) -> {state, 0} @@ -297,7 +338,6 @@ defmodule Exq.Manager.Server do update_worker_count(state.work_table, queue, 1) end - # Setup queues from options / configs. # The following is done: diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index d3006b24..8afe7a20 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -28,6 +28,21 @@ defmodule Exq.Redis.Connection do val end + def hget!(redis, key, field) do + {:ok, val} = q(redis, ["HGET", key, field]) + val + end + + def hvals!(redis, key) do + {:ok, val} = q(redis, ["HVALS", key]) + val + end + + def hlen!(redis, key) do + {:ok, val} = q(redis, ["HLEN", key]) + val + end + def set!(redis, key, val \\ 0) do q(redis, ["SET", key, val]) end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 042dbf34..9f78f778 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -38,9 +38,20 @@ defmodule Exq.Redis.JobStat do {:ok, count} end - def add_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SADD", JobQueue.full_key(namespace, "processes"), serialized]] + def add_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + string_pid = :erlang.pid_to_list(process_info.pid) + [ + ["SADD", JobQueue.full_key(namespace, "processes"), name], # ensure supervisor worker is added to list + ["HINCRBY", name, "busy", "1"], + ["HSET", "#{name}:workers", string_pid, Poison.encode!(%{ + run_at: process_info.started_at, + pid: string_pid, + payload: serialize_processing_payload(process_info.job), + hostname: process_info.hostname, + queue: process_info.job && process_info.job.queue + })] + ] end def add_process(redis, namespace, process_info, serialized_process \\ nil) do instr = add_process_commands(namespace, process_info, serialized_process) @@ -48,9 +59,26 @@ defmodule Exq.Redis.JobStat do :ok end - def remove_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SREM", JobQueue.full_key(namespace, "processes"), serialized]] + defp serialize_processing_payload(nil) do + %{} + end + defp serialize_processing_payload(job) do + %{ + queue: job.queue, + class: job.class, + args: job.args, + jid: job.jid, + created_at: job.enqueued_at, + enqueued_at: job.enqueued_at + } + end + + def remove_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + [ + ["HINCRBY", name, "busy", "-1"], + ["HDEL", "#{name}:workers", :erlang.pid_to_list(process_info.pid)], + ] end def remove_process(redis, namespace, process_info, serialized_process \\ nil) do instr = remove_process_commands(namespace, process_info, serialized_process) @@ -58,21 +86,30 @@ defmodule Exq.Redis.JobStat do :ok end - def cleanup_processes(redis, namespace, host) do - Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) - |> Enum.map(fn(serialized) -> {Process.decode(serialized), serialized} end) - |> Enum.filter(fn({process, _}) -> process.host == host end) - |> Enum.each(fn({process, serialized}) -> remove_process(redis, namespace, process, serialized) end) + def cleanup_processes(redis, namespace, hostname, master_pid) do + processes = JobQueue.full_key(namespace, "processes") + master_pid_string = "#{:erlang.pid_to_list(master_pid)}" + instr = Connection.smembers!(redis, processes) + |> Enum.filter(fn(key) -> key =~ "#{hostname}:" end) + |> Enum.filter(fn(key) -> ((Connection.hget!(redis, key, "info") || '{}') |> Poison.decode!)["pid"] != master_pid_string end) + |> Enum.flat_map(fn(key) -> [["SREM", processes, key], ["DEL", "#{processes}:workers"]] end) + + if Enum.count(instr) > 0 do + Connection.qp!(redis, instr) + end :ok end def busy(redis, namespace) do - Connection.scard!(redis, JobQueue.full_key(namespace, "processes")) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.map(fn(key) -> Connection.hlen!(redis, "#{key}:workers") end) + |> Enum.sum end def processes(redis, namespace) do - list = Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [] - Enum.map(list, &Process.decode/1) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.flat_map(fn(key) -> Connection.hvals!(redis, "#{key}:workers") end) + |> Enum.map(&Process.decode(&1)) end def find_failed(redis, namespace, jid) do @@ -120,6 +157,10 @@ defmodule Exq.Redis.JobStat do {:ok, failures, successes} end + defp supervisor_worker_name(namespace, process_info) do + JobQueue.full_key(namespace, "#{process_info.hostname}:elixir") + end + defp realtime_stats_formatter(redis, namespace) do fn(keys, ns) -> if Enum.empty?(keys) do diff --git a/lib/exq/serializers/json_serializer.ex b/lib/exq/serializers/json_serializer.ex index b3db3794..2ef91b8f 100644 --- a/lib/exq/serializers/json_serializer.ex +++ b/lib/exq/serializers/json_serializer.ex @@ -56,7 +56,9 @@ defmodule Exq.Serializers.JsonSerializer do deserialized = decode!(serialized) %Exq.Support.Process{ pid: Map.get(deserialized, "pid"), - host: Map.get(deserialized, "host"), + hostname: Map.get(deserialized, "hostname"), + queues: Map.get(deserialized, "queues"), + concurrency: Map.get(deserialized, "concurrency"), job: Map.get(deserialized, "job"), started_at: Map.get(deserialized, "started_at") } @@ -66,7 +68,9 @@ defmodule Exq.Serializers.JsonSerializer do formatted_pid = to_string(:io_lib.format("~p", [process.pid])) deserialized = Enum.into([ pid: formatted_pid, - host: process.host, + hostname: process.hostname, + queues: process.queues, + concurrency: process.concurrency, job: process.job, started_at: process.started_at], Map.new) diff --git a/lib/exq/stats/server.ex b/lib/exq/stats/server.ex index b2eabdb2..809b0ac4 100644 --- a/lib/exq/stats/server.ex +++ b/lib/exq/stats/server.ex @@ -24,9 +24,9 @@ defmodule Exq.Stats.Server do @doc """ Add in progress worker process """ - def add_process(stats, namespace, worker, host, job_serialized) do + def add_process(stats, namespace, worker, hostname, job_serialized) do process_info = %Process{pid: worker, - host: host, + hostname: hostname, job: Exq.Support.Config.serializer.decode_job(job_serialized), started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -62,8 +62,8 @@ defmodule Exq.Stats.Server do @doc """ Cleanup stats on boot. This includes cleaning up busy workers. """ - def cleanup_host_stats(stats, namespace, host) do - GenServer.call(stats, {:cleanup_host_stats, namespace, host}) + def cleanup_host_stats(stats, namespace, host, master_pid) do + GenServer.call(stats, {:cleanup_host_stats, namespace, host, master_pid}) :ok end @@ -76,7 +76,6 @@ defmodule Exq.Stats.Server do GenServer.call(stats, :force_flush) end - ##=========================================================== ## gen server callbacks ##=========================================================== @@ -102,9 +101,9 @@ defmodule Exq.Stats.Server do {:reply, :ok, state} end - def handle_call({:cleanup_host_stats, namespace, host}, _from, state) do + def handle_call({:cleanup_host_stats, namespace, host, master_pid}, _from, state) do try do - JobStat.cleanup_processes(state.redis, namespace, host) + JobStat.cleanup_processes(state.redis, namespace, host, master_pid) rescue e -> Logger.error("Error cleaning up processes - #{Kernel.inspect e}") end @@ -124,7 +123,6 @@ defmodule Exq.Stats.Server do :ok end - ##=========================================================== ## Methods ##=========================================================== diff --git a/lib/exq/support/process.ex b/lib/exq/support/process.ex index c40bb16d..e90ddd99 100644 --- a/lib/exq/support/process.ex +++ b/lib/exq/support/process.ex @@ -2,7 +2,7 @@ defmodule Exq.Support.Process do @moduledoc """ Struct for in progress worker """ - defstruct pid: nil, host: nil, job: nil, started_at: nil + defstruct pid: nil, hostname: nil, job: nil, started_at: nil, concurrency: nil, job: nil, queues: nil alias Exq.Support.Config diff --git a/lib/exq/worker/server.ex b/lib/exq/worker/server.ex index 85765976..261d2e77 100644 --- a/lib/exq/worker/server.ex +++ b/lib/exq/worker/server.ex @@ -73,7 +73,6 @@ defmodule Exq.Worker.Server do {:noreply, state} end - @doc """ Dispatch work to the target module (call :perform method of target) """ diff --git a/test/api_test.exs b/test/api_test.exs index 0b33e4a8..f5d3585e 100644 --- a/test/api_test.exs +++ b/test/api_test.exs @@ -67,8 +67,8 @@ defmodule ApiTest do test "processes with data" do JobStat.add_process(:testredis, "test", %Process{pid: self()}) assert {:ok, [processes]} = Exq.Api.processes(Exq.Api) - my_pid_str = to_string(:erlang.pid_to_list(self())) - assert %Process{pid: ^my_pid_str} = processes + pid = to_string(:erlang.pid_to_list(self())) + assert pid = processes.pid end test "jobs when empty" do diff --git a/test/exq_test.exs b/test/exq_test.exs index d5713b8e..ce65855b 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -281,7 +281,7 @@ defmodule ExqTest do # Clear processes for this node host = Exq.NodeIdentifier.HostnameIdentifier.node_id() - Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host) + Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host, self()) # Check that process has been cleared processes = Exq.Redis.JobStat.processes(state.redis, "test") @@ -320,7 +320,6 @@ defmodule ExqTest do {:ok, count} = TestStats.failed_count(state.redis, "test") assert count == "2" - {:ok, jid} = Exq.enqueue(Exq, "default", "ExqTest.FailWorker/failure_perform", []) # if we kill Exq too fast we dont record the failure because exq is gone diff --git a/test/job_stat_test.exs b/test/job_stat_test.exs index 77d141ef..4c91d11a 100644 --- a/test/job_stat_test.exs +++ b/test/job_stat_test.exs @@ -28,9 +28,9 @@ defmodule JobStatTest do {:ok, jid} end - def create_process_info(host) do + def create_process_info(hostname) do process_info = %Process{pid: self(), - host: host, + hostname: hostname, job: %Job{}, started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -122,11 +122,10 @@ defmodule JobStatTest do assert Enum.count(Exq.Redis.JobStat.processes(:testredis, namespace)) == 2 # Should cleanup only the host that is passed in - JobStat.cleanup_processes(:testredis, namespace, "host123") + JobStat.cleanup_processes(:testredis, namespace, "host123", self()) processes = Exq.Redis.JobStat.processes(:testredis, namespace) assert Enum.count(processes) == 1 - assert Enum.find(processes, fn(process) -> process.host == "host456" end) != nil + assert Enum.find(processes, fn(process) -> process.hostname == "host456" end) != nil end - end