Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support device deletion #816

Merged
merged 17 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/astarte_data_updater_plant/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ config :astarte_data_updater_plant, :amqp_consumer_options,
config :logger, :console,
format: {PrettyLog.UserFriendlyFormatter, :format},
metadata: [:realm, :device_id, :function]

config :astarte_data_updater_plant, :rpc_client, MockRPCClient
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2019 Ispirata Srl
# Copyright 2019 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do

alias Astarte.DataUpdaterPlant.AMQPDataConsumer
alias Astarte.DataUpdaterPlant.Config
alias Astarte.DataUpdater.DeletionScheduler

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
Expand All @@ -34,7 +35,8 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do
children = [
{Registry, [keys: :unique, name: Registry.AMQPDataConsumer]},
{AMQPDataConsumer.ConnectionManager, amqp_opts: Config.amqp_consumer_options!()},
AMQPDataConsumer.Supervisor
AMQPDataConsumer.Supervisor,
DeletionScheduler
]

opts = [strategy: :rest_for_one, name: __MODULE__]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -160,6 +160,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
|> GenServer.call({:dump_state})
end

def start_device_deletion(realm, encoded_device_id, timestamp) do
with :ok <- verify_device_exists(realm, encoded_device_id) do
message_tracker = get_message_tracker(realm, encoded_device_id, offload_start: true)

get_data_updater_process(realm, encoded_device_id, message_tracker, offload_start: true)
|> GenServer.call({:start_device_deletion, timestamp})
end
end

def get_data_updater_process(realm, encoded_device_id, message_tracker, opts \\ []) do
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id) do
case Registry.lookup(Registry.DataUpdater, {realm, device_id}) do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# This file is part of Astarte.
#
# Copyright 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
defmodule Astarte.DataUpdater.DeletionScheduler do
@moduledoc """
This module sends messages to start deletion to a
Astarte.DataUpdater.Server. When a deletion notice
is received, the Server will start the device deletion
procedure, write the dup_start_ack to db, and
synchronously acknowledge it to the Scheduler.
"""
use GenServer

alias Astarte.DataUpdaterPlant.DataUpdater.Queries
alias Astarte.DataUpdaterPlant.DataUpdater
alias Astarte.DataUpdaterPlant.Config
alias Astarte.Core.Device

require Logger

# TODO expose this via config
@reconciliation_timeout :timer.minutes(5)
def start_link(_args) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_args) do
# TODO: manually start_device_deletion!() when needed
schedule_next_device_deletion()
{:ok, %{}}
end

def handle_info(:delete_devices, state) do
_ = Logger.debug("Reconciling devices for whom deletion shall begin")

start_device_deletion!()
schedule_next_device_deletion()
{:noreply, state}
end

defp start_device_deletion! do
retrieve_devices_to_delete!()
|> Enum.each(fn %{realm_name: realm_name, encoded_device_id: encoded_device_id} ->
timestamp = now_us_x10_timestamp()
# This must be a call, as we want to be sure this was completed
:ok = DataUpdater.start_device_deletion(realm_name, encoded_device_id, timestamp)
end)
end

defp schedule_next_device_deletion do
Process.send_after(self(), :delete_devices, @reconciliation_timeout)
end

defp retrieve_devices_to_delete! do
realms = Queries.retrieve_realms!()

for %{"realm_name" => realm_name} <- realms,
%{"device_id" => device_id} <-
Queries.retrieve_devices_waiting_to_start_deletion!(realm_name),
encoded_device_id = Device.encode_device_id(device_id),
should_handle_data_from_device?(realm_name, encoded_device_id) do
_ =
Logger.debug("Retrieved device to delete",
tag: "device_to_delete",
realm_name: realm_name,
device_id: encoded_device_id
)

%{realm_name: realm_name, encoded_device_id: encoded_device_id}
end
end

defp should_handle_data_from_device?(realm_name, encoded_device_id) do
# TODO extract a function from Astarte.DataUpdaterPlant.AMQPDataConsumer
# This is the same sharding algorithm used in astarte_vmq_plugin
# Make sure they stay in sync
queue_index =
{realm_name, encoded_device_id}
|> :erlang.phash2(Config.data_queue_total_count!())

queue_index >= Config.data_queue_range_start!() and
queue_index <= Config.data_queue_range_end!()
end

# TODO this is copied from astarte_vmq_plugin
defp now_us_x10_timestamp do
DateTime.utc_now()
|> DateTime.to_unix(:microsecond)
|> Kernel.*(10)
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017 - 2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
@interface_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@device_triggers_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@groups_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@deletion_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000

def init_state(realm, device_id, message_tracker) do
MessageTracker.register_data_updater(message_tracker)
Expand All @@ -74,7 +75,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
last_seen_message: 0,
last_device_triggers_refresh: 0,
last_groups_refresh: 0,
trigger_id_to_policy_name: %{}
trigger_id_to_policy_name: %{},
discard_messages: false,
last_deletion_in_progress_refresh: 0
}

encoded_device_id = Device.encode_device_id(device_id)
Expand All @@ -98,6 +101,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

def handle_connection(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_connection(state, ip_address_string, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -153,6 +161,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
%{new_state | connected: true, last_seen_message: timestamp}
end

def handle_heartbeat(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

# TODO make this private when all heartbeats will be moved to internal
def handle_heartbeat(state, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)
Expand All @@ -168,7 +181,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

def handle_internal(state, "/heartbeat", _payload, message_id, timestamp) do
handle_heartbeat(state, message_id, timestamp)
{:continue, handle_heartbeat(state, message_id, timestamp)}
end

def handle_internal(%State{discard_messages: true} = state, "/f", _, message_id, _) do
:ok = Queries.ack_end_device_deletion(state.realm, state.device_id)
_ = Logger.info("End device deletion acked.", tag: "device_delete_ack")
MessageTracker.ack_delivery(state.message_tracker, message_id)
{:stop, state}
end

def handle_internal(state, path, payload, message_id, timestamp) do
Expand Down Expand Up @@ -200,7 +220,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
timestamp
)

update_stats(new_state, "", nil, path, payload)
{:continue, update_stats(new_state, "", nil, path, payload)}
end

def start_device_deletion(state, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

# Device deletion is among time-based actions
new_state = execute_time_based_actions(state, timestamp, db_client)

{:ok, new_state}
end

def handle_disconnection(state, message_id, timestamp) do
Expand Down Expand Up @@ -456,6 +485,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

def handle_data(%State{discard_messages: true} = state, _, _, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_data(state, interface, path, payload, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -1147,6 +1181,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
}
end

def handle_introspection(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_introspection(state, payload, message_id, timestamp) do
with {:ok, new_introspection_list} <- PayloadsDecoder.parse_introspection(payload) do
process_introspection(state, new_introspection_list, payload, message_id, timestamp)
Expand Down Expand Up @@ -1409,6 +1448,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
}
end

def handle_control(%State{discard_messages: true} = state, _, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do
{:ok, db_client} = Database.connect(realm: state.realm)

Expand Down Expand Up @@ -1575,6 +1619,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
update_stats(new_state, "", nil, path, payload)
end

def handle_install_volatile_trigger(
%State{discard_messages: true} = state,
_,
message_id,
_
) do
MessageTracker.ack_delivery(state.message_tracker, message_id)
state
end

def handle_install_volatile_trigger(
state,
object_id,
Expand Down Expand Up @@ -1668,6 +1722,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end
end

def handle_delete_volatile_trigger(%State{discard_messages: true} = state, _, message_id, _) do
MessageTracker.discard(state.message_tracker, message_id)
state
end

def handle_delete_volatile_trigger(state, trigger_id) do
{new_volatile, maybe_trigger} =
Enum.reduce(state.volatile_triggers, {[], nil}, fn item, {acc, found} ->
Expand Down Expand Up @@ -1836,6 +1895,55 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
|> reload_groups_on_expiry(timestamp, db_client)
|> purge_expired_interfaces(timestamp)
|> reload_device_triggers_on_expiry(timestamp, db_client)
|> reload_device_deletion_status_on_expiry(timestamp, db_client)
end

defp reload_device_deletion_status_on_expiry(state, timestamp, db_client) do
if state.last_deletion_in_progress_refresh + @deletion_refresh_lifespan_decimicroseconds <=
timestamp do
new_state = maybe_start_device_deletion(db_client, state, timestamp)
%State{new_state | last_deletion_in_progress_refresh: timestamp}
else
state
end
end

defp maybe_start_device_deletion(db_client, state, timestamp) do
if should_start_device_deletion?(state.realm, state.device_id) do
encoded_device_id = Device.encode_device_id(state.device_id)

:ok = force_device_deletion_from_broker(state.realm, encoded_device_id)
new_state = set_device_disconnected(state, db_client, timestamp)

_ =
Logger.info("Stop handling data from device in deletion, device_id #{encoded_device_id}")

# It's ok to repeat that, as we always write ⊤
Queries.ack_start_device_deletion(state.realm, state.device_id)

%State{new_state | discard_messages: true}
else
state
end
end

defp should_start_device_deletion?(realm_name, device_id) do
case Queries.check_device_deletion_in_progress(realm_name, device_id) do
{:ok, true} ->
true

{:ok, false} ->
false

{:error, reason} ->
_ =
Logger.warn(
"Cannot check device deletion status for #{inspect(device_id)}, reason #{inspect(reason)}",
tag: "should_start_device_deletion_fail"
)

false
end
end

defp purge_expired_interfaces(state, timestamp) do
Expand Down Expand Up @@ -2166,6 +2274,24 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end
end

defp force_device_deletion_from_broker(realm, encoded_device_id) do
_ = Logger.info("Disconnecting device to be deleted, device_id #{encoded_device_id}")

case VMQPlugin.delete(realm, encoded_device_id) do
# Successfully disconnected
:ok ->
:ok

# Not found means it was already disconnected, succeed anyway
{:error, :not_found} ->
:ok

# Some other error, return it
{:error, reason} ->
{:error, reason}
end
end

defp get_on_data_triggers(state, event, interface_id, endpoint_id) do
key = {event, interface_id, endpoint_id}

Expand Down
Loading