diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 216f10e8..cfde9066 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,17 +1,17 @@ -name: 'CI' +name: "CI" on: push: branches: - master - develop - - 'feature/**' - - 'hotfix/**' + - "feature/**" + - "hotfix/**" jobs: deps: name: Dependencies - runs-on: warp-ubuntu-latest-x64-2x + runs-on: warp-ubuntu-2204-x64-2x container: image: alpine:3.18 @@ -55,7 +55,7 @@ jobs: static_code_analysis: name: Static Code Analysis needs: deps - runs-on: warp-ubuntu-latest-x64-2x + runs-on: warp-ubuntu-2204-x64-2x container: image: alpine:3.18 @@ -96,7 +96,7 @@ jobs: test: name: Unit Tests needs: deps - runs-on: warp-ubuntu-latest-x64-2x + runs-on: warp-ubuntu-2204-x64-2x container: image: alpine:3.18 diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index 1e7b13ef..8f19a5e6 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -10,7 +10,7 @@ on: jobs: build: name: Build - runs-on: warp-ubuntu-latest-x64-2x + runs-on: warp-ubuntu-2204-x64-2x if: ${{ github.event.workflow_run.conclusion == 'success' }} steps: - name: "Checkout" @@ -49,7 +49,7 @@ jobs: deploy: name: Deploy needs: build - runs-on: warp-ubuntu-latest-x64-2x + runs-on: warp-ubuntu-2204-x64-2x steps: - name: Checkout uses: actions/checkout@v4 diff --git a/.gitignore b/.gitignore index ffa6ed84..c31ab52f 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,6 @@ uplink-*.tar mnesia -.mnesia \ No newline at end of file +.mnesia + +.envrc diff --git a/config/config.exs b/config/config.exs index e020588d..8a58d56d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -33,10 +33,6 @@ config :uplink, Oban, components: 1 ] -config :reverse_proxy_plug, - :http_client, - ReverseProxyPlug.HTTPClient.Adapters.HTTPoison - config :formation, Formation.Lxd, timeout: 180 config :logger, diff --git a/config/dev.exs b/config/dev.exs index 25c4e064..619f4cba 100644 --- 
a/config/dev.exs +++ b/config/dev.exs @@ -1,10 +1,13 @@ import Config -config :uplink, Uplink.Secret, "secretsomethingsixteen" +config :uplink, + Uplink.Secret, + System.get_env("UPLINK_SECRET", "secretsomethingsixteen") config :uplink, Uplink.Data, mode: "lite" -config :uplink, Uplink.Clients.Instellar, endpoint: "http://localhost/uplink" +config :uplink, Uplink.Clients.Instellar, + endpoint: "http://localhost:4000/uplink" config :uplink, :environment, :dev diff --git a/config/test.exs b/config/test.exs index 27d7ac52..f7056499 100644 --- a/config/test.exs +++ b/config/test.exs @@ -2,6 +2,14 @@ import Config config :uplink, Uplink.Data, mode: "pro" +config :uplink, Uplink.Monitors, enabled: false + +config :uplink, Uplink.Metrics.Pipeline, + producer_module: Broadway.DummyProducer, + producer_options: [] + +config :uplink, Uplink.PipelineSupervisor, sync_interval: 100 + config :uplink, Uplink.Repo, username: System.get_env("UPLINK_DB_USERNAME") || System.get_env("POSTGRES_USERNAME"), @@ -38,4 +46,5 @@ config :uplink, :drivers, aws_s3: Uplink.Drivers.Bucket.AwsMock # config :plug, :validate_header_keys_during_test, false # Print only warnings and errors during test +# Disable logging in tests config :logger, level: :warn diff --git a/lib/uplink/application.ex b/lib/uplink/application.ex index f5807590..4e9da693 100644 --- a/lib/uplink/application.ex +++ b/lib/uplink/application.ex @@ -5,8 +5,17 @@ defmodule Uplink.Application do alias Uplink.Web + @pipeline_supervisor Uplink.PipelineSupervisor + def start(_type, _args) do %{key: key, cert: cert} = Web.Certificate.generate() + + pipeline_supervisor_config = + Application.get_env(:uplink, @pipeline_supervisor, []) + + sync_interval = + Keyword.get(pipeline_supervisor_config, :sync_interval, 5_000) + router_config = Application.get_env(:uplink, Uplink.Router, port: 4040) internal_router_config = @@ -22,6 +31,9 @@ defmodule Uplink.Application do {Cluster.Supervisor, [topologies, [name: Uplink.ClusterSupervisor]]}, 
{Task.Supervisor, name: Uplink.TaskSupervisor}, {Plug.Cowboy, plug: Uplink.Internal, scheme: :http, port: internal_port}, + {Pogo.DynamicSupervisor, + name: @pipeline_supervisor, scope: :uplink, sync_interval: sync_interval}, + {Uplink.Monitors, []}, { Plug.Cowboy, plug: Uplink.Router, diff --git a/lib/uplink/clients/instellar.ex b/lib/uplink/clients/instellar.ex index fa3a0613..e06409e7 100644 --- a/lib/uplink/clients/instellar.ex +++ b/lib/uplink/clients/instellar.ex @@ -9,6 +9,7 @@ defmodule Uplink.Clients.Instellar do Instance, Register, Component, + Monitor, Variable, Proxy, Self @@ -35,6 +36,10 @@ defmodule Uplink.Clients.Instellar do to: Proxy, as: :list + defdelegate list_monitors, + to: Monitor, + as: :list + defdelegate deployment_metadata(install), to: Installation, as: :metadata diff --git a/lib/uplink/clients/instellar/monitor.ex b/lib/uplink/clients/instellar/monitor.ex new file mode 100644 index 00000000..9703775c --- /dev/null +++ b/lib/uplink/clients/instellar/monitor.ex @@ -0,0 +1,21 @@ +defmodule Uplink.Clients.Instellar.Monitor do + alias Uplink.Clients.Instellar + + def list do + headers = Instellar.Self.headers() + + [Instellar.endpoint(), "self", "monitors"] + |> Path.join() + |> Req.get(headers: headers, max_retries: 1) + |> case do + {:ok, %{status: 200, body: %{"data" => monitors}}} -> + {:ok, monitors} + + {:ok, %{status: _, body: body}} -> + {:error, body} + + {:error, error} -> + {:error, error} + end + end +end diff --git a/lib/uplink/clients/lxd.ex b/lib/uplink/clients/lxd.ex index f15b5134..a919698c 100644 --- a/lib/uplink/clients/lxd.ex +++ b/lib/uplink/clients/lxd.ex @@ -4,6 +4,10 @@ defmodule Uplink.Clients.LXD do alias Uplink.Clients.LXD + defdelegate get_node(name), + to: __MODULE__.Node.Manager, + as: :show + defdelegate list_cluster_members(), to: __MODULE__.Cluster.Manager, as: :list_members @@ -16,11 +20,11 @@ defmodule Uplink.Clients.LXD do to: __MODULE__.Profile.Manager, as: :get - defdelegate list_instances(project), - 
to: __MODULE__.Instance.Manager, + defdelegate list_metrics(options \\ []), + to: __MODULE__.Metric.Manager, as: :list - defdelegate list_instances(), + defdelegate list_instances(options \\ []), to: __MODULE__.Instance.Manager, as: :list @@ -33,37 +37,36 @@ defmodule Uplink.Clients.LXD do as: :leases def uplink_leases do - Cache.get({:leases, "uplink"}) || - ( - config = Application.get_env(:uplink, Uplink.Data) || [] - uplink_project = Keyword.get(config, :project, "default") - client = LXD.client() - - uplink_project = - client - |> Lexdee.get_project(uplink_project) - |> case do - {:ok, %{body: %{"name" => name}}} -> name - {:error, %{"error_code" => 404}} -> "default" - end - - case LXD.network_leases(uplink_project) do - leases when is_list(leases) -> - uplink_addresses = - Enum.map(leases, fn lease -> - lease.address - end) - - Cache.put({:leases, "uplink"}, uplink_addresses, - ttl: :timer.hours(3) - ) - - uplink_addresses - - {:error, error} -> - {:error, error} - end - ) + Cache.get({:leases, "uplink"}) || fetch_leases() + end + + defp fetch_leases do + config = Application.get_env(:uplink, Uplink.Data) || [] + uplink_project = Keyword.get(config, :project, "default") + client = LXD.client() + + uplink_project = + client + |> Lexdee.get_project(uplink_project) + |> case do + {:ok, %{body: %{"name" => name}}} -> name + {:error, %{"error_code" => 404}} -> "default" + end + + case LXD.network_leases(uplink_project) do + leases when is_list(leases) -> + uplink_addresses = + Enum.map(leases, fn lease -> + lease.address + end) + + Cache.put({:leases, "uplink"}, uplink_addresses, ttl: :timer.hours(3)) + + uplink_addresses + + {:error, error} -> + {:error, error} + end end def client do diff --git a/lib/uplink/clients/lxd/instance.ex b/lib/uplink/clients/lxd/instance.ex index c601f7bf..9a9f8db1 100644 --- a/lib/uplink/clients/lxd/instance.ex +++ b/lib/uplink/clients/lxd/instance.ex @@ -9,10 +9,13 @@ defmodule Uplink.Clients.LXD.Instance do status architecture 
profiles + project description created_at last_used_at expanded_config + expanded_devices + state )a @required_attrs ~w( @@ -34,6 +37,7 @@ defmodule Uplink.Clients.LXD.Instance do field :status, :string field :architecture, :string field :profiles, {:array, :string} + field :project, :string field :description, :string field :created_at, :utc_datetime_usec @@ -41,6 +45,8 @@ defmodule Uplink.Clients.LXD.Instance do field :expanded_config, :map field :expanded_devices, :map + + field :state, :map end def changeset(schema, params) do diff --git a/lib/uplink/clients/lxd/instance/manager.ex b/lib/uplink/clients/lxd/instance/manager.ex index 47178ee7..1dacc0e9 100644 --- a/lib/uplink/clients/lxd/instance/manager.ex +++ b/lib/uplink/clients/lxd/instance/manager.ex @@ -6,27 +6,25 @@ defmodule Uplink.Clients.LXD.Instance.Manager do alias Clients.LXD alias LXD.Instance - def list do - LXD.client() - |> Lexdee.list_instances(query: [{:recursion, 1}, {"all-projects", true}]) - |> case do - {:ok, %{body: instances}} -> - instances = - instances - |> Enum.map(fn instance -> - Instance.parse(instance) - end) + def list(options \\ []) do + project = Keyword.get(options, :project, nil) + recursion = Keyword.get(options, :recursion, 1) - instances - - error -> - error + if recursion < 1 do + raise "recursion must be greater than 0 and less than 3 but got #{recursion}" end - end - def list(project) do + project_query = + if project do + [project: project] + else + [{"all-projects", true}] + end + + query = [{:recursion, recursion} | project_query] + LXD.client() - |> Lexdee.list_instances(query: [recursion: 1, project: project]) + |> Lexdee.list_instances(query: query) |> case do {:ok, %{body: instances}} -> instances = diff --git a/lib/uplink/clients/lxd/metric.ex b/lib/uplink/clients/lxd/metric.ex new file mode 100644 index 00000000..9bcc0538 --- /dev/null +++ b/lib/uplink/clients/lxd/metric.ex @@ -0,0 +1,23 @@ +defmodule Uplink.Clients.LXD.Metric do + use Ecto.Schema + import 
Ecto.Changeset + + @primary_key false + embedded_schema do + field :instance, :string + field :label, :string + field :value, :string + end + + def changeset(metric, params) do + metric + |> cast(params, [:instance, :label, :value]) + |> validate_required([:instance, :label, :value]) + end + + def parse(params) do + %__MODULE__{} + |> changeset(params) + |> apply_action!(:insert) + end +end diff --git a/lib/uplink/clients/lxd/metric/manager.ex b/lib/uplink/clients/lxd/metric/manager.ex new file mode 100644 index 00000000..c9211bc7 --- /dev/null +++ b/lib/uplink/clients/lxd/metric/manager.ex @@ -0,0 +1,29 @@ +defmodule Uplink.Clients.LXD.Metric.Manager do + alias Uplink.Clients.LXD + + def list(options \\ []) do + target = Keyword.get(options, :target, nil) + + LXD.client() + |> Lexdee.list_metrics(query: [target: target]) + |> case do + {:ok, %{body: raw_metrics}} -> + raw_metrics + |> String.split("\n") + |> Enum.map(fn line -> + PrometheusParser.parse(line) + end) + |> Enum.map(fn + {:ok, line} -> line + _ -> nil + end) + |> Enum.reject(&is_nil/1) + |> Enum.reject(fn line -> + line.line_type != "ENTRY" + end) + + error -> + error + end + end +end diff --git a/lib/uplink/clients/lxd/node.ex b/lib/uplink/clients/lxd/node.ex new file mode 100644 index 00000000..df12dcfc --- /dev/null +++ b/lib/uplink/clients/lxd/node.ex @@ -0,0 +1,29 @@ +defmodule Uplink.Clients.LXD.Node do + use Ecto.Schema + import Ecto.Changeset + + @primary_key false + embedded_schema do + field :name, :string + field :cpu_cores_count, :integer + field :total_memory, :integer + field :total_storage, :integer + end + + def changeset(node, params) do + node + |> cast(params, [:name, :cpu_cores_count, :total_memory, :total_storage]) + |> validate_required([ + :name, + :cpu_cores_count, + :total_memory, + :total_storage + ]) + end + + def parse(params) do + %__MODULE__{} + |> changeset(params) + |> apply_action!(:insert) + end +end diff --git a/lib/uplink/clients/lxd/node/manager.ex 
b/lib/uplink/clients/lxd/node/manager.ex new file mode 100644 index 00000000..5cc32fce --- /dev/null +++ b/lib/uplink/clients/lxd/node/manager.ex @@ -0,0 +1,38 @@ +defmodule Uplink.Clients.LXD.Node.Manager do + alias Uplink.Cache + alias Uplink.Clients.LXD + alias Uplink.Clients.LXD.Node + + def show(name) do + Cache.get({:node, name}) || fetch_node(name) + end + + defp fetch_node(name) do + LXD.client() + |> Lexdee.show_resources(name) + |> case do + {:ok, %{body: node}} -> + %{ + "cpu" => %{"total" => total_cores_count}, + "memory" => %{"total" => total_memory}, + "storage" => %{"disks" => disks} + } = node + + disk_sizes = + Enum.map(disks, fn disk -> + disk["size"] + end) + |> Enum.sum() + + Node.parse(%{ + name: name, + cpu_cores_count: total_cores_count, + total_memory: total_memory, + total_storage: disk_sizes + }) + + error -> + error + end + end +end diff --git a/lib/uplink/instances.ex b/lib/uplink/instances.ex index 44850e05..698b675b 100644 --- a/lib/uplink/instances.ex +++ b/lib/uplink/instances.ex @@ -1,6 +1,14 @@ defmodule Uplink.Instances do alias Uplink.Cache + def exists?(state, install_id, instance_name) do + Cache.get({:install, install_id, state}) + |> case do + nil -> false + member_instances -> Enum.member?(member_instances, instance_name) + end + end + def mark(state, install_id, instance_name) do Cache.transaction( [keys: [{:install, install_id, state}]], diff --git a/lib/uplink/internal.ex b/lib/uplink/internal.ex index a0cc0e12..035600e3 100644 --- a/lib/uplink/internal.ex +++ b/lib/uplink/internal.ex @@ -11,6 +11,8 @@ defmodule Uplink.Internal do Install } + plug Plug.Logger + plug :match plug :dispatch diff --git a/lib/uplink/metrics.ex b/lib/uplink/metrics.ex new file mode 100644 index 00000000..5fd6e2e2 --- /dev/null +++ b/lib/uplink/metrics.ex @@ -0,0 +1,41 @@ +defmodule Uplink.Metrics do + alias Uplink.Clients.Instellar + + defdelegate for_instances(), + to: __MODULE__.Instance, + as: :metrics + + def push!(%{"attributes" => 
attributes} = monitor, documents) do + headers = headers(monitor) + endpoint = Map.fetch!(attributes, "endpoint") + + request = + Req.new( + base_url: endpoint, + connect_options: [ + protocols: [:http1], + transport_opts: [ + verify: :verify_none + ] + ], + headers: headers + ) + + Req.post!(request, url: "/_bulk", body: documents) + end + + def index(type) do + %{"uplink" => %{"id" => uplink_id}} = Instellar.get_self() + + "metrics-system.#{type}-uplink-#{uplink_id}" + end + + defp headers(%{"attributes" => %{"uid" => uid, "token" => token}}) do + encoded_token = Base.encode64("#{uid}:#{token}") + + [ + {"authorization", "ApiKey #{encoded_token}"}, + {"content-type", "application/json"} + ] + end +end diff --git a/lib/uplink/metrics/document.ex b/lib/uplink/metrics/document.ex new file mode 100644 index 00000000..a4bb63bb --- /dev/null +++ b/lib/uplink/metrics/document.ex @@ -0,0 +1,22 @@ +defprotocol Uplink.Metrics.Document do + @spec uptime(struct) :: map() | nil + def uptime(data) + + @spec filesystem(struct) :: map() | nil + def filesystem(data) + + @spec cpu(struct, map | nil) :: map() | nil + def cpu(data, previous_cpu_metric) + + @spec memory(struct) :: map() | nil + def memory(data) + + @spec diskio(struct) :: map() | nil + def diskio(data) + + @spec network(struct, map | nil) :: map() | nil + def network(data, previous_network_metric) + + @spec load(struct, map | nil) :: map() | nil + def load(data, previous_load_metric) +end diff --git a/lib/uplink/metrics/instance.ex b/lib/uplink/metrics/instance.ex new file mode 100644 index 00000000..cb7cabbd --- /dev/null +++ b/lib/uplink/metrics/instance.ex @@ -0,0 +1,77 @@ +defmodule Uplink.Metrics.Instance do + alias Uplink.Clients.LXD + alias Uplink.Clients.Instellar + + defstruct [:name, :cycle, :timestamp, :data, :node, :metrics, :account] + + def metrics do + instances = LXD.list_instances(recursion: 2) + + cluster_members = LXD.list_cluster_members() + + metrics = + Enum.flat_map(cluster_members, fn 
cluster_member -> + LXD.list_metrics(target: cluster_member.server_name) + end) + + %{"organization" => %{"slug" => account_id}} = Instellar.get_self() + + metrics = + metrics + |> Enum.filter(fn line -> + line.label in [ + "lxd_disk_read_bytes_total", + "lxd_disk_reads_completed_total", + "lxd_disk_written_bytes_total", + "lxd_disk_writes_completed_total", + "lxd_memory_Cached_bytes", + "lxd_memory_MemTotal_bytes" + ] && line.value != "0" + end) + |> Enum.group_by(fn line -> + pairs_map = Enum.into(line.pairs, %{}) + + {Map.get(pairs_map, "name"), Map.get(pairs_map, "project")} + end) + + nodes = + instances + |> Enum.map(fn instance -> + instance.location + end) + |> Enum.uniq() + + nodes = + Enum.map(nodes, fn node -> + LXD.get_node(node) + end) + + instances + |> Enum.map(fn instance -> + node = + Enum.find(nodes, fn node -> + node.name == instance.location + end) + + lxd_metrics = + Enum.find(metrics, fn {{name, project}, _m} -> + name == instance.name and project == instance.project + end) + |> case do + {_, metrics} -> metrics + nil -> [] + end + + %__MODULE__{ + name: instance.name, + data: instance, + node: node, + account: %{ + id: account_id + }, + metrics: lxd_metrics, + timestamp: DateTime.utc_now() + } + end) + end +end diff --git a/lib/uplink/metrics/instance/document.ex b/lib/uplink/metrics/instance/document.ex new file mode 100644 index 00000000..94fd1a75 --- /dev/null +++ b/lib/uplink/metrics/instance/document.ex @@ -0,0 +1,443 @@ +defimpl Uplink.Metrics.Document, for: Uplink.Metrics.Instance do + alias Uplink.Metrics.Instance + + def memory(%Instance{data: data, metrics: metrics} = instance) do + %{"memory" => %{"usage" => memory_usage}} = data.state + + memory_usage = Decimal.new(memory_usage) + + total_memory = + Enum.find(metrics, fn metric -> + metric.label == "lxd_memory_MemTotal_bytes" + end) + + memory_total = + if total_memory do + Decimal.new(total_memory.value) + else + Decimal.new(0) + end + + pct = + memory_percentage(%{ + "usage" => 
memory_usage, + "total" => memory_total + }) + + cached_memory = + Enum.find(metrics, fn metric -> + metric.label == "lxd_memory_Cached_bytes" + end) + + actual_used_bytes = actual_memory_usage(cached_memory, memory_usage) + + actual_used_pct = + memory_percentage(%{"usage" => actual_used_bytes, "total" => memory_total}) + + memory_params = %{ + "system" => %{ + "memory" => %{ + "free" => Decimal.sub(memory_total, memory_usage), + "actual" => %{ + "used" => %{ + "bytes" => actual_used_bytes, + "pct" => actual_used_pct + } + }, + "total" => memory_total, + "used" => %{ + "bytes" => memory_usage, + "pct" => pct + } + } + } + } + + instance + |> build_base() + |> Map.merge(memory_params) + end + + def network(%Instance{}, nil), do: nil + + def network(%Instance{data: %{state: %{"network" => nil}}}, _), do: nil + + def network( + %Instance{ + timestamp: timestamp, + data: %{name: name, state: %{"network" => network}} = data + } = instance, + %{ + timestamp: previous_network_metric_timestamp, + data: previous_network_metric_data + } + ) + when is_map(network) do + config = data.expanded_config + os = Map.get(config, "image.os") + release = Map.get(config, "image.release") + serial = Map.get(config, "image.serial") + + time_diff_milliseconds = + DateTime.to_unix(timestamp, :millisecond) - + previous_network_metric_timestamp + + Enum.map(network, fn {interface, network_data} -> + {_, previous_network_data} = + Enum.find(previous_network_metric_data, fn {i, _} -> i == interface end) + + %{"counters" => current_counters} = network_data + + %{"counters" => previous_counters} = previous_network_data + + %{ + "bytes_received" => previous_bytes_received, + "bytes_sent" => previous_bytes_sent, + "packets_received" => previous_packets_received, + "packets_sent" => previous_packets_sent + } = previous_counters + + %{ + "bytes_received" => bytes_received, + "bytes_sent" => bytes_sent, + "errors_received" => errors_received, + "errors_sent" => errors_sent, + 
"packets_dropped_inbound" => packets_dropped_inbound, + "packets_dropped_outbound" => packets_dropped_outbound, + "packets_received" => packets_received, + "packets_sent" => packets_sent + } = current_counters + + diff_bytes_received = bytes_received - previous_bytes_received + diff_packets_received = packets_received - previous_packets_received + diff_bytes_sent = bytes_sent - previous_bytes_sent + diff_packets_sent = packets_sent - previous_packets_sent + + network_params = %{ + "metricset" => %{ + "period" => time_diff_milliseconds + }, + "host" => %{ + "name" => name, + "created" => data.created_at, + "accessed" => data.last_used_at, + "containerized" => data.type == "container", + "os" => %{ + "codename" => os, + "build" => "#{release}-#{serial}" + }, + "network" => %{ + "in" => %{ + "bytes" => diff_bytes_received, + "packets" => diff_packets_received + }, + "ingress" => %{ + "bytes" => diff_bytes_received, + "packets" => diff_packets_received + }, + "out" => %{ + "bytes" => diff_bytes_sent, + "packets" => diff_packets_sent + }, + "egress" => %{ + "bytes" => diff_bytes_sent, + "packets" => diff_packets_sent + } + } + }, + "system" => %{ + "network" => %{ + "in" => %{ + "bytes" => diff_bytes_received, + "dropped" => packets_dropped_inbound, + "errors" => errors_received, + "packets" => diff_packets_received + }, + "name" => interface, + "out" => %{ + "bytes" => diff_bytes_sent, + "dropped" => packets_dropped_outbound, + "errors" => errors_sent, + "packets" => diff_packets_sent + } + } + } + } + + instance + |> build_base() + |> Map.merge(network_params) + end) + end + + def diskio(%Instance{metrics: metrics} = instance) do + disk_read_bytes = sum_metrics(metrics, "lxd_disk_read_bytes_total") + disk_read_count = sum_metrics(metrics, "lxd_disk_reads_completed_total") + disk_write_bytes = sum_metrics(metrics, "lxd_disk_written_bytes_total") + disk_write_count = sum_metrics(metrics, "lxd_disk_writes_completed_total") + + diskio_params = %{ + "system" => %{ + 
"diskio" => %{ + "read" => %{ + "bytes" => disk_read_bytes, + "count" => disk_read_count + }, + "write" => %{ + "bytes" => disk_write_bytes, + "count" => disk_write_count + } + } + } + } + + instance + |> build_base() + |> Map.merge(diskio_params) + end + + def uptime(%Instance{data: %{state: %{"status" => "Running"}}} = instance) do + now = DateTime.to_unix(instance.timestamp, :millisecond) + last_used_at = DateTime.to_unix(instance.data.last_used_at, :millisecond) + + duration_ms = now - last_used_at + + uptime_params = %{ + "system" => %{ + "uptime" => %{"duration" => %{"ms" => duration_ms}} + } + } + + instance + |> build_base() + |> Map.merge(uptime_params) + end + + def uptime(%Instance{}), do: nil + + def load(%Instance{}, %{cpu_60_metric: nil}), do: nil + + def load( + %Instance{data: data, timestamp: timestamp, node: node} = instance, + %{cpu_60_metric: cpu_60_metric} = params + ) do + cores = + Map.get(data.expanded_config, "limits.cpu") || "#{node.cpu_cores_count}" + + cores = String.to_integer(cores) + + %{data: %{"usage" => load_1_usage}} = cpu_60_metric + %{"usage" => current_usage} = data.state["cpu"] + + load_1_time_diff_seconds = + (DateTime.to_unix(timestamp, :millisecond) - cpu_60_metric.timestamp) / + 1000 + + load_1 = + cpu_percentage( + cores, + load_1_time_diff_seconds, + load_1_usage, + current_usage + ) + + load_1 = %{load: load_1 * cores, norm: load_1} + + load_5 = + if cpu_300_metric = Map.get(params, :cpu_300_metric) do + %{data: %{"usage" => load_5_usage}} = cpu_300_metric + + load_5_time_diff_seconds = + (DateTime.to_unix(timestamp, :millisecond) - cpu_300_metric.timestamp) / + 1000 + + load_5 = + cpu_percentage( + cores, + load_5_time_diff_seconds, + load_5_usage, + current_usage + ) + + %{load: load_5 * cores, norm: load_5} + else + %{} + end + + load_15 = + if cpu_900_metric = Map.get(params, :cpu_900_metric) do + %{data: %{"usage" => load_15_usage}} = cpu_900_metric + + load_15_time_diff_seconds = + (DateTime.to_unix(timestamp, 
:millisecond) - cpu_900_metric.timestamp) / + 1000 + + load_15 = + cpu_percentage( + cores, + load_15_time_diff_seconds, + load_15_usage, + current_usage + ) + + %{load: load_15 * cores, norm: load_15} + else + %{} + end + + load_params = %{ + "system" => %{ + "load" => %{ + "cores" => cores, + "1" => load_1.load, + "5" => Map.get(load_5, :load), + "15" => Map.get(load_15, :load), + "norm" => %{ + "1" => load_1.norm, + "5" => Map.get(load_5, :norm), + "15" => Map.get(load_15, :norm) + } + } + } + } + + instance + |> build_base() + |> Map.merge(load_params) + end + + def cpu(%Instance{}, nil), do: nil + + def cpu( + %Instance{node: node, timestamp: timestamp, data: data} = instance, + %{ + timestamp: previous_cpu_metric_timestamp, + data: previous_cpu_metric_data + } + ) do + cores = + Map.get(data.expanded_config, "limits.cpu") || "#{node.cpu_cores_count}" + + cores = String.to_integer(cores) + + time_diff_seconds = + (DateTime.to_unix(timestamp, :millisecond) - previous_cpu_metric_timestamp) / + 1000 + + %{"usage" => later_usage} = data.state["cpu"] + + %{"usage" => earlier_usage} = previous_cpu_metric_data + + pct = cpu_percentage(cores, time_diff_seconds, earlier_usage, later_usage) + + cpu_params = %{ + "system" => %{ + "cpu" => %{ + "cores" => cores, + "system" => %{ + "pct" => 0.0 + }, + "user" => %{ + "pct" => pct + } + } + } + } + + instance + |> build_base() + |> Map.merge(cpu_params) + end + + def filesystem(%Instance{data: data, node: node} = instance) do + %{"disk" => %{"root" => %{"usage" => usage_bytes, "total" => total_bytes}}} = + data.state + + total_bytes = if total_bytes > 0, do: total_bytes, else: node.total_storage + + pct = usage_bytes / total_bytes + + filesystem_params = %{ + "system" => %{ + "filesystem" => %{ + "device_name" => "root", + "mount_point" => "/", + "total" => total_bytes, + "used" => %{ + "bytes" => usage_bytes, + "pct" => pct + } + } + } + } + + instance + |> build_base() + |> Map.merge(filesystem_params) + end + + defp 
cpu_percentage(cores, time_diff_seconds, earlier_usage, later_usage) do + available_compute = cores * time_diff_seconds * :math.pow(10, 9) + + (later_usage - earlier_usage) / available_compute + end + + defp memory_percentage(%{"total" => total, "usage" => usage_bytes}) do + if Decimal.gt?(usage_bytes, 0) and Decimal.gt?(total, 0), + do: Decimal.div(usage_bytes, total), + else: Decimal.new(0) + end + + defp actual_memory_usage( + %PrometheusParser.Line{value: cached_memory_value}, + memory_usage + ) do + Decimal.sub(memory_usage, Decimal.new(cached_memory_value)) + end + + defp actual_memory_usage(nil, _), do: 0 + + defp sum_metrics(metrics, key) when is_list(metrics) do + metrics + |> Enum.filter(&(&1.label == key)) + |> Enum.map(& &1.value) + |> Enum.map(&Decimal.new/1) + |> Enum.map(&Decimal.to_integer/1) + |> Enum.sum() + end + + defp sum_metrics(nil, _key), do: 0 + + defp build_base(%Instance{ + account: account, + name: name, + timestamp: timestamp, + data: data, + node: node + }) do + config = data.expanded_config + os = Map.get(config, "image.os") + release = Map.get(config, "image.release") + serial = Map.get(config, "image.serial") + + %{ + "@timestamp" => timestamp, + "host" => %{ + "name" => name, + "created" => data.created_at, + "accessed" => data.last_used_at, + "containerized" => data.type == "container", + "os" => %{ + "codename" => os, + "build" => "#{release}-#{serial}" + } + }, + "container.id" => name, + "agent.id" => "uplink", + "cloud" => %{ + "account.id" => account.id, + "instance.id" => node.name + } + } + end +end diff --git a/lib/uplink/metrics/pipeline.ex b/lib/uplink/metrics/pipeline.ex new file mode 100644 index 00000000..dd825340 --- /dev/null +++ b/lib/uplink/metrics/pipeline.ex @@ -0,0 +1,144 @@ +defmodule Uplink.Metrics.Pipeline do + use Broadway + + alias Broadway.Message + + alias Uplink.Pipelines + + alias Uplink.Metrics + alias Uplink.Metrics.Document + + require Logger + + def start_link(_opts \\ []) do + configuration = 
Application.get_env(:uplink, __MODULE__) || [] + + producer_module = + Keyword.get(configuration, :producer_module, Uplink.Metrics.Producer) + + producer_options = + Keyword.get(configuration, :producer_options, + poll_interval: :timer.seconds(15) + ) + + Broadway.start_link(__MODULE__, + name: __MODULE__, + context: :metrics, + producer: [ + module: {producer_module, producer_options}, + concurrency: 1 + ], + processors: [ + default: [ + concurrency: 3, + max_demand: 10 + ] + ], + batchers: [ + default: [ + concurrency: 3, + batch_size: 10 + ] + ] + ) + |> case do + {:ok, pid} -> + Logger.info("[Uplink.Metrics.Pipeline] Started...") + + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.link(pid) + {:ok, pid} + end + end + + def handle_message(_, %Message{data: data} = message, _) do + %{ + metric: instance_metric, + previous_cpu_metric: previous_cpu_metric, + previous_network_metric: previous_network_metric, + cpu_60_metric: cpu_60_metric, + cpu_300_metric: cpu_300_metric, + cpu_900_metric: cpu_900_metric + } = data + + memory = Document.memory(instance_metric) + cpu = Document.cpu(instance_metric, previous_cpu_metric) + uptime = Document.uptime(instance_metric) + filesystem = Document.filesystem(instance_metric) + diskio = Document.diskio(instance_metric) + network = Document.network(instance_metric, previous_network_metric) + + load = + Document.load(instance_metric, %{ + cpu_60_metric: cpu_60_metric, + cpu_300_metric: cpu_300_metric, + cpu_900_metric: cpu_900_metric + }) + + data = %{ + memory: memory, + cpu: cpu, + uptime: uptime, + filesystem: filesystem, + diskio: diskio, + network: network, + load: load + } + + Message.put_data(message, data) + end + + def handle_batch(_, messages, _batch_info, context) do + documents = to_ndjson(messages) + monitors = Pipelines.get_monitors(context) + + Logger.info("[Metrics.Pipeline] pushing #{DateTime.utc_now()}") + + monitors + |> Enum.map(fn monitor -> + Metrics.push!(monitor, documents) + end) + + messages 
+ end + + defp to_ndjson(messages) do + documents = + Enum.flat_map(messages, &to_entry/1) + |> Enum.map(&Jason.encode!/1) + |> Enum.join("\n") + + documents <> "\n" + end + + defp to_entry(%Message{} = message) do + dataset = + message.data + |> Enum.to_list() + |> Enum.reject(fn {_key, value} -> + is_nil(value) + end) + + dataset + |> Enum.flat_map(&build_request/1) + end + + defp build_request({type, data}) when is_list(data) do + index = Metrics.index(type) + + Enum.reduce(data, [], fn entry, acc -> + metadata = %{"create" => %{"_index" => index}} + + [metadata, entry | acc] + end) + end + + defp build_request({type, data}) when is_map(data) do + index = Metrics.index(type) + metadata = %{"create" => %{"_index" => index}} + + [metadata, data] + end +end diff --git a/lib/uplink/metrics/producer.ex b/lib/uplink/metrics/producer.ex new file mode 100644 index 00000000..cd8c6efd --- /dev/null +++ b/lib/uplink/metrics/producer.ex @@ -0,0 +1,219 @@ +defmodule Uplink.Metrics.Producer do + use GenStage + @behaviour Broadway.Producer + + alias Uplink.Cache + alias Uplink.Metrics + + @last_fetched_timestamp {:monitors, :metrics, :last_fetched_timestamp} + + @doc false + def start_link(opts) do + GenStage.start_link(__MODULE__, opts) + end + + @impl true + def init(opts) do + poll_interval = Keyword.get(opts, :poll_interval, 15_000) + next_schedule = div(poll_interval, 3) + + state = %{ + demand: 0, + poll_interval: poll_interval, + cycle: 0, + previous_cpu_metrics: [], + previous_network_metrics: [], + cpu_60_metrics: [], + cpu_300_metrics: [], + cpu_900_metrics: [] + } + + Process.send_after(self(), :poll, next_schedule) + + {:producer, state} + end + + @impl true + def handle_demand(demand, state) when demand <= 0 do + {:noreply, [], state} + end + + def handle_demand(demand, state) do + if ready_to_fetch?(state) do + {messages, state} = load_metrics(demand, state) + {:noreply, messages, state} + else + {:noreply, [], state} + end + end + + @impl true + def 
handle_info(:poll, state) do + next_schedule = div(state.poll_interval + 300, 3) + Process.send_after(self(), :poll, next_schedule) + + if ready_to_fetch?(state) do + {messages, state} = load_metrics(0, state) + {:noreply, messages, state} + else + {:noreply, [], state} + end + end + + defp load_metrics(demand, state) do + demand = demand + state.demand + + metrics = Metrics.for_instances() + + previous_cpu_metrics = state.previous_cpu_metrics + previous_network_metrics = state.previous_network_metrics + cpu_60_metrics = state.cpu_60_metrics + cpu_300_metrics = state.cpu_300_metrics + cpu_900_metrics = state.cpu_900_metrics + + messages = + transform_metrics(metrics, %{ + previous_cpu_metrics: previous_cpu_metrics, + previous_network_metrics: previous_network_metrics, + cpu_60_metrics: cpu_60_metrics, + cpu_300_metrics: cpu_300_metrics, + cpu_900_metrics: cpu_900_metrics, + cycle: state.cycle + }) + + current_demand = demand - length(messages) + + fetch_timestamp = DateTime.to_unix(DateTime.utc_now(), :millisecond) + + Cache.put(@last_fetched_timestamp, fetch_timestamp) + + previous_cpu_metrics = + Enum.map(metrics, fn instance -> + %{ + name: instance.data.name, + project: instance.data.project, + timestamp: fetch_timestamp, + data: Map.get(instance.data.state, "cpu") + } + end) + + previous_network_metrics = + Enum.map(metrics, fn instance -> + %{ + name: instance.data.name, + project: instance.data.project, + timestamp: fetch_timestamp, + data: Map.get(instance.data.state, "network") + } + end) + + state = + state + |> Map.put(:demand, current_demand) + |> Map.put(:last_fetched_timestamp, fetch_timestamp) + |> Map.put(:previous_cpu_metrics, previous_cpu_metrics) + |> Map.put(:previous_network_metrics, previous_network_metrics) + |> Map.put(:cycle, state.cycle + 1) + + state = + if rem(state.cycle, 4) == 0 do + Map.put(state, :cpu_60_metrics, previous_cpu_metrics) + else + state + end + + state = + if rem(state.cycle, 20) == 0 do + Map.put(state, 
:cpu_300_metrics, previous_cpu_metrics) + else + state + end + + state = + if rem(state.cycle, 60) == 0 do + Map.put(state, :cpu_900_metrics, previous_cpu_metrics) + else + state + end + + {messages, state} + end + + defp transform_metrics(metrics, %{ + previous_cpu_metrics: previous_cpu_metrics, + previous_network_metrics: previous_network_metrics, + cpu_60_metrics: cpu_60_metrics, + cpu_300_metrics: cpu_300_metrics, + cpu_900_metrics: cpu_900_metrics, + cycle: cycle + }) do + metrics + |> Enum.map(fn metric -> + previous_cpu_metric = + Enum.find( + previous_cpu_metrics, + &find_matching_previous(&1, metric.data.name, metric.data.project) + ) + + previous_network_metric = + Enum.find( + previous_network_metrics, + &find_matching_previous(&1, metric.data.name, metric.data.project) + ) + + cpu_60_metric = + Enum.find( + cpu_60_metrics, + &find_matching_previous(&1, metric.data.name, metric.data.project) + ) + + cpu_300_metric = + Enum.find( + cpu_300_metrics, + &find_matching_previous(&1, metric.data.name, metric.data.project) + ) + + cpu_900_metric = + Enum.find( + cpu_900_metrics, + &find_matching_previous(&1, metric.data.name, metric.data.project) + ) + + %{ + metric: metric, + cycle: cycle, + previous_network_metric: previous_network_metric, + previous_cpu_metric: previous_cpu_metric, + cpu_60_metric: cpu_60_metric, + cpu_300_metric: cpu_300_metric, + cpu_900_metric: cpu_900_metric + } + end) + |> Enum.map(&transform_message/1) + end + + defp transform_message(message) do + %Broadway.Message{ + data: message, + acknowledger: Broadway.NoopAcknowledger.init() + } + end + + defp ready_to_fetch?(state) do + Cache.transaction( + [keys: [@last_fetched_timestamp]], + fn -> + now = DateTime.to_unix(DateTime.utc_now(), :millisecond) + + last_fetched_timestamp = Cache.get(@last_fetched_timestamp) + + is_nil(last_fetched_timestamp) || + now - last_fetched_timestamp > state.poll_interval + end + ) + end + + defp find_matching_previous(metric, name, project) do + metric.name 
== name && metric.project == project + end +end diff --git a/lib/uplink/monitors.ex b/lib/uplink/monitors.ex new file mode 100644 index 00000000..d3088f4f --- /dev/null +++ b/lib/uplink/monitors.ex @@ -0,0 +1,74 @@ +defmodule Uplink.Monitors do + use Task + + alias Uplink.Cache + alias Uplink.Pipelines + alias Uplink.Clients.Instellar + + require Logger + + @pipeline_modules %{ + metrics: Uplink.Metrics.Pipeline + } + + def start_link(options) do + enabled? = config(:enabled, true) + + if enabled? do + Task.start_link(__MODULE__, :run, [options]) + else + :ignore + end + end + + def run(_options \\ []) do + Cache.put_new({:monitors, :metrics}, []) + + Instellar.list_monitors() + |> case do + {:ok, monitors} -> + Cache.transaction([keys: [{:monitors, :metrics}]], fn -> + start_pipeline(monitors, :metrics) + end) + + error -> + {:error, error} + end + end + + defp start_pipeline(monitors, context) do + Logger.info("[Uplink.Monitors] Starting pipeline...") + + started_metrics_monitor_ids = + Pipelines.get_monitors(context) + |> Enum.map(fn monitor -> + monitor["attributes"]["id"] + end) + + not_started_monitors = + Enum.filter(monitors, fn monitor -> + monitor["attributes"]["id"] not in started_metrics_monitor_ids + end) + + grouped_monitors = + Enum.group_by(not_started_monitors, fn monitor -> + monitor["attributes"]["type"] + end) + + context_monitors = Map.get(grouped_monitors, "#{context}") || [] + + if Enum.count(context_monitors) > 0 do + Pipelines.append_monitors(context, context_monitors) + end + + module = Map.fetch!(@pipeline_modules, context) + + Pipelines.start(module) + end + + def config(key, default) do + configuration = Application.get_env(:uplink, __MODULE__) || [] + + Keyword.get(configuration, key, default) + end +end diff --git a/lib/uplink/monitors/router.ex b/lib/uplink/monitors/router.ex new file mode 100644 index 00000000..95811ca8 --- /dev/null +++ b/lib/uplink/monitors/router.ex @@ -0,0 +1,23 @@ +defmodule Uplink.Monitors.Router do + use 
Plug.Router + use Uplink.Web + + alias Uplink.Secret + + plug :match + + plug Plug.Parsers, + parsers: [:urlencoded, :json], + body_reader: {Uplink.Web.CacheBodyReader, :read_body, []}, + json_decoder: Jason + + plug Secret.VerificationPlug + + plug :dispatch + + post "/refresh" do + Uplink.Monitors.run() + + json(conn, :ok, %{message: "monitors refreshed."}) + end +end diff --git a/lib/uplink/packages/deployment/router.ex b/lib/uplink/packages/deployment/router.ex index 984ce745..aed0e702 100644 --- a/lib/uplink/packages/deployment/router.ex +++ b/lib/uplink/packages/deployment/router.ex @@ -146,12 +146,6 @@ defmodule Uplink.Packages.Deployment.Router do {:error, error} -> json(conn, :unprocessable_entity, %{error: %{message: error}}) - {:error, error, _} -> - json(conn, :unprocessable_entity, %{error: %{message: error}}) - - {:error, error, _, _} -> - json(conn, :unprocessable_entity, %{error: %{message: error}}) - {:actor, :not_found} -> json(conn, :not_found, %{error: %{message: "actor not found"}}) end diff --git a/lib/uplink/packages/distribution.ex b/lib/uplink/packages/distribution.ex index 061bb4a6..2798db71 100644 --- a/lib/uplink/packages/distribution.ex +++ b/lib/uplink/packages/distribution.ex @@ -1,6 +1,5 @@ defmodule Uplink.Packages.Distribution do use Plug.Builder - plug Plug.Logger alias Uplink.{ Internal, @@ -16,7 +15,7 @@ defmodule Uplink.Packages.Distribution do plug :validate - plug :serve_or_proxy + plug :serve_or_redirect defp validate(conn, _opts) do case Firewall.allowed?(conn) do @@ -33,7 +32,7 @@ defmodule Uplink.Packages.Distribution do end end - defp serve_or_proxy(conn, _opts) do + defp serve_or_redirect(conn, _opts) do %{"glob" => params} = conn.params [channel, org, package] = Enum.take(params, 3) @@ -43,7 +42,7 @@ defmodule Uplink.Packages.Distribution do |> Packages.get_latest_deployment(channel) |> case do %Deployment{archive: archive} -> - serve(conn, archive) + respond(conn, archive) nil -> conn @@ -52,7 +51,7 @@ defmodule 
Uplink.Packages.Distribution do end end - defp serve(conn, %Archive{node: archive_node}) do + defp respond(conn, %Archive{node: archive_node}) do if Atom.to_string(Node.self()) == archive_node do static_options = Plug.Static.init( @@ -67,15 +66,11 @@ defmodule Uplink.Packages.Distribution do internal_router_config = Application.get_env(:uplink, Uplink.Internal) port = Keyword.get(internal_router_config, :port, 4080) - upstream = + location = ["#{conn.scheme}://", "#{node_host_name}:#{port}", conn.request_path] |> Path.join() - reverse_proxy_options = ReverseProxyPlug.init(upstream: upstream) - - conn - |> Map.put(:path_info, []) - |> ReverseProxyPlug.call(reverse_proxy_options) + Uplink.Web.redirect(conn, location) end end end diff --git a/lib/uplink/packages/install/execute.ex b/lib/uplink/packages/install/execute.ex index 08ce7987..7a28bea3 100644 --- a/lib/uplink/packages/install/execute.ex +++ b/lib/uplink/packages/install/execute.ex @@ -60,7 +60,7 @@ defmodule Uplink.Packages.Install.Execute do project = Packages.get_project_name(client, metadata) existing_instances = - LXD.list_instances(project) + LXD.list_instances(project: project) |> Enum.filter(&only_uplink_instance/1) Cache.put_new({:install, state.install.id, "completed"}, [], diff --git a/lib/uplink/packages/install/triggers.ex b/lib/uplink/packages/install/triggers.ex index ecd23b64..c7aca39a 100644 --- a/lib/uplink/packages/install/triggers.ex +++ b/lib/uplink/packages/install/triggers.ex @@ -1,6 +1,8 @@ defmodule Uplink.Packages.Install.Triggers do use Eventful.Trigger + alias Uplink.Cache + alias Uplink.{ Packages, Clients @@ -36,6 +38,8 @@ defmodule Uplink.Packages.Install.Triggers do Install |> trigger([currently: "completed"], fn event, install -> + Cache.put({:install, install.id, "executing"}, []) + Clients.Caddy.schedule_config_reload(install, actor_id: event.actor_id) end) end diff --git a/lib/uplink/packages/install/validate.ex b/lib/uplink/packages/install/validate.ex index 
0c15657e..0267dc72 100644 --- a/lib/uplink/packages/install/validate.ex +++ b/lib/uplink/packages/install/validate.ex @@ -1,5 +1,5 @@ defmodule Uplink.Packages.Install.Validate do - use Oban.Worker, queue: :install, max_attempts: 1 + use Oban.Worker, queue: :install, max_attempts: 3 alias Uplink.{ Clients, @@ -23,6 +23,12 @@ defmodule Uplink.Packages.Install.Validate do @state ~s(validating) + @impl Worker + def backoff(%Job{attempt: attempt}) do + trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt) + end + + @impl Worker def perform(%Oban.Job{ args: %{"install_id" => install_id, "actor_id" => actor_id} }) do diff --git a/lib/uplink/packages/instance/bootstrap.ex b/lib/uplink/packages/instance/bootstrap.ex index 6a006226..a0635ae8 100644 --- a/lib/uplink/packages/instance/bootstrap.ex +++ b/lib/uplink/packages/instance/bootstrap.ex @@ -19,6 +19,13 @@ defmodule Uplink.Packages.Instance.Bootstrap do alias Uplink.Clients.LXD alias Uplink.Clients.LXD.Cluster.Member + @instance_bootstrappable_states [ + "executing", + "completed", + "refreshing", + "degraded" + ] + @transition_parameters %{ "from" => "uplink", "trigger" => false @@ -68,9 +75,16 @@ defmodule Uplink.Packages.Instance.Bootstrap do end defp handle_placement( - %{install: %{deployment: %{current_state: "live"}} = install} = state, + %{ + install: + %{ + deployment: %{current_state: "live"}, + current_state: install_current_state + } = install + } = state, %{"slug" => instance_name} - ) do + ) + when install_current_state in @instance_bootstrappable_states do placement_name = Placement.name(instance_name) Instances.mark("executing", install.id, instance_name) @@ -81,7 +95,7 @@ defmodule Uplink.Packages.Instance.Bootstrap do end defp handle_placement( - %{install: %{deployment: %{current_state: _}}}, + %{install: %{deployment: %{current_state: _}, current_state: _}}, _instance_params ) do {:snooze, 10} diff --git a/lib/uplink/packages/instance/install.ex b/lib/uplink/packages/instance/install.ex 
index 37909553..513264dc 100644 --- a/lib/uplink/packages/instance/install.ex +++ b/lib/uplink/packages/instance/install.ex @@ -2,7 +2,6 @@ defmodule Uplink.Packages.Instance.Install do use Oban.Worker, queue: :instance, max_attempts: 3 alias Uplink.Repo - alias Uplink.Cache alias Uplink.Instances alias Uplink.Clients.LXD diff --git a/lib/uplink/pipelines.ex b/lib/uplink/pipelines.ex new file mode 100644 index 00000000..0161f680 --- /dev/null +++ b/lib/uplink/pipelines.ex @@ -0,0 +1,26 @@ +defmodule Uplink.Pipelines do + alias Uplink.Cache + + def get_monitors(context) do + Cache.get({:monitors, context}) || [] + end + + def append_monitors(context, monitors) do + Cache.get_and_update({:monitors, context}, fn existing_monitors -> + {existing_monitors, existing_monitors ++ monitors} + end) + end + + def start(module) do + spec = %{ + id: module, + start: {module, :start_link, []} + } + + Pogo.DynamicSupervisor.start_child(Uplink.PipelineSupervisor, spec) + end + + def list do + Pogo.DynamicSupervisor.which_children(Uplink.PipelineSupervisor, :global) + end +end diff --git a/lib/uplink/router.ex b/lib/uplink/router.ex index fa6f16a2..208dc4d1 100644 --- a/lib/uplink/router.ex +++ b/lib/uplink/router.ex @@ -5,12 +5,15 @@ defmodule Uplink.Router do alias Uplink.Components alias Uplink.Installations alias Uplink.Cache + alias Uplink.Monitors alias Uplink.Packages.{ Instance, Deployment } + plug Plug.Logger + plug :match plug :dispatch @@ -23,6 +26,7 @@ defmodule Uplink.Router do forward "/instances", to: Instance.Router forward "/components", to: Components.Router forward "/cache", to: Cache.Router + forward "/monitors", to: Monitors.Router match _ do send_resp(conn, 404, "not found") diff --git a/lib/uplink/web.ex b/lib/uplink/web.ex index a0e49c22..cb48eff6 100644 --- a/lib/uplink/web.ex +++ b/lib/uplink/web.ex @@ -7,6 +7,18 @@ defmodule Uplink.Web do |> send_resp(status, Jason.encode!(%{data: response})) end + def redirect(conn, location) do + html = 
Plug.HTML.html_escape(location) + + body = + "You are being redirected." + + conn + |> put_resp_header("location", location) + |> put_resp_content_type("text/html") + |> send_resp(conn.status || 302, body) + end + def translate_errors(%Ecto.Changeset{} = changeset) do Ecto.Changeset.traverse_errors(changeset, fn {msg, opts} -> Regex.replace(~r"%{(\w+)}", msg, fn _, key -> diff --git a/mix.exs b/mix.exs index f6780f53..f769a63f 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Uplink.MixProject do def project do [ app: :uplink, - version: "0.16.0", + version: "0.17.0", elixir: "~> 1.13", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, @@ -57,6 +57,7 @@ defmodule Uplink.MixProject do # Clustering {:libcluster, "~> 3.0"}, + {:pogo, "~> 0.3.0"}, # One time password {:pot, "~> 1.0.2"}, @@ -66,10 +67,11 @@ defmodule Uplink.MixProject do # Infrastructure {:formation, "~> 0.15"}, - {:lexdee, "~> 2.4"}, + {:lexdee, "~> 2.4.4"}, {:plug_cowboy, "~> 2.0"}, - {:reverse_proxy_plug, "~> 2.1"}, {:mint_web_socket, "~> 1.0.2"}, + {:broadway, "~> 1.0"}, + {:prometheus_parser, "~> 0.1.10"}, # Test {:bypass, "~> 2.1", only: :test}, diff --git a/mix.lock b/mix.lock index 9a3c25ac..40b53ba5 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,9 @@ %{ "aws": {:hex, :aws, "0.13.3", "d2e932c2588e2b15fca04f345dfced6e07b81d6534e65783de23190c57891df7", [:mix], [{:aws_signature, "~> 0.3", [hex: :aws_signature, repo: "hexpm", optional: false]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "4ad46a204370d3ef8eab102776a00070ec6fcc4414db7f6b39f3fc0d99564e18"}, "aws_signature": {:hex, :aws_signature, "0.3.2", "adf33bc4af00b2089b7708bf20e3246f09c639a905a619b3689f0a0a22c3ef8f", [:rebar3], [], "hexpm", "b0daf61feb4250a8ab0adea60db3e336af732ff71dd3fb22e45ae3dcbd071e44"}, + "broadway": {:hex, :broadway, 
"1.1.0", "8ed3aea01fd6f5640b3e1515b90eca51c4fc1fac15fb954cdcf75dc054ae719c", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "25e315ef1afe823129485d981dcc6d9b221cea30e625fd5439e9b05f44fb60e4"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, - "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, + "castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: 
:telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, @@ -17,13 +18,15 @@ "eventful": {:hex, :eventful, "0.2.3", "dc795c95b2d00d90b3a5d58c66bd0188b39be08b9da61a743ac40186fd313034", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "a1607d9d98ebc45e47573a81c11adfb571259a1db900748c6bee2dc25e5e171c"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "formation": {:hex, :formation, "0.15.2", "0ff349edbc11b7b9f4bb385f12d37f278907b796a3c19ff8cd9d105c17d1eb3f", [:mix], [{:aws, "~> 0.13.0", [hex: :aws, repo: "hexpm", optional: false]}, {:ecto, "~> 3.10", [hex: :ecto, repo: "hexpm", optional: false]}, {:finch, "~> 0.18.0", [hex: :finch, repo: "hexpm", optional: false]}, {:lexdee, "~> 2.3", [hex: :lexdee, repo: "hexpm", optional: false]}, {:mustache, "~> 0.5.0", [hex: :mustache, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.17.1", [hex: :postgrex, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "c233e3ffc307665fab34ddb75aa2201c1daa7d5d4519fa12b6f9a5ccc5be44a9"}, + "gen_stage": {:hex, :gen_stage, "1.2.1", 
"19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.4.3", "d3f984eeb96fe53b85d20e0b049f03e57d075b5acda3ac8d465c969a2536c17b", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "9a90e868927f7c777689baa16d86f4d0e086d968db5c05d917ccff6d443e58a3"}, - "lexdee": {:hex, :lexdee, "2.4.2", 
"05a74d4b90116167b75a08bea2ff9ae563d89f382dee9432fdae9524746240ce", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0.2", [hex: :mint_web_socket, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}, {:x509, "~> 0.8.1", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "5a7dd7d1bfbc9fe14bf60c31aa214dd855e924f524c085f45e680031e4f7859c"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "lexdee": {:hex, :lexdee, "2.4.4", "1763f4e0ab7a97cadc25622faaf80ae63186f12bf5e36d1f92bd8d820e31fa16", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0.2", [hex: :mint_web_socket, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}, {:x509, "~> 0.8.1", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "d460902b4f0a686485850cc4fd1eb7c9325a90bbf43106aac06a3b567bdc5e82"}, "libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"}, + "libring": {:hex, :libring, "1.6.0", "d5dca4bcb1765f862ab59f175b403e356dec493f565670e0bacc4b35e109ce0d", [:mix], [], "hexpm", "5e91ece396af4bce99953d49ee0b02f698cd38326d93cd068361038167484319"}, "metrics": {:hex, :metrics, "1.0.1", 
"25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, @@ -33,21 +36,23 @@ "mustache": {:hex, :mustache, "0.5.1", "1bfee8a68a8e72d47616541a93a632ae1684c1abc17c9eb5f7bfecb78d7496e5", [:mix], [], "hexpm", "524c9bbb6080a52d7b6806436b4e269e0224c785a228faf3293ef30a75016bfa"}, "nebulex": {:hex, :nebulex, "2.5.1", "8ffbde30643e76d6cec712281ca68ab05f73170de9e758a39bc7e4e6987f608f", [:mix], [{:decorator, "~> 1.4", [hex: :decorator, repo: "hexpm", optional: true]}, {:shards, "~> 1.1", [hex: :shards, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "8d0d3800d98c68ee19b229b7fe35fac0192ab5963a573612cf74a388e083bccf"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "oban": {:hex, :oban, "2.15.2", "8f934a49db39163633965139c8846d8e24c2beb4180f34a005c2c7c3f69a6aa2", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, 
{:ecto_sqlite3, "~> 0.9", [hex: :ecto_sqlite3, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "0f4a579ea48fc7489e0d84facf8b01566e142bdc6542d7dabce32c10e664f1e9"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, - "plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"}, + "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"}, - "plug_crypto": {:hex, :plug_crypto, "2.0.0", 
"77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, + "pogo": {:hex, :pogo, "0.3.0", "4983ae7c52735af088fb3733c17482ca801975bb1f15c32c2c6f08086b1ac47e", [:mix], [{:libring, "~> 1.6.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm", "a810511e242538e59369efca5aa8bc315dc80ae641b667252ea7930a6dc0ce1e"}, "postgrex": {:hex, :postgrex, "0.17.5", "0483d054938a8dc069b21bdd636bf56c487404c241ce6c319c1f43588246b281", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "50b8b11afbb2c4095a3ba675b4f055c416d0f3d7de6633a595fc131a828a67eb"}, "pot": {:hex, :pot, "1.0.2", "13abb849139fdc04ab8154986abbcb63bdee5de6ed2ba7e1713527e33df923dd", [:rebar3], [], "hexpm", "78fe127f5a4f5f919d6ea5a2a671827bd53eb9d37e5b4128c0ad3df99856c2e0"}, + "prometheus_parser": {:hex, :prometheus_parser, "0.1.10", "d1657b308506261b17f111429b38c427d7e4699b9b77601113ccec658c8cb7f9", [:mix], [{:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3db190840fc9634a8b1a478cbaebd6ef6118d8e4970c71c80a8d11cd24613640"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, - "req": {:hex, :req, "0.4.5", "2071bbedd280f107b9e33e1ddff2beb3991ec1ae06caa2cca2ab756393d8aca5", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 
1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.9", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "dd23e9c7303ddeb2dee09ff11ad8102cca019e38394456f265fb7b9655c64dd8"}, - "reverse_proxy_plug": {:hex, :reverse_proxy_plug, "2.3.0", "6830ae83b383576899826d09ac00328e559455de650c9f047a413f1b64de450d", [:mix], [{:cowboy, "~> 2.4", [hex: :cowboy, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.2 or ~> 2.0", [hex: :httpoison, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: false]}, {:tesla, "~> 1.4", [hex: :tesla, repo: "hexpm", optional: true]}], "hexpm", "eb0007fb8fc6a4a1627c62270e2713e99bea31883daac77c85d60dd10dcdfdc3"}, + "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, "shards": {:hex, :shards, "1.1.0", "ed3032e63ae99f0eaa6d012b8b9f9cead48b9a810b3f91aeac266cfc4118eff6", [:make, :rebar3], [], "hexpm", "1d188e565a54a458a7a601c2fd1e74f5cfeba755c5a534239266d28b7ff124c7"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, 
:rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, "tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, - "x509": {:hex, :x509, "0.8.9", "03c47e507171507d3d3028d802f48dd575206af2ef00f764a900789dfbe17476", [:mix], [], "hexpm", 
"ea3fb16a870a199cb2c45908a2c3e89cc934f0434173dc0c828136f878f11661"}, + "x509": {:hex, :x509, "0.8.10", "5d1ec6d5f4db31982f9dc34e6a1eebd631d04599e0b6c1c259f1dadd4495e11f", [:mix], [], "hexpm", "a191221665af28b9bdfff0c986ef55f80e126d8ce751bbdf6cefa846410140c0"}, } diff --git a/test/scenarios/deployment.ex b/test/scenarios/deployment.ex index 9d2990fd..0cded556 100644 --- a/test/scenarios/deployment.ex +++ b/test/scenarios/deployment.ex @@ -137,6 +137,7 @@ defmodule Uplink.Scenarios.Deployment do "endpoint" => "http://localhost:#{bypass.port}" }, "uplink" => %{ + "id" => 1, "image_server" => "https://localhost/spaces/test" }, "organization" => %{ diff --git a/test/scenarios/pipeline.ex b/test/scenarios/pipeline.ex new file mode 100644 index 00000000..26861af6 --- /dev/null +++ b/test/scenarios/pipeline.ex @@ -0,0 +1,493 @@ +defmodule Uplink.Scenarios.Pipeline do + alias Uplink.Cache + + @network_metric %{ + "eth0" => %{ + "addresses" => [ + %{ + "address" => "10.89.212.187", + "family" => "inet", + "netmask" => "24", + "scope" => "global" + }, + %{ + "address" => "fd42:bcd9:8738:4f0b:216:3eff:fe49:974b", + "family" => "inet6", + "netmask" => "64", + "scope" => "global" + }, + %{ + "address" => "fe80::216:3eff:fe49:974b", + "family" => "inet6", + "netmask" => "64", + "scope" => "link" + } + ], + "counters" => %{ + "bytes_received" => 536_178_767, + "bytes_sent" => 598_876_792, + "errors_received" => 0, + "errors_sent" => 0, + "packets_dropped_inbound" => 0, + "packets_dropped_outbound" => 0, + "packets_received" => 3_861_861, + "packets_sent" => 5_494_401 + }, + "host_name" => "veth7baf4590", + "hwaddr" => "00:16:3e:49:97:4b", + "mtu" => 1500, + "state" => "up", + "type" => "broadcast" + }, + "lo" => %{ + "addresses" => [ + %{ + "address" => "127.0.0.1", + "family" => "inet", + "netmask" => "8", + "scope" => "local" + }, + %{ + "address" => "::1", + "family" => "inet6", + "netmask" => "128", + "scope" => "local" + } + ], + "counters" => %{ + "bytes_received" => 
5_145_976, + "bytes_sent" => 5_145_976, + "errors_received" => 0, + "errors_sent" => 0, + "packets_dropped_inbound" => 0, + "packets_dropped_outbound" => 0, + "packets_received" => 102_914, + "packets_sent" => 102_914 + }, + "host_name" => "", + "hwaddr" => "", + "mtu" => 65536, + "state" => "up", + "type" => "loopback" + }, + "tailscale0" => %{ + "addresses" => [ + %{ + "address" => "100.100.201.17", + "family" => "inet", + "netmask" => "32", + "scope" => "global" + }, + %{ + "address" => "fd7a:115c:a1e0::6501:c911", + "family" => "inet6", + "netmask" => "128", + "scope" => "global" + }, + %{ + "address" => "fe80::cf0d:6cf3:a4c3:23a4", + "family" => "inet6", + "netmask" => "64", + "scope" => "link" + } + ], + "counters" => %{ + "bytes_received" => 63_039_392, + "bytes_sent" => 83_946_128, + "errors_received" => 0, + "errors_sent" => 0, + "packets_dropped_inbound" => 0, + "packets_dropped_outbound" => 0, + "packets_received" => 603_885, + "packets_sent" => 766_923 + }, + "host_name" => "", + "hwaddr" => "", + "mtu" => 1280, + "state" => "up", + "type" => "point-to-point" + } + } + + @instance_metric %Uplink.Metrics.Instance{ + name: "insterra-testing", + timestamp: ~U[2024-10-25 09:44:50.138459Z], + data: %Uplink.Clients.LXD.Instance{ + name: "insterra-testing", + type: "container", + location: "arrakis", + status: "Running", + architecture: "x86_64", + profiles: ["default"], + project: "testing", + description: nil, + created_at: ~U[2024-10-04 06:48:24.543615Z], + last_used_at: ~U[2024-10-05 22:37:55.312007Z], + expanded_config: %{ + "image.architecture" => "amd64", + "image.description" => "alpine 3.19 amd64 (20240708-44)", + "image.os" => "alpine", + "image.release" => "3.19", + "image.requirements.secureboot" => "false", + "image.serial" => "20240708-44", + "image.type" => "squashfs", + "image.variant" => "default", + "volatile.base_image" => + "8279423f529b339b6ebd619e8a69001bd277cd2bd30fc641dbc74516d09c51fc", + "volatile.cloud-init.instance-id" => + 
"594f1088-f3ff-401b-981a-aef7fd9470c2", + "volatile.eth0.host_name" => "veth7baf4590", + "volatile.eth0.hwaddr" => "00:16:3e:49:97:4b", + "volatile.idmap.base" => "0", + "volatile.idmap.current" => + "[{\"Isuid\":true,\"Isgid\":false,\"Hostid\":1000000,\"Nsid\":0,\"Maprange\":1000000000},{\"Isuid\":false,\"Isgid\":true,\"Hostid\":1000000,\"Nsid\":0,\"Maprange\":1000000000}]", + "volatile.idmap.next" => + "[{\"Isuid\":true,\"Isgid\":false,\"Hostid\":1000000,\"Nsid\":0,\"Maprange\":1000000000},{\"Isuid\":false,\"Isgid\":true,\"Hostid\":1000000,\"Nsid\":0,\"Maprange\":1000000000}]", + "volatile.last_state.idmap" => "[]", + "volatile.last_state.power" => "RUNNING", + "volatile.uuid" => "e1a4139a-ffbf-4099-b652-37ef8ea441ee", + "volatile.uuid.generation" => "e1a4139a-ffbf-4099-b652-37ef8ea441ee" + }, + expanded_devices: %{ + "eth0" => %{ + "name" => "eth0", + "network" => "lxdbr0", + "type" => "nic" + }, + "root" => %{"path" => "/", "pool" => "default", "type" => "disk"} + }, + state: %{ + "cpu" => %{"usage" => 33_909_892_118_000}, + "disk" => %{"root" => %{"total" => 0, "usage" => 432_270_464}}, + "memory" => %{ + "swap_usage" => 0, + "swap_usage_peak" => 0, + "total" => 65_747_700_000, + "usage" => 115_965_952, + "usage_peak" => 0 + }, + "network" => @network_metric, + "pid" => 3219, + "processes" => 33, + "status" => "Running", + "status_code" => 103 + } + }, + node: %Uplink.Clients.LXD.Node{ + name: "arrakis", + cpu_cores_count: 36, + total_memory: 68_719_476_736, + total_storage: 24_504_830_042_112 + }, + metrics: [ + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdd"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "2.7889664e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdf"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", 
"container"} + ], + value: "2.8053504e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sde"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "3.1424512e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdb"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "2.7951104e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdc"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "3.1215616e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sda"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "3.1203328e+07", + documentation: nil, + type: nil, + label: "lxd_disk_read_bytes_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdd"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "2769", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdf"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "2818", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sde"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + 
{"type", "container"} + ], + value: "3204", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdb"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "2821", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sdc"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "3244", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"device", "sda"}, + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "3215", + documentation: nil, + type: nil, + label: "lxd_disk_reads_completed_total" + }, + %PrometheusParser.Line{ + line_type: "ENTRY", + timestamp: nil, + pairs: [ + {"name", "insterra-testing"}, + {"project", "testing"}, + {"type", "container"} + ], + value: "7.8544896e+07", + documentation: nil, + type: nil, + label: "lxd_memory_Cached_bytes" + } + ], + account: %{id: "upmaru-stage"} + } + + def self(_context) do + Cache.put(:self, %{ + "credential" => %{ + "endpoint" => "http://localhost" + }, + "uplink" => %{ + "id" => 1, + "image_server" => "https://localhost/spaces/test" + }, + "organization" => %{ + "slug" => "someorg", + "storage" => %{ + "type" => "s3", + "host" => "some.host", + "bucket" => "some-bucket", + "region" => "sgp1", + "credential" => %{ + "access_key_id" => "access-key", + "secret_access_key" => "secret" + } + } + }, + "instances" => [ + %{ + "id" => 1, + "slug" => "uplink-01", + "node" => %{ + "id" => 1, + "slug" => "some-node-01", + "public_ip" => "127.0.0.1" + } + } + ] + }) + + :ok + end + + def messages(_context) do + message_without_previous_cpu_metric = %{ + 
metric: @instance_metric, + previous_cpu_metric: nil, + previous_network_metric: nil, + cpu_60_metric: nil, + cpu_300_metric: nil, + cpu_900_metric: nil + } + + message_with_previous_cpu_metric = %{ + metric: @instance_metric, + previous_cpu_metric: %{ + data: %{"usage" => 1}, + timestamp: 1 + }, + previous_network_metric: nil, + cpu_60_metric: nil, + cpu_300_metric: nil, + cpu_900_metric: nil + } + + message_with_previous_network_metric = %{ + metric: @instance_metric, + previous_cpu_metric: nil, + previous_network_metric: %{ + timestamp: 1, + data: @network_metric + }, + cpu_60_metric: nil, + cpu_300_metric: nil, + cpu_900_metric: nil + } + + message_with_cpu_60_metric = %{ + metric: @instance_metric, + previous_cpu_metric: nil, + previous_network_metric: nil, + cpu_60_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + }, + cpu_300_metric: nil, + cpu_900_metric: nil + } + + message_with_cpu_300_metric = %{ + metric: @instance_metric, + previous_cpu_metric: nil, + previous_network_metric: nil, + cpu_60_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + }, + cpu_300_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + }, + cpu_900_metric: nil + } + + message_with_cpu_900_metric = %{ + metric: @instance_metric, + previous_cpu_metric: nil, + previous_network_metric: nil, + cpu_60_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + }, + cpu_300_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + }, + cpu_900_metric: %{ + data: %{"usage" => 1_000_000}, + timestamp: 1 + } + } + + {:ok, + message_without_previous_cpu_metric: message_without_previous_cpu_metric, + message_with_previous_cpu_metric: message_with_previous_cpu_metric, + message_with_previous_network_metric: message_with_previous_network_metric, + message_with_cpu_60_metric: message_with_cpu_60_metric, + message_with_cpu_300_metric: message_with_cpu_300_metric, + message_with_cpu_900_metric: message_with_cpu_900_metric} + end +end diff --git 
a/test/support/assert_async.ex b/test/support/assert_async.ex new file mode 100644 index 00000000..93054a59 --- /dev/null +++ b/test/support/assert_async.ex @@ -0,0 +1,79 @@ +defmodule AssertAsync do + @moduledoc """ + Helper macro for making assertions on async actions. This is particularly + useful for testing GenServers and other processes that may be synchronously + processing messages. The macro will retry an assertion until it passes + or times out. + + ## Example + + defmodule Foo do + use GenServer + + def init(opts) do + {:ok, state, {:continue, :sleep}} + end + + def handle_continue(:sleep, state) do + Process.sleep(2_000) + {:noreply, state} + end + + def handle_call(:bar, _, state) do + Map.get(state, :bar) + end + end + + iex> import AssertAsync + iex {:ok, pid} = GenServer.start_link(Foo, %{bar: 42}) + iex> assert_async do + ...> assert GenServer.call(pid, :bar) == 42 + ...> end + + ## Configuration + + * `sleep` - Time in milliseconds to wait before next retry. Defaults to `200`. + * `max_tries` - Number of attempts to make before flunking assertion. Defaults to `10`. + * `debug` - Boolean for producing `DEBUG` messages on failing iterations. Defaults `false`. 
+ """ + + defmodule Impl do + @moduledoc false + require Logger + + @defaults %{ + sleep: 200, + max_tries: 10, + debug: false + } + + def assert(function, opts) do + state = Map.merge(@defaults, Map.new(opts)) + do_assert(function, state) + end + + defp do_assert(function, %{max_tries: 1}) do + function.() + end + + defp do_assert(function, %{max_tries: max_tries} = opts) do + function.() + rescue + e in ExUnit.AssertionError -> + if opts.debug do + Logger.debug(fn -> + "AssertAsync(remaining #{max_tries - 1}): #{ExUnit.AssertionError.message(e)}" + end) + end + + Process.sleep(opts.sleep) + do_assert(function, %{opts | max_tries: max_tries - 1}) + end + end + + defmacro assert_async(opts \\ [], do: do_block) do + quote do + AssertAsync.Impl.assert(fn -> unquote(do_block) end, unquote(opts)) + end + end +end diff --git a/test/uplink/clients/caddy/config/reload_test.exs b/test/uplink/clients/caddy/config/reload_test.exs index 58ad9318..ec992b0e 100644 --- a/test/uplink/clients/caddy/config/reload_test.exs +++ b/test/uplink/clients/caddy/config/reload_test.exs @@ -101,6 +101,9 @@ defmodule Uplink.Clients.Caddy.Config.ReloadTest do "credential" => %{ "endpoint" => "http://localhost:#{bypass.port}" }, + "uplink" => %{ + "id" => 1 + }, "organization" => %{ "slug" => "someorg", "storage" => %{ diff --git a/test/uplink/clients/lxd/instance/manager_test.exs b/test/uplink/clients/lxd/instance/manager_test.exs index 2632bf0f..acbbdc69 100644 --- a/test/uplink/clients/lxd/instance/manager_test.exs +++ b/test/uplink/clients/lxd/instance/manager_test.exs @@ -37,7 +37,7 @@ defmodule Uplink.Clients.LXD.Instance.ManagerTest do |> Plug.Conn.resp(200, response) end) - assert [instance1, _instance2] = Manager.list("default") + assert [instance1, _instance2] = Manager.list(project: "default") assert %Instance{} = instance1 end end diff --git a/test/uplink/metrics/pipeline_test.exs b/test/uplink/metrics/pipeline_test.exs new file mode 100644 index 00000000..1976443f --- /dev/null +++ 
b/test/uplink/metrics/pipeline_test.exs @@ -0,0 +1,131 @@ +defmodule Uplink.Metrics.PipelineTest do + use ExUnit.Case + + import Uplink.Scenarios.Pipeline + import AssertAsync + + alias Uplink.Cache + alias Uplink.Pipelines + + setup [:self, :messages] + + setup do + Cache.put_new({:monitors, :metrics}, []) + + Pipelines.start(Uplink.Metrics.Pipeline) + + assert_async do + assert Uplink.Pipelines.list() != [] + end + + :ok + end + + test "handle message without previous cpu metric", %{ + message_without_previous_cpu_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: data}], []} + + assert %{ + memory: memory, + filesystem: filesystem, + cpu: cpu, + diskio: diskio, + uptime: uptime, + network: network, + load: load + } = data + + assert is_nil(cpu) + assert is_nil(load) + assert is_nil(network) + + assert %{"system" => %{"memory" => _}} = memory + assert %{"system" => %{"filesystem" => _}} = filesystem + assert %{"system" => %{"diskio" => _}} = diskio + assert %{"system" => %{"uptime" => _}} = uptime + end + + test "handle message with cpu metric", %{ + message_with_previous_cpu_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: data}], []}, 10_000 + + assert %{cpu: cpu} = data + + assert %{"system" => %{"cpu" => _}} = cpu + end + + test "handle message with network metric", %{ + message_with_previous_network_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: data}], []} + + assert %{network: network} = data + + assert [ + %{"system" => %{"network" => _}}, + %{"system" => %{"network" => _}}, + %{"system" => %{"network" => _}} + ] = network + end + + test "handle message with load 1", %{ + message_with_cpu_60_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: 
data}], []}, 10_000 + + assert %{load: load} = data + + assert %{"system" => %{"load" => _}} = load + end + + test "handle message with load 5", %{ + message_with_cpu_300_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: data}], []}, 10_000 + + assert %{load: load} = data + + assert %{"system" => %{"load" => %{"1" => _, "5" => load_5}}} = load + + assert not is_nil(load_5) + end + + test "handle message with load 15", %{ + message_with_cpu_900_metric: message + } do + ref = Broadway.test_message(Uplink.Metrics.Pipeline, message) + + assert_receive {:ack, ^ref, [%{data: data}], []}, 10_000 + + assert %{load: load} = data + + assert %{"system" => %{"load" => %{"1" => _, "5" => _, "15" => load_15}}} = + load + + assert not is_nil(load_15) + end + + test "handle batch", %{ + message_with_previous_cpu_metric: message1, + message_with_previous_network_metric: message2 + } do + ref = Broadway.test_batch(Uplink.Metrics.Pipeline, [message1, message2]) + + assert_receive {:ack, ^ref, successful, failed}, 10_000 + + assert length(successful) == 2 + assert length(failed) == 0 + end +end diff --git a/test/uplink/monitors/router_test.exs b/test/uplink/monitors/router_test.exs new file mode 100644 index 00000000..5215db9a --- /dev/null +++ b/test/uplink/monitors/router_test.exs @@ -0,0 +1,77 @@ +defmodule Uplink.Monitors.RouterTest do + use ExUnit.Case + use Plug.Test + + alias Uplink.Monitors.Router + + @opts Router.init([]) + + @monitors_response %{ + "data" => [ + %{ + "attributes" => %{ + "current_state" => "active", + "endpoint" => "https://elastic:9200", + "expires_at" => "2024-11-21T03:14:17Z", + "id" => 1, + "token" => "some-token", + "type" => "metrics", + "uid" => "some-uid" + }, + "id" => "1", + "links" => %{"self" => "http://localhost:4000/uplink/self/monitors/1"}, + "relationships" => %{}, + "type" => "monitors" + } + ], + "included" => [], + "links" => %{"self" => 
"http://localhost:4000/uplink/self/monitors"} + } + + @valid_refresh_body Jason.encode!(%{ + "actor" => %{ + "provider" => "instellar", + "identifier" => "zacksiri", + "id" => "1" + } + }) + + setup do + bypass = Bypass.open() + + Application.put_env( + :uplink, + Uplink.Clients.Instellar, + endpoint: "http://localhost:#{bypass.port}/uplink" + ) + + {:ok, bypass: bypass} + end + + describe "refresh monitors" do + setup do + signature = + :crypto.mac(:hmac, :sha256, Uplink.Secret.get(), @valid_refresh_body) + |> Base.encode16() + |> String.downcase() + + {:ok, signature: signature} + end + + test "can refresh monitors list", %{signature: signature, bypass: bypass} do + Bypass.expect_once(bypass, "GET", "/uplink/self/monitors", fn conn -> + conn + |> Plug.Conn.put_resp_header("content-type", "application/json") + |> Plug.Conn.send_resp(200, Jason.encode!(@monitors_response)) + end) + + conn = + conn(:post, "/refresh", @valid_refresh_body) + |> put_req_header("x-uplink-signature-256", "sha256=#{signature}") + |> put_req_header("content-type", "application/json") + |> Router.call(@opts) + + assert conn.status == 200 + end + end +end diff --git a/test/uplink/monitors_test.exs b/test/uplink/monitors_test.exs new file mode 100644 index 00000000..c9a1bd01 --- /dev/null +++ b/test/uplink/monitors_test.exs @@ -0,0 +1,47 @@ +defmodule Uplink.MonitorsTest do + use ExUnit.Case + + @monitors_response %{ + "data" => [ + %{ + "attributes" => %{ + "current_state" => "active", + "endpoint" => "https://elastic:9200", + "expires_at" => "2024-11-21T03:14:17Z", + "id" => 1, + "token" => "some-token", + "type" => "metrics", + "uid" => "some-uid" + }, + "id" => "1", + "links" => %{"self" => "http://localhost:4000/uplink/self/monitors/1"}, + "relationships" => %{}, + "type" => "monitors" + } + ], + "included" => [], + "links" => %{"self" => "http://localhost:4000/uplink/self/monitors"} + } + + setup do + bypass = Bypass.open() + + Application.put_env( + :uplink, + Uplink.Clients.Instellar, 
+ endpoint: "http://localhost:#{bypass.port}/uplink" + ) + + {:ok, bypass: bypass} + end + + test "run", %{bypass: bypass} do + Bypass.expect_once(bypass, "GET", "/uplink/self/monitors", fn conn -> + conn + |> Plug.Conn.put_resp_header("content-type", "application/json") + |> Plug.Conn.send_resp(200, Jason.encode!(@monitors_response)) + end) + + assert :ok == Uplink.Monitors.run() + end +end diff --git a/test/uplink/packages/distribution_test.exs b/test/uplink/packages/distribution_test.exs index 977d938f..6e65b4ec 100644 --- a/test/uplink/packages/distribution_test.exs +++ b/test/uplink/packages/distribution_test.exs @@ -183,4 +183,60 @@ defmodule Uplink.Packages.DistributionTest do assert conn.status == 200 end end + + describe "not matching archive node" do + setup %{deployment: deployment, actor: actor} do + node_host = "somethingelse" + + {:ok, archive} = + Packages.create_archive(deployment, %{ + node: "nonode@#{node_host}", + locations: [ + "#{deployment.channel}/#{@app_slug}/x86_64/APKINDEX.tar.gz" + ] + }) + + {:ok, %{resource: completed_deployment}} = + Packages.transition_deployment_with(deployment, actor, "complete") + + {:ok, + archive: archive, deployment: completed_deployment, node_host: node_host} + end + + test "successfully redirect", %{ + bypass: bypass, + deployment: deployment, + address: address, + node_host: node_host + } do + project_found = File.read!("test/fixtures/lxd/projects/show.json") + + Bypass.expect( + bypass, + "GET", + "/1.0/projects/default", + fn conn -> + conn + |> Plug.Conn.put_resp_header("content-type", "application/json") + |> Plug.Conn.resp(200, project_found) + end + ) + + path = + "/distribution/#{deployment.channel}/#{@app_slug}/x86_64/APKINDEX.tar.gz" + + conn = + conn(:get, path) + |> Map.put(:remote_ip, address) + |> Uplink.Internal.call([]) + + assert [location] = get_resp_header(conn, "location") + + assert conn.status == 302 + + assert location =~ node_host + assert location =~ path + assert location =~ "4080" + 
end + end end