Skip to content

Commit

Permalink
Begin setting up broadway pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Oct 22, 2024
1 parent d93a6b4 commit 14231b9
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 67 deletions.
63 changes: 31 additions & 32 deletions lib/uplink/clients/lxd.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Uplink.Clients.LXD do
to: __MODULE__.Instance.Manager,
as: :list

defdelegate list_instances(),
defdelegate list_instances(options \\ []),
to: __MODULE__.Instance.Manager,
as: :list

Expand All @@ -33,37 +33,36 @@ defmodule Uplink.Clients.LXD do
as: :leases

def uplink_leases do
Cache.get({:leases, "uplink"}) ||
(
config = Application.get_env(:uplink, Uplink.Data) || []
uplink_project = Keyword.get(config, :project, "default")
client = LXD.client()

uplink_project =
client
|> Lexdee.get_project(uplink_project)
|> case do
{:ok, %{body: %{"name" => name}}} -> name
{:error, %{"error_code" => 404}} -> "default"
end

case LXD.network_leases(uplink_project) do
leases when is_list(leases) ->
uplink_addresses =
Enum.map(leases, fn lease ->
lease.address
end)

Cache.put({:leases, "uplink"}, uplink_addresses,
ttl: :timer.hours(3)
)

uplink_addresses

{:error, error} ->
{:error, error}
end
)
Cache.get({:leases, "uplink"}) || fetch_leases()
end

defp fetch_leases do
config = Application.get_env(:uplink, Uplink.Data) || []
uplink_project = Keyword.get(config, :project, "default")
client = LXD.client()

uplink_project =
client
|> Lexdee.get_project(uplink_project)
|> case do
{:ok, %{body: %{"name" => name}}} -> name
{:error, %{"error_code" => 404}} -> "default"
end

case LXD.network_leases(uplink_project) do
leases when is_list(leases) ->
uplink_addresses =
Enum.map(leases, fn lease ->
lease.address
end)

Cache.put({:leases, "uplink"}, uplink_addresses, ttl: :timer.hours(3))

uplink_addresses

{:error, error} ->
{:error, error}
end
end

def client do
Expand Down
6 changes: 6 additions & 0 deletions lib/uplink/clients/lxd/instance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ defmodule Uplink.Clients.LXD.Instance do
status
architecture
profiles
project
description
created_at
last_used_at
expanded_config
expanded_devices
state
)a

@required_attrs ~w(
Expand All @@ -34,13 +37,16 @@ defmodule Uplink.Clients.LXD.Instance do
field :status, :string
field :architecture, :string
field :profiles, {:array, :string}
field :project, :string
field :description, :string

field :created_at, :utc_datetime_usec
field :last_used_at, :utc_datetime_usec

field :expanded_config, :map
field :expanded_devices, :map

field :state, :map
end

def changeset(schema, params) do
Expand Down
32 changes: 15 additions & 17 deletions lib/uplink/clients/lxd/instance/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,25 @@ defmodule Uplink.Clients.LXD.Instance.Manager do
alias Clients.LXD
alias LXD.Instance

def list do
LXD.client()
|> Lexdee.list_instances(query: [{:recursion, 1}, {"all-projects", true}])
|> case do
{:ok, %{body: instances}} ->
instances =
instances
|> Enum.map(fn instance ->
Instance.parse(instance)
end)
def list(options \\ []) do
project = Keyword.get(options, :project, nil)
recursion = Keyword.get(options, :recursion, 1)

instances

error ->
error
if recursion < 1 do
raise "recursion must be greater than 0"
end
end

def list(project) do
project_query =
if project do
[project: project]
else
[{"all-projects", true}]
end

query = [{:recursion, recursion} | project_query]

LXD.client()
|> Lexdee.list_instances(query: [recursion: 1, project: project])
|> Lexdee.list_instances(query: query)
|> case do
{:ok, %{body: instances}} ->
instances =
Expand Down
27 changes: 15 additions & 12 deletions lib/uplink/monitors.ex
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
defmodule Uplink.Monitors do
alias Uplink.Clients.Instellar

def index(type) do
%{"uplink" => %{"id" => uplink_id}} = Instellar.get_self()
defdelegate get_instances_metrics,
to: __MODULE__.Instance,
as: :metrics

"metrics-system.#{type}-uplink-#{uplink_id}"
end

def push(monitor, type, params) do
def push(%{"attributes" => attributes} = monitor, type, document) do
headers = headers(monitor)
index = index(type)
endpoint = Map.fetch!(attributes, "endpoint")

[index, "_doc"]
[endpoint, index, "_doc"]
|> Path.join()
|> Repo.post(headers: headers, json: params)
|> Req.post(headers: headers, json: document)
end

defp index(type) do
%{"uplink" => %{"id" => uplink_id}} = Instellar.get_self()

"metrics-system.#{type}-uplink-#{uplink_id}"
end

defp headers(%{"attributes" => %{"uid" => uid, "token" => token}}) do
Base.encode64("#{uid}:#{token}")
encoded_token = Base.encode64("#{uid}:#{token}")

[
{"authorization", "ApiKey #{token}"}
]
[{"authorization", "ApiKey #{encoded_token}"}]
end
end
21 changes: 21 additions & 0 deletions lib/uplink/monitors/instance.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Uplink.Monitors.Instance do
alias Uplink.Clients.LXD

defstruct [:name, :data]

def metrics do
LXD.list_instances(recursion: 2)

# members
# |> Enum.map(fn member ->
# Lexdee.show_resources(client, member.server_name)
# |> case do
# {:ok, %{body: metric}} ->
# %__MODULE__{node: member.server_name, data: metric}

# {:error, error} ->
# error
# end
# end)
end
end
13 changes: 13 additions & 0 deletions lib/uplink/monitors/instance/metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defimpl Uplink.Monitors.Metric, for: Uplink.Monitors.Instance do
alias Uplink.Monitors.Instance

def memory(%Instance{name: node_name, data: data}) do
%{
"@timestamp" => DateTime.utc_now(),
"host" => %{
"name" => node_name,
"containerized" => false
}
}
end
end
7 changes: 7 additions & 0 deletions lib/uplink/monitors/metric.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defprotocol Uplink.Monitors.Metric do
@spec cpu(struct) :: {:ok, map()} | {:error, String.t()}
def cpu(data)

@spec memory(struct) :: {:ok, map()} | {:error, String.t()}
def memory(data)
end
21 changes: 21 additions & 0 deletions lib/uplink/monitors/observer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Uplink.Monitors.Observer do
use GenStage

def start_link do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

def init(_options) do
schedule_initial_job()

{:ok, %{}}
end

def handle_info(:perform, state) do


defp schedule_initial_job() do
# In 5 seconds
Process.send_after(self(), :perform, 5_000)
end
end
20 changes: 20 additions & 0 deletions lib/uplink/monitors/pipeline.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Uplink.Monitors.Pipeline do
use Broadway

alias Broadway.Message

def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {Uplink.Monitors.Producer, [poll_interval: :timer.seconds(15)]},
cncurrency: 1
],
processors: [
default: [
concurrency: 1
]
]
)
end
end
54 changes: 54 additions & 0 deletions lib/uplink/monitors/producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Uplink.Monitors.Producer do
use GenStage
@behaviour Broadway.Producer

alias Uplink.Metrics

@doc false
def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end

@impl true
def init(opts) do
state = %{
client: LXD.client(),
demand: 0,
poll_interval: Keyword.get(opts, :poll_interval, 15_000)
}

{:producer, state}
end

@impl true
def handle_demand(demand, state) do
{messages, state} = load_metrics(demand, state)
{:noreply, messages, state}
end

def handle_info(:poll, state) do
{messages, state} = load_metrics(0, state)
Process.send_after(self(), :poll, state.poll_interval)
end

defp load_metrics(demand, state) when demand <= 0, do: {[], state}

defp load_metrics(demand, state) do
messages =
Metrics.get_instances_metrics()
|> transform_metrics()

{messages, %{state | demand: demand - Enum.count(messages)}}
end

defp transform_metrics(metrics) do
Enum.map(metrics, &transform_message/1)
end

defp transform_message(message) do
%Broadway.Message{
data: message,
acknowledger: Broadway.NoopAcknowledger.init()
}
end
end
2 changes: 1 addition & 1 deletion lib/uplink/packages/install/execute.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ defmodule Uplink.Packages.Install.Execute do
project = Packages.get_project_name(client, metadata)

existing_instances =
LXD.list_instances(project)
LXD.list_instances(project: project)
|> Enum.filter(&only_uplink_instance/1)

Cache.put_new({:install, state.install.id, "completed"}, [],
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ defmodule Uplink.MixProject do
{:plug_cowboy, "~> 2.0"},
{:reverse_proxy_plug, "~> 2.1"},
{:mint_web_socket, "~> 1.0.2"},
{:broadway, "~> 1.0"},

# Test
{:bypass, "~> 2.1", only: :test},
Expand Down
Loading

0 comments on commit 14231b9

Please sign in to comment.