Skip to content

Commit

Permalink
Device Connections (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
elinol authored Nov 16, 2024
1 parent 2793c75 commit cf66bef
Show file tree
Hide file tree
Showing 27 changed files with 518 additions and 209 deletions.
2 changes: 1 addition & 1 deletion lib/nerves_hub/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ defmodule NervesHub.Deployments do
^deployment.id
)
})
|> where([d], not is_nil(d.connection_last_seen_at))
|> where([d], d.status == :provisioned)
|> where(
[d],
d.deployment_id == ^deployment.id or
Expand Down
111 changes: 51 additions & 60 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule NervesHub.Devices do
alias NervesHub.Deployments.Orchestrator
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Alarms
alias NervesHub.Devices.Connections
alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceHealth
Expand All @@ -34,6 +35,13 @@ defmodule NervesHub.Devices do
Repo.get(Device, device_id)
end

def get_device(device_id, :preload_latest_connection) when is_integer(device_id) do
Device
|> where(id: ^device_id)
|> Connections.preload_latest_connection()
|> Repo.one()
end

def get_active_device(filters) do
Device
|> Repo.exclude_deleted()
Expand All @@ -45,14 +53,18 @@ defmodule NervesHub.Devices do
end

def get_devices_by_org_id_and_product_id(org_id, product_id) do
query =
from(
d in Device,
where: d.org_id == ^org_id,
where: d.product_id == ^product_id
)
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Repo.exclude_deleted()
|> Repo.all()
end

query
def get_devices_by_org_id_and_product_id(org_id, product_id, :preload_latest_connection) do
Device
|> where([d], d.org_id == ^org_id)
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> Repo.all()
end
Expand All @@ -73,6 +85,7 @@ defmodule NervesHub.Devices do
|> order_by(^sort_devices(sorting))
|> filtering(filters)
|> preload([d, o, p, dp, f], org: o, product: p, deployment: {dp, firmware: f})
|> Connections.preload_latest_connection()
|> Repo.paginate(pagination)
end

Expand All @@ -83,6 +96,7 @@ defmodule NervesHub.Devices do

Device
|> where([d], d.product_id == ^product_id)
|> Connections.preload_latest_connection()
|> Repo.exclude_deleted()
|> filtering(filters)
|> order_by(^sort_devices(sorting))
Expand All @@ -91,18 +105,20 @@ defmodule NervesHub.Devices do

def get_minimal_device_location_by_org_id_and_product_id(org_id, product_id) do
Device
|> select([d], %{
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> join(:left, [d], dc in subquery(Connections.latest_row_query()), on: dc.device_id == d.id)
|> where([d, dc], dc.rn == 1)
|> select([d, dc], %{
id: d.id,
identifier: d.identifier,
connection_status: d.connection_status,
connection_status: dc.status,
latitude: fragment("?->'location'->'latitude'", d.connection_metadata),
longitude: fragment("?->'location'->'longitude'", d.connection_metadata),
firmware_uuid: fragment("?->'uuid'", d.firmware_metadata)
})
|> where(org_id: ^org_id)
|> where(product_id: ^product_id)
|> where([d], not is_nil(fragment("?->'location'->'latitude'", d.connection_metadata)))
|> where([d], not is_nil(fragment("?->'location'->'longitude'", d.connection_metadata)))
|> Repo.exclude_deleted()
|> Repo.all()
end
Expand Down Expand Up @@ -164,17 +180,24 @@ defmodule NervesHub.Devices do
{_, ""} ->
query

{:alarm, value} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarm(value)))

{:alarm_status, "with"} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarms()))

{:alarm_status, "without"} ->
where(query, [d], d.id not in subquery(Alarms.query_devices_with_alarms()))

{:alarm, value} ->
where(query, [d], d.id in subquery(Alarms.query_devices_with_alarm(value)))
{:connection, "not_seen"} ->
where(query, [d], d.status == :registered)

{:connection, _value} ->
where(query, [d], d.connection_status == ^String.to_atom(value))
{:connection, value} ->
where(
query,
[d],
d.id in subquery(Connections.query_devices_with_connection_status(value))
)

{:connection_type, value} ->
where(query, [d], ^value in d.connection_types)
Expand Down Expand Up @@ -310,6 +333,11 @@ defmodule NervesHub.Devices do
|> preload([d, device_certificates: dc], device_certificates: dc)
end

defp join_and_preload(query, :latest_connection) do
query
|> Connections.preload_latest_connection()
end

@spec get_shared_secret_auth(String.t()) ::
{:ok, SharedSecretAuth.t()} | {:error, :not_found}
def get_shared_secret_auth(key) do
Expand Down Expand Up @@ -391,6 +419,12 @@ defmodule NervesHub.Devices do
|> Repo.insert()
end

def set_as_provisioned!(device) do
device
|> Device.changeset(%{status: :provisioned, first_seen_at: DateTime.utc_now()})
|> Repo.update!()
end

def delete_device(%Device{} = device) do
device_certificates_query = from(dc in DeviceCertificate, where: dc.device_id == ^device.id)
changeset = Repo.soft_delete_changeset(device)
Expand Down Expand Up @@ -657,49 +691,6 @@ defmodule NervesHub.Devices do
|> Repo.all()
end

def device_connected(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :connected,
connection_established_at: DateTime.utc_now(),
connection_disconnected_at: nil,
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

def device_heartbeat(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :connected,
connection_disconnected_at: nil,
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

def device_disconnected(device) do
device
|> clear_connection_information()
|> Device.changeset(%{
connection_status: :disconnected,
connection_disconnected_at: DateTime.utc_now(),
connection_last_seen_at: DateTime.utc_now()
})
|> Repo.update()
end

defp clear_connection_information(device) do
%{
device
| connection_status: nil,
connection_disconnected_at: "dummy",
connection_last_seen_at: nil
}
end

def clean_connection_states() do
interval = Application.get_env(:nerves_hub, :device_last_seen_update_interval_minutes)
a_minute_ago = DateTime.shift(DateTime.utc_now(), minute: -(interval + 1))
Expand Down
131 changes: 131 additions & 0 deletions lib/nerves_hub/devices/connections.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
defmodule NervesHub.Devices.Connections do
@moduledoc """
Handles connection data for devices, reported from device socket.
"""
import Ecto.Query

alias NervesHub.Devices.Device
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Repo

@doc """
Get all connections for a device.
"""
@spec get_device_connections(non_neg_integer()) :: [DeviceConnection.t()]
def get_device_connections(device_id) do
DeviceConnection
|> where(device_id: ^device_id)
|> order_by(asc: :last_seen_at)
|> Repo.all()
end

@doc """
Get latest inserted connection for a device.
"""
@spec get_latest_for_device(non_neg_integer()) :: DeviceConnection.t() | nil
def get_latest_for_device(device_id) do
DeviceConnection
|> where(device_id: ^device_id)
|> order_by(desc: :last_seen_at)
|> limit(1)
|> Repo.one()
end

@doc """
Preload latest respective connection in a device query.
"""
@spec preload_latest_connection(Query.t()) :: Query.t()
def preload_latest_connection(query) do
query
|> preload(device_connections: ^distinct_on_device())
end

@doc """
Creates a device connection, reported from device socket
"""
@spec device_connected(non_neg_integer()) ::
{:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_connected(device_id) do
now = DateTime.utc_now()

%{
device_id: device_id,
established_at: now,
last_seen_at: now,
status: :connected
}
|> DeviceConnection.create_changeset()
|> Repo.insert()
end

@doc """
Updates the `last_seen_at`field for a device connection with current timestamp
"""
@spec device_heartbeat(UUIDv7.t()) :: {:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_heartbeat(ref_id) do
DeviceConnection
|> Repo.get!(ref_id)
|> DeviceConnection.update_changeset(%{last_seen_at: DateTime.utc_now()})
|> Repo.update()
end

@doc """
Updates `status` and relevant timestamps for a device connection record,
and stores the reason for disconnection if provided.
"""
@spec device_disconnected(UUIDv7.t(), String.t() | nil) ::
{:ok, DeviceConnection.t()} | {:error, Ecto.Changeset.t()}
def device_disconnected(ref_id, reason \\ nil) do
now = DateTime.utc_now()

DeviceConnection
|> Repo.get!(ref_id)
|> DeviceConnection.update_changeset(%{
last_seen_at: now,
disconnected_at: now,
disconnected_reason: reason,
status: :disconnected
})
|> Repo.update()
end

@doc """
Selects devices id's which has provided status in it's latest connection record.
"""
@spec query_devices_with_connection_status(String.t()) :: Query.t()
def query_devices_with_connection_status(status) do
(lr in subquery(latest_row_query()))
|> from()
|> where([lr], lr.rn == 1)
|> where(
[lr],
lr.status == ^String.to_existing_atom(status)
)
|> join(:inner, [lr], d in Device, on: lr.device_id == d.id)
|> select([lr, d], d.id)
end

@doc """
Generates a query to retrieve the most recent `DeviceConnection` for devices.
The query includes the row number (`rn`)
for each record, which is used to identify the most recent connection.
Returns an Ecto query.
"""
@spec latest_row_query() :: Query.t()
def latest_row_query() do
DeviceConnection
|> select([dc], %{
device_id: dc.device_id,
status: dc.status,
last_seen_at: dc.last_seen_at,
rn: row_number() |> over(partition_by: dc.device_id, order_by: [desc: dc.last_seen_at])
})
end

defp distinct_on_device() do
DeviceConnection
|> distinct(:device_id)
|> order_by([:device_id, desc: :last_seen_at])
end
end
25 changes: 19 additions & 6 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule NervesHub.Devices.Device do

alias NervesHub.Accounts.Org
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Deployments.Deployment
alias NervesHub.Firmwares.FirmwareMetadata
alias NervesHub.Products.Product
Expand All @@ -26,7 +27,9 @@ defmodule NervesHub.Devices.Device do
:connection_disconnected_at,
:connection_last_seen_at,
:connection_types,
:connection_metadata
:connection_metadata,
:status,
:first_seen_at
]
@required_params [:org_id, :product_id, :identifier]

Expand All @@ -36,6 +39,7 @@ defmodule NervesHub.Devices.Device do
belongs_to(:deployment, Deployment)
embeds_one(:firmware_metadata, FirmwareMetadata, on_replace: :update)
has_many(:device_certificates, DeviceCertificate, on_delete: :delete_all)
has_many(:device_connections, DeviceConnection, on_delete: :delete_all)

field(:identifier, :string)
field(:description, :string)
Expand All @@ -45,6 +49,20 @@ defmodule NervesHub.Devices.Device do
field(:update_attempts, {:array, :utc_datetime}, default: [])
field(:updates_blocked_until, :utc_datetime)

field(:status, Ecto.Enum,
values: [:registered, :provisioned],
default: :registered
)

field(:first_seen_at, :utc_datetime)

field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

timestamps()

# Deprecated fields, replaced with device_connections table.
field(:connection_status, Ecto.Enum,
values: [:connected, :disconnected, :not_seen],
default: :not_seen
Expand All @@ -53,11 +71,6 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
field(:connection_types, {:array, Ecto.Enum}, values: [:cellular, :ethernet, :wifi])
field(:connecting_code, :string)
field(:connection_metadata, :map, default: %{})

timestamps()
end

def changeset(%Device{} = device, params) do
Expand Down
Loading

0 comments on commit cf66bef

Please sign in to comment.