diff --git a/lib/uplink/application.ex b/lib/uplink/application.ex index dc3a8b8..dac725e 100644 --- a/lib/uplink/application.ex +++ b/lib/uplink/application.ex @@ -22,7 +22,6 @@ defmodule Uplink.Application do {Cluster.Supervisor, [topologies, [name: Uplink.ClusterSupervisor]]}, {Task.Supervisor, name: Uplink.TaskSupervisor}, {Plug.Cowboy, plug: Uplink.Internal, scheme: :http, port: internal_port}, - {Uplink.Pipelines.Context, [name: :metrics, monitors: []]}, {Pogo.DynamicSupervisor, [name: Uplink.PipelineSupervisor, scope: :uplink]}, {Uplink.Monitors, []}, diff --git a/lib/uplink/monitors.ex b/lib/uplink/monitors.ex index 10436c6..317af6a 100644 --- a/lib/uplink/monitors.ex +++ b/lib/uplink/monitors.ex @@ -1,6 +1,7 @@ defmodule Uplink.Monitors do use Task + alias Uplink.Cache alias Uplink.Pipelines alias Uplink.Clients.Instellar @@ -15,10 +16,14 @@ defmodule Uplink.Monitors do end def run(_options) do + Cache.put_new({:monitors, :metrics}, []) + Instellar.list_monitors() |> case do {:ok, monitors} -> - start_pipeline(monitors, :metrics) + Cache.transaction([keys: [{:monitors, :metrics}]], fn -> + start_pipeline(monitors, :metrics) + end) error -> {:error, error} diff --git a/lib/uplink/pipelines.ex b/lib/uplink/pipelines.ex index adf6c13..0161f68 100644 --- a/lib/uplink/pipelines.ex +++ b/lib/uplink/pipelines.ex @@ -1,11 +1,15 @@ defmodule Uplink.Pipelines do - defdelegate get_monitors(context), - to: __MODULE__.Context, - as: :get + alias Uplink.Cache - defdelegate append_monitors(context, monitors), - to: __MODULE__.Context, - as: :append + def get_monitors(context) do + Cache.get({:monitors, context}) || [] + end + + def append_monitors(context, monitors) do + Cache.get_and_update({:monitors, context}, fn existing_monitors -> + {existing_monitors, existing_monitors ++ monitors} + end) + end def start(module) do spec = %{ diff --git a/lib/uplink/pipelines/context.ex b/lib/uplink/pipelines/context.ex deleted file mode 100644 index 7328771..0000000 --- a/lib/uplink/pipelines/context.ex +++ /dev/null @@ -1,32 +0,0 @@ -defmodule Uplink.Pipelines.Context do - use Agent - - require Logger - - def start_link(options) do - monitors = Keyword.get(options, :monitors, []) - name = Keyword.fetch!(options, :name) - - Agent.start_link(fn -> monitors end, name: {:global, name}) - |> case do - {:ok, pid} -> - Logger.info("[Uplink.Pipelines.Context] started #{inspect(name)}") - - {:ok, pid} - - {:error, {:already_started, pid}} -> - Process.link(pid) - {:ok, pid} - end - end - - def get(pid_or_name) do - Agent.get({:global, pid_or_name}, fn monitors -> monitors end) - end - - def append(pid_or_name, new_monitors) do - Agent.get_and_update({:global, pid_or_name}, fn monitors -> - {monitors, monitors ++ new_monitors} - end) - end -end