Skip to content

Commit

Permalink
Setup supervision tree for pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Oct 25, 2024
1 parent 92a1cc4 commit 9b7058e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
19 changes: 15 additions & 4 deletions lib/uplink/metrics/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ defmodule Uplink.Metrics.Pipeline do
alias Uplink.Metrics.Document

def start_link(opts) do
monitor = Keyword.fetch!(opts, :monitor)
monitors = Keyword.fetch!(opts, :monitors)

Broadway.start_link(__MODULE__,
name: __MODULE__,
name: {:global, __MODULE__},
context: %{
monitor: monitor
monitors: monitors
},
producer: [
module: {Uplink.Metrics.Producer, [poll_interval: :timer.seconds(15)]},
Expand All @@ -31,6 +31,14 @@ defmodule Uplink.Metrics.Pipeline do
]
]
)
|> case do
{:ok, pid} ->
{:ok, pid}

{:error, {:already_started, pid}} ->
Process.link(pid)
{:ok, pid}
end
end

def handle_message(_, %Message{data: data} = message, _) do
Expand All @@ -56,7 +64,10 @@ defmodule Uplink.Metrics.Pipeline do
def handle_batch(_, messages, _batch_info, context) do
documents = to_ndjson(messages)

Metrics.push!(context.monitor, documents) |> IO.inspect()
context.monitors
|> Enum.map(fn monitor ->
Metrics.push!(context.monitor, documents)
end)

messages
end
Expand Down
33 changes: 33 additions & 0 deletions lib/uplink/monitors.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Uplink.Monitors do
use Task

alias Uplink.Cache
alias Uplink.Pipelines
alias Uplink.Clients.Instellar

def start_link(options) do
Task.start_link(__MODULE__, :run, options)
end

def run(options) do
Instellar.list_monitors()
|> case do
{:ok, %{body: monitors}} ->
state = maybe_start_pipeline(monitors)

error ->
{:error, error}
end
end

defp maybe_start_pipeline(monitors) do
Cache.transaction([keys: [:monitors]], fn ->
started_monitors = Cache.get(:monitors)

not_started_monitors =
Enum.filter(monitors, fn monitor ->
monitor["attributes"]["id"] not in started_monitors
end)
end)
end
end
17 changes: 17 additions & 0 deletions lib/uplink/pipelines.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Uplink.Pipelines do
use DynamicSupervisor

def start_link(options) do
DynamicSupervisor.start_link(__MODULE__, options, name: __MODULE__)
end

def start_metrics(monitors) do
spec = {Uplink.Metrics.Pipeline, monitors: monitors}

DynamicSupervisor.start_child(__MODULE__, spec)
end

def init(_options) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end

0 comments on commit 9b7058e

Please sign in to comment.