Skip to content

Commit

Permalink
Merge pull request #782 from Annopaolo/internal-topic
Browse files Browse the repository at this point in the history
DUP: Use the `internal` event type for Astarte internal messages (e.g. device heartbeat).
  • Loading branch information
bettio authored Jun 9, 2023
2 parents 75ded2b + a80c716 commit af870f9
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [1.1.0-rc.0] - Unreleased
### Changed
- Update Elixir to 1.14.5 and Erlang/OTP to 25.3.2.
- [astarte_data_updater_plant] Use the `internal` event type for Astarte
internal messages. (e.g. device heartbeat).
### Fixed
- [astarte_realm_management_api] Provide detailed feedback when a trigger action
is malformed. Fix [#748](https://github.com/astarte-platform/astarte/issues/748).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
@control_path_header "x_astarte_control_path"
@interface_header "x_astarte_interface"
@path_header "x_astarte_path"
@internal_path_header "x_astarte_internal_path"

# API

Expand Down Expand Up @@ -255,6 +256,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
end
end

# TODO remove this when all heartbeats will be moved to internal
defp handle_consume("heartbeat", payload, headers, timestamp, meta) do
with %{
@realm_header => realm,
Expand All @@ -268,6 +270,27 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
end
end

defp handle_consume("internal", payload, headers, timestamp, meta) do
with %{
@realm_header => realm,
@device_id_header => device_id,
@internal_path_header => internal_path
} <- headers,
{:ok, tracking_id} <- get_tracking_id(meta) do
# Following call might spawn processes and implicitly monitor them
DataUpdater.handle_internal(
realm,
device_id,
internal_path,
payload,
tracking_id,
timestamp
)
else
_ -> handle_invalid_msg(payload, headers, timestamp, meta)
end
end

defp handle_consume("introspection", payload, headers, timestamp, meta) do
with %{
@realm_header => realm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
|> GenServer.cast({:handle_disconnection, message_id, timestamp})
end

# TODO remove this when all heartbeats will be moved to internal
def handle_heartbeat(realm, encoded_device_id, tracking_id, timestamp) do
message_tracker = get_message_tracker(realm, encoded_device_id)
{message_id, delivery_tag} = tracking_id
Expand All @@ -58,6 +59,22 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
|> GenServer.cast({:handle_heartbeat, message_id, timestamp})
end

def handle_internal(
realm,
encoded_device_id,
path,
payload,
tracking_id,
timestamp
) do
message_tracker = get_message_tracker(realm, encoded_device_id)
{message_id, delivery_tag} = tracking_id
MessageTracker.track_delivery(message_tracker, message_id, delivery_tag)

get_data_updater_process(realm, encoded_device_id, message_tracker)
|> GenServer.cast({:handle_internal, path, payload, message_id, timestamp})
end

def handle_data(
realm,
encoded_device_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
%{new_state | connected: true, last_seen_message: timestamp}
end

# TODO make this private when all heartbeats will be moved to internal
def handle_heartbeat(state, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand All @@ -166,6 +167,42 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
%{new_state | connected: true, last_seen_message: timestamp}
end

def handle_internal(state, "/heartbeat", _payload, message_id, timestamp) do
handle_heartbeat(state, message_id, timestamp)
end

def handle_internal(state, path, payload, message_id, timestamp) do
Logger.warn("Unexpected internal message on #{path}, payload: #{inspect(payload)}",
tag: "unexpected_internal_message"
)

{:ok, new_state} = ask_clean_session(state, timestamp)
MessageTracker.discard(new_state.message_tracker, message_id)

:telemetry.execute(
[:astarte, :data_updater_plant, :data_updater, :discarded_internal_message],
%{},
%{realm: new_state.realm}
)

base64_payload = Base.encode64(payload)

error_metadata = %{
"path" => inspect(path),
"base64_payload" => base64_payload
}

# TODO maybe we don't want triggers on unexpected internal messages?
execute_device_error_triggers(
new_state,
"unexpected_internal_message",
error_metadata,
timestamp
)

update_stats(new_state, "", nil, path, payload)
end

def handle_disconnection(state, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Server do
end
end

# TODO remove this when all heartbeats will be moved to internal
def handle_cast({:handle_heartbeat, message_id, timestamp}, state) do
if MessageTracker.can_process_message(state.message_tracker, message_id) do
new_state = Impl.handle_heartbeat(state, message_id, timestamp)
Expand All @@ -64,6 +65,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Server do
end
end

def handle_cast({:handle_internal, payload, path, message_id, timestamp}, state) do
if MessageTracker.can_process_message(state.message_tracker, message_id) do
new_state = Impl.handle_internal(state, payload, path, message_id, timestamp)
{:noreply, new_state}
else
{:noreply, state}
end
end

def handle_cast({:handle_data, interface, path, payload, message_id, timestamp}, state) do
timeout = Config.data_updater_deactivation_interval_ms!()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,93 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
) == {:error, :device_does_not_exist}
end

test "heartbeat message of type internal is correctly handled" do
alias Astarte.DataUpdaterPlant.DataUpdater.State

AMQPTestHelper.clean_queue()

realm = "autotestrealm"

encoded_device_id =
:crypto.strong_rand_bytes(16)
|> Base.url_encode64(padding: false)

{:ok, device_id} = Device.decode_device_id(encoded_device_id)

DatabaseTestHelper.insert_device(device_id)

{:ok, db_client} = Database.connect(realm: realm)

timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00")
timestamp_ms = div(timestamp_us_x_10, 10_000)

# Make sure a process for the device exists
DataUpdater.handle_connection(
realm,
encoded_device_id,
"10.0.0.1",
gen_tracking_id(),
timestamp_us_x_10
)

heartbeat_timestamp = make_timestamp("2023-05-12T18:05:32+00:00")

DataUpdater.handle_internal(
realm,
encoded_device_id,
"/heartbeat",
"",
gen_tracking_id(),
heartbeat_timestamp
)

assert %State{last_seen_message: ^heartbeat_timestamp} =
DataUpdater.dump_state(realm, encoded_device_id)
end

# TODO remove this when all heartbeats will be moved to internal
test "heartbeat message of type heartbeat is correctly handled" do
alias Astarte.DataUpdaterPlant.DataUpdater.State

AMQPTestHelper.clean_queue()

realm = "autotestrealm"

encoded_device_id =
:crypto.strong_rand_bytes(16)
|> Base.url_encode64(padding: false)

{:ok, device_id} = Device.decode_device_id(encoded_device_id)

DatabaseTestHelper.insert_device(device_id)

{:ok, db_client} = Database.connect(realm: realm)

timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00")
timestamp_ms = div(timestamp_us_x_10, 10_000)

# Make sure a process for the device exists
DataUpdater.handle_connection(
realm,
encoded_device_id,
"10.0.0.1",
gen_tracking_id(),
timestamp_us_x_10
)

heartbeat_timestamp = make_timestamp("2023-05-12T18:05:32+00:00")

DataUpdater.handle_heartbeat(
realm,
encoded_device_id,
gen_tracking_id(),
heartbeat_timestamp
)

assert %State{last_seen_message: ^heartbeat_timestamp} =
DataUpdater.dump_state(realm, encoded_device_id)
end

defp retrieve_endpoint_id(client, interface_name, interface_major, path) do
query =
DatabaseQuery.new()
Expand Down

0 comments on commit af870f9

Please sign in to comment.