Skip to content

Commit

Permalink
Support device deletion
Browse files Browse the repository at this point in the history
When a device is being deleted, it is also disconnected.
Disconnection due to deletion is required via RPC from
DataUpdaterPlant. After the device has been disconnected,
the broker sends a message on the internal `"/f"` (for
"farewell") topic to signal that the emission of messages
from the device has ended. Then, the deletion ack for
the device is written to database.
A device under deletion cannot be allowed to reconnect
until deletion is completed. This is made sure by
checking the database in the `auth_on_register` plugin
hook.

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Sep 8, 2023
1 parent 19048c2 commit 968c384
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
(defaults to `true`)
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CUSTOM_SNI`
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__CASSANDRA__SSL_CA_FILE`
- Added support for device deletion. During deletion, a device is
disconnected and not allowed to reconnect until deletion ends.
Inflight messages are discarded. After deletion, a device must be
registered again in order to connect to Astarte.

## [1.1.0] - 2023-06-20

Expand Down
95 changes: 82 additions & 13 deletions lib/astarte_vmq_plugin.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@ defmodule Astarte.VMQ.Plugin do
alias Astarte.VMQ.Plugin.AMQPClient
alias Astarte.VMQ.Plugin.Connection.Synchronizer
alias Astarte.VMQ.Plugin.Connection.Synchronizer.Supervisor, as: SynchronizerSupervisor
alias Astarte.VMQ.Plugin.Queries
alias Astarte.Core.Device

@max_rand trunc(:math.pow(2, 32) - 1)

Expand All @@ -38,16 +40,7 @@ defmodule Astarte.VMQ.Plugin do
# Not a device, let someone else decide
:next
else
subscriber_id = {mountpoint, username}
# TODO: we probably want some of these values to be configurable in some way
{:ok,
[
subscriber_id: subscriber_id,
max_inflight_messages: 100,
max_message_size: 65535,
retry_interval: 20000,
upgrade_qos: false
]}
authorize_registration(mountpoint, username)
end
end

Expand Down Expand Up @@ -181,6 +174,14 @@ defmodule Astarte.VMQ.Plugin do
end
end

def ack_device_deletion(realm_name, encoded_device_id) do
timestamp = now_us_x10_timestamp()
publish_internal_message(realm_name, encoded_device_id, "/f", "", timestamp)
{:ok, decoded_device_id} = Device.decode_device_id(encoded_device_id)
{:ok, _} = Queries.ack_device_deletion(realm_name, decoded_device_id)
:ok
end

defp setup_heartbeat_timer(realm, device_id, session_pid) do
args = [realm, device_id, session_pid]
interval = Config.device_heartbeat_interval_ms() |> randomize_interval(0.25)
Expand Down Expand Up @@ -213,6 +214,12 @@ defmodule Astarte.VMQ.Plugin do
publish(realm, device_id, payload, "control", timestamp, additional_headers)
end

defp publish_internal_message(realm, device_id, internal_path, payload, timestamp) do
additional_headers = [x_astarte_internal_path: internal_path]

publish(realm, device_id, payload, "internal", timestamp, additional_headers)
end

def publish_event(client_id, event_string, timestamp, additional_headers \\ []) do
with [realm, device_id] <- String.split(client_id, "/") do
publish(realm, device_id, "", event_string, timestamp, additional_headers)
Expand All @@ -224,10 +231,9 @@ defmodule Astarte.VMQ.Plugin do
end

defp publish_heartbeat(realm, device_id) do
additional_headers = [x_astarte_internal_path: "/heartbeat"]
timestamp = now_us_x10_timestamp()

publish(realm, device_id, "", "internal", timestamp, additional_headers)
publish_internal_message(realm, device_id, "/heartbeat", "", timestamp)
end

defp publish(realm, device_id, payload, event_string, timestamp, additional_headers \\ []) do
Expand Down Expand Up @@ -272,4 +278,67 @@ defmodule Astarte.VMQ.Plugin do
{:error, {:already_started, pid}} -> pid
end
end

defp authorize_registration(mountpoint, username) do
[realm, device_id] = String.split(username, "/")
{:ok, decoded_device_id} = Device.decode_device_id(device_id, allow_extended_id: true)

cond do
not device_exists?(realm, decoded_device_id) ->
{:error, :device_does_not_exist}

device_deletion_in_progress?(realm, decoded_device_id) ->
{:error, :device_deletion_in_progress}

true ->
{:ok, registration_modifiers(mountpoint, username)}
end
end

defp registration_modifiers(mountpoint, username) do
# TODO: we probably want some of these values to be configurable in some way
[
subscriber_id: {mountpoint, username},
max_inflight_messages: 100,
max_message_size: 65535,
retry_interval: 20000,
upgrade_qos: false
]
end

defp device_exists?(realm, device_id) do
case Queries.check_if_device_exists(realm, device_id) do
{:ok, result} ->
result

{:error, :invalid_realm_name} ->
false

# Allow a device to connect even if right now the DB is not available
{:error, %Xandra.ConnectionError{}} ->
true

# Allow a device to connect even if right now the DB is not available
{:error, %Xandra.Error{}} ->
true
end
end

defp device_deletion_in_progress?(realm, device_id) do
case Queries.check_device_deletion_in_progress(realm, device_id) do
{:ok, result} ->
result

{:error, :invalid_realm_name} ->
false

{:error, %Xandra.ConnectionError{}} ->
# Be conservative: if the device is not being deleted but we can't reach the DB, it will try to connect again when DB is available
true

{:error, %Xandra.Error{}} ->
# Be conservative: if the device is not being deleted but we can't reach the DB, it will try to connect again when DB is available
true
end
end
end
12 changes: 11 additions & 1 deletion lib/astarte_vmq_plugin/rpc/handler.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2018 Ispirata Srl
# Copyright 2018 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@ defmodule Astarte.VMQ.Plugin.RPC.Handler do

alias Astarte.RPC.Protocol.VMQ.Plugin.{
Call,
Delete,
Disconnect,
GenericErrorReply,
GenericOkReply,
Expand Down Expand Up @@ -69,6 +70,15 @@ defmodule Astarte.VMQ.Plugin.RPC.Handler do
end
end

defp call_rpc({:delete, %Delete{realm_name: realm_name, device_id: device_id}}) do
client_id = "#{realm_name}/#{device_id}"
# Either the client has been deleted or it is :not_found,
# which means that there is no session anyway.
Plugin.disconnect_client(client_id, true)
Plugin.ack_device_deletion(realm_name, device_id)
generic_ok()
end

defp call_rpc({:publish, %Publish{topic_tokens: []}}) do
Logger.warn("Publish with empty topic_tokens", tag: "publish_empty_topic_tokens")
generic_error(:empty_topic_tokens, "empty topic tokens")
Expand Down
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ defmodule Astarte.VMQ.Plugin.Mixfile do
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
# Compile order is important to make sure support files are available when testing
defp elixirc_paths(:test), do: ["test/support", "lib"]
defp elixirc_paths(_), do: ["lib"]

defp dialyzer_cache_directory(:ci) do
Expand All @@ -77,7 +78,7 @@ defmodule Astarte.VMQ.Plugin.Mixfile do

defp astarte_required_modules(_) do
[
{:astarte_rpc, github: "astarte-platform/astarte_rpc"},
{:astarte_rpc, github: "Annopaolo/astarte_rpc", branch: "delete-device"},
{:astarte_core, github: "astarte-platform/astarte_core"}
]
end
Expand Down

0 comments on commit 968c384

Please sign in to comment.