-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support device deletion #816
Conversation
Do not merge until astarte-platform/astarte_vmq_plugin#75 and astarte-platform/astarte_rpc#63 are merged. |
Codecov Report
@@ Coverage Diff @@
## master #816 +/- ##
==========================================
+ Coverage 67.42% 67.92% +0.49%
==========================================
Files 264 269 +5
Lines 6429 6781 +352
==========================================
+ Hits 4335 4606 +271
- Misses 2094 2175 +81
|
1ea2a42
to
f914059
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have reported a couple instances where the copyright signature was incorrect, mainly in new files, but in most updated files the copyright wasn't updated.
There is a lot of formatting changes, mainly in 03d5cb2 and 445711b, which make a lot of unnecessary noise in the diff. Consider at least adding a separate commit for the formatting before the commit with actual changes.
I don't know how much/if it is a problem but 445711b isn't formatted with mix format
Logger.warn("Database error while retrieving property: #{inspect(reason)}.") | ||
|
||
{:error, :database_error} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line was a mistake as the previous entry in the else
block doesn't have an empty line between the Logger.warn
and the returned tuple
with {:ok, _} <- DatabaseQuery.call(client, query) do | ||
:ok | ||
else | ||
%{acc: _, msg: error_message} -> | ||
Logger.warn("Database error when writing end ack device deletion: #{error_message}.") | ||
|
||
{:error, :database_error} | ||
|
||
{:error, reason} -> | ||
Logger.warn("Device deletion end ack failed with reason: #{inspect(reason)}.") | ||
|
||
{:error, :database_error} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with {:ok, _} <- DatabaseQuery.call(client, query) do | |
:ok | |
else | |
%{acc: _, msg: error_message} -> | |
Logger.warn("Database error when writing end ack device deletion: #{error_message}.") | |
{:error, :database_error} | |
{:error, reason} -> | |
Logger.warn("Device deletion end ack failed with reason: #{inspect(reason)}.") | |
{:error, :database_error} | |
end | |
case DatabaseQuery.call(client, query) do | |
{:ok, _} -> | |
:ok | |
%{acc: _, msg: error_message} -> | |
Logger.warn("Database error when writing end ack device deletion: #{error_message}.") | |
{:error, :database_error} | |
{:error, reason} -> | |
Logger.warn("Device deletion end ack failed with reason: #{inspect(reason)}.") | |
{:error, :database_error} | |
end |
is there a reason we're not using Xandra for new queries?
retrieve_realms!/0
and retrieve_devices_waiting_to_start_deletion!/1
in this file are already using it.
I get that ack_start_device_deletion/2
and check_device_deletion_in_progress/2
are used in a context with a db_client
already defined, but this is not the case for ack_end_device_deletion/2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++, let's use Xandra
for new queries
with {:ok, _} <- DatabaseQuery.call(client, query) do | ||
:ok | ||
else | ||
%{acc: _, msg: error_message} -> | ||
Logger.warn("Database error when writing start ack device deletion: #{error_message}.") | ||
|
||
{:error, :database_error} | ||
|
||
{:error, reason} -> | ||
Logger.warn("Device deletion start ack failed with reason: #{inspect(reason)}.") | ||
|
||
{:error, :database_error} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as before
with {:ok, result} <- DatabaseQuery.call(client, query), | ||
deletion_row when is_list(deletion_row) <- DatabaseResult.head(result) do | ||
{:ok, true} | ||
else | ||
:empty_dataset -> | ||
{:ok, false} | ||
|
||
%{acc: _, msg: error_message} -> | ||
_ = Logger.warn("Database error: #{error_message}.", tag: "db_error") | ||
{:error, :database_error} | ||
|
||
{:error, reason} -> | ||
_ = | ||
Logger.warn("Database error, reason: #{inspect(reason)}.", | ||
tag: "db_error" | ||
) | ||
|
||
{:error, :database_error} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with {:ok, result} <- DatabaseQuery.call(client, query), | |
deletion_row when is_list(deletion_row) <- DatabaseResult.head(result) do | |
{:ok, true} | |
else | |
:empty_dataset -> | |
{:ok, false} | |
%{acc: _, msg: error_message} -> | |
_ = Logger.warn("Database error: #{error_message}.", tag: "db_error") | |
{:error, :database_error} | |
{:error, reason} -> | |
_ = | |
Logger.warn("Database error, reason: #{inspect(reason)}.", | |
tag: "db_error" | |
) | |
{:error, :database_error} | |
end | |
case DatabaseQuery.call(client, query) do | |
{:ok, result} -> | |
result_not_empty? = DatabaseResult.head(result) != :empty_dataset | |
{:ok, result_not_empty?} | |
%{acc: _, msg: error_message} -> | |
_ = Logger.warn("Database error: #{error_message}.", tag: "db_error") | |
{:error, :database_error} | |
{:error, reason} -> | |
_ = | |
Logger.warn("Database error, reason: #{inspect(reason)}.", | |
tag: "db_error" | |
) | |
{:error, :database_error} | |
end |
with {:ok, result} <- DatabaseQuery.call(client, query), | ||
deletion_row when is_list(deletion_row) <- DatabaseResult.head(result) do | ||
{:ok, true} | ||
else | ||
:empty_dataset -> | ||
{:ok, false} | ||
|
||
%{acc: _, msg: error_message} -> | ||
_ = Logger.warn("Database error: #{error_message}.", tag: "db_error") | ||
{:error, :database_error} | ||
|
||
{:error, reason} -> | ||
_ = | ||
Logger.warn("Database error, reason: #{inspect(reason)}.", | ||
tag: "db_error" | ||
) | ||
|
||
{:error, :database_error} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as before
# | ||
# This file is part of Astarte. | ||
# | ||
# Copyright 20230 SECO Mind Srl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Copyright 20230 SECO Mind Srl | |
# Copyright 2023 SECO Mind Srl |
setup_all do | ||
with {:ok, client} <- DatabaseTestHelper.connect_to_test_database() do | ||
DatabaseTestHelper.create_test_keyspace(client) | ||
seed_device_data!() | ||
end | ||
|
||
on_exit(fn -> | ||
with {:ok, client} <- DatabaseTestHelper.connect_to_test_database() do | ||
DatabaseTestHelper.drop_test_keyspace(client) | ||
end | ||
end) | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't really matter as we only have one test for now, but being assertive and failing in the setup makes the error message more readable as we only have one global database message for the file and not one for each test
setup_all do | |
with {:ok, client} <- DatabaseTestHelper.connect_to_test_database() do | |
DatabaseTestHelper.create_test_keyspace(client) | |
seed_device_data!() | |
end | |
on_exit(fn -> | |
with {:ok, client} <- DatabaseTestHelper.connect_to_test_database() do | |
DatabaseTestHelper.drop_test_keyspace(client) | |
end | |
end) | |
end | |
setup_all do | |
{:ok, client} = DatabaseTestHelper.connect_to_test_database() | |
DatabaseTestHelper.create_test_keyspace(client) | |
seed_device_data!() | |
on_exit(fn -> | |
{:ok, client} = DatabaseTestHelper.connect_to_test_database() | |
DatabaseTestHelper.drop_test_keyspace(client) | |
end) | |
end |
# my sanity is slowly degrading | ||
aliaz = "ahahahah now I can write 'alias' without Elixir complaining" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about device_alias
?
# | ||
# This file is part of Astarte. | ||
# | ||
# Copyright 2018 Ispirata Srl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Copyright 2018 Ispirata Srl | |
# Copyright 2023 SECO Mind Srl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
# TODO expose this via config | ||
# 5 minutes | ||
@reconciliation_timeout 300 * 1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:timer.minutes(5)
with {:ok, _} <- DatabaseQuery.call(client, query) do | ||
:ok | ||
else | ||
%{acc: _, msg: error_message} -> | ||
Logger.warn("Database error when writing end ack device deletion: #{error_message}.") | ||
|
||
{:error, :database_error} | ||
|
||
{:error, reason} -> | ||
Logger.warn("Device deletion end ack failed with reason: #{inspect(reason)}.") | ||
|
||
{:error, :database_error} | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++, let's use Xandra
for new queries
astarte_core, astarte_rpc, astarte_data_access Signed-off-by: Arnaldo Cesco <[email protected]>
e9be707
to
2d2e97e
Compare
Other than the inline suggestions, these have been tackled, too (updated copyright, removed unnecessary formatting changes). TY @noaccOS! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
end | ||
|
||
def init(_args) do | ||
{:ok, %{}, {:continue, []}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given handle_continue
just sends a single message, handling this with handle_continue
adds an unnecessary indirection
{:noreply, state} | ||
end | ||
|
||
defp start_device_deletion!() do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for parentheses for declaration with no arguments. Also, imho having a start_device_deletion!
and a start_device_deletion
which actually do two different things is actually quite confusing. Given start_device_deletion
is 3 lines and it's only used once, I think you can just move its implementation in the anonymous function called by Enum.each
Enum.flat_map(realms, fn %{"realm_name" => realm_name} -> | ||
devices = Queries.retrieve_devices_waiting_to_start_deletion!(realm_name) | ||
Enum.map(devices, &Map.put(&1, "realm_name", realm_name)) | ||
end) | ||
|> Enum.filter(fn %{"realm_name" => realm_name, "device_id" => device_id} -> | ||
should_handle_data_from_device?(realm_name, device_id) | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the ability of for-comprehension of doing nested loop and filtering this could be made more readable
for %{"realm_name" => realm_name} <- realms,
%{"device_id" => device_id} <- Queries.retrieve_devices_waiting_to_start_deletion!(realm_name),
encoded_device_id = Device.encode_device_id(device_id),
should_handle_data_from_device?(realm_name, encoded_device_id) do
%{realm_name: realm_name, device_id: encoded_device_id}
end
Moreover I switched the returned map to an atom-keyed map with only the fields you need. This makes the intent more clear by asserting that you're only using those two keys from the result, while if you take the result from the DB and do Map.put
the reader is not sure if you're using the other fields or not (which you are not).
Also, I encode the device ID here so it doesn't have to be encoded twice in should_handle...
and start_device_deletion
. Feel free to change the key to encoded_device_id:
if you think it's clearer.
|> tap(fn devices -> | ||
Logger.debug("Retrieved devices to delete #{inspect(devices)}", tag: "devices_to_delete") | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this inside the loop and log device-per-device, so you can add realm
and device_id
as log metadata, making it more discoverable with log parsing (while just dumping the list of maps is not parsable by Loki and co)
2d2e97e
to
8301a12
Compare
@@ -98,6 +101,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do | |||
:ok | |||
end | |||
|
|||
def handle_connection(%State{drop_messages: true} = state, _, message_id, _) do | |||
MessageTracker.ack_delivery(state.message_tracker, message_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given we're throwing these away, I'd say that using discard
is more semantically accurate (here and in all similar clauses)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Annopaolo I think this wasn't fixed
Xandra.Cluster.run( | ||
:xandra, | ||
&do_ack_end_device_deletion(&1, realm_name, device_id) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should validate the realm name like we do in Astarte Data Access, otherwise you're interpolating a string in a query without checking it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be handled by #838 once rebased on this PR.
vmq_ack boolean, | ||
dup_start_ack boolean, | ||
dup_end_ack boolean, | ||
PRIMARY KEY ((device_id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for the double parentheses
defp do_check_device_exists(conn, realm_name, device_id) do | ||
statement = """ | ||
SELECT COUNT (*) | ||
FROM #{realm_name}.devices |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, realm_name
should be checked.
defp filter_object_interfaces(realm_name, introspection) do | ||
Enum.reduce_while(introspection, {:ok, []}, fn {interface_name, interface_major}, acc -> | ||
case Queries.retrieve_interface_descriptor(realm_name, interface_name, interface_major) do | ||
# TODO check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check wat
statement = """ | ||
INSERT INTO #{realm_name}.individual_datastreams | ||
(device_id, interface_id, endpoint_id, path, value_timestamp, reception_timestamp, reception_timestamp_submillis, integer_value) | ||
VALUES (:device_id, :interface_id, :endpoint_id, :path, '2017-09-28 04:06+0000', '2017-09-28 05:06+0000', 0, 42); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass also the values you insert here as a parameter instead of hardcoding them, otherwise the known value magically appears in assertions in the test case and it's not clear where it is coming from.
assert :ok = Devices.delete_device(@realm, device_id) | ||
end | ||
|
||
test "install fails when the device does not exists" do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Install?
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
defmodule Astarte.RealmManagement.DeviceRemover do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I think this whole file is a leftover
_ = Logger.info("Starting to remove device #{encoded_device_id}", tag: "device_delete_start") | ||
|
||
datastream_keys = Queries.retrieve_individual_datastreams_keys!(realm_name, device_id) | ||
:ok = delete_individual_datastreams!(realm_name, datastream_keys) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Matching with :ok
on bang functions is redundant and adds noise, I'd drop it
introspection = retrieve_device_introspection_map!(realm_name, device_id) | ||
object_interfaces = filter_object_interfaces!(realm_name, introspection) | ||
object_tables = Enum.map(object_interfaces, &object_interface_to_table_name/1) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes directly calling Enum
functions and some other times defining private helpers to call them, I'd prefer a consistent behavior. I'd suggest moving the Enum.each
here and leaving as helper only the function that unpacks the struct and uses that to create the arguments for the function call.
If you combine this with dropping the match on :ok
, you can easily group functions in pipes which end up handling a specific side effect, e.g:
# Delete individual datastreams
Queries.retrieve_individual_datastreams_keys!(realm_name, device_id)
|> Enum.each(&delete_individual_datastreams_from_key!/1)
...
end)
defp delete_individual_datastreams_from_key!(key) do
%{
device_id: device_id,
interface_id: interface_id,
endpoint_id: endpoint_id,
path: path
} = key
Queries.delete_individual_datastream_values!(
realm_name,
device_id,
interface_id,
endpoint_id,
path
)
end
7defc29
to
a3d3018
Compare
Signed-off-by: Arnaldo Cesco <[email protected]>
Signed-off-by: Arnaldo Cesco <[email protected]>
Device deletion begins when a device_id is put in the `deletion_in_progress` table. The actual deletion of data is started only when it has been acknowledged by both DUP (two times) and the broker (once). The DeviceRemoverSupervisor supervises DeviceRemover tasks. Since deletion is asynchronous, it may happen that the RM service goes down during deletion, or that a temporary filure brings down the deletion service. Therefore, a supervision tree is put in place. Data deletion is started (or resumed, if needed) by the DeviceRemoval.Scheduler and it is carried out by the DeviceRemoval.DeviceRemover Task. The DeviceRemoverSupervisor supervises DeviceRemovers.
astarte_core and astarte_rpc Signed-off-by: Arnaldo Cesco <[email protected]>
Allow to delete a device with an HTTP request using the DELETE method on the `/{realm_name}/devices/{device_id}` endpoint. Signed-off-by: Arnaldo Cesco <[email protected]>
Signed-off-by: Arnaldo Cesco <[email protected]>
astarte_core, astarte_rpc, astarte-data_access Signed-off-by: Arnaldo Cesco <[email protected]>
Add a migration for creating the `deletion_in_progress` table. Signed-off-by: Arnaldo Cesco <[email protected]>
Track devices currently under deletion in the `deletion_in_progress` table. It also contains fields to track the acknowledgement of the deletion by DUP and the broker. Signed-off-by: Arnaldo Cesco <[email protected]>
Bump cyanide, astarte_core, astarte_rpc, astarte-data_access. Contextually, remove some outdated macros. Signed-off-by: Arnaldo Cesco <[email protected]>
a3d3018
to
23cde71
Compare
A DataUpdater process checks periodically if the related device is being deleted. If that is the case, the process updates its `deletion_in_progress` state subfield to `true` and writes to db that deletion has started (`dup_start_ack` field). From now on, all received messages will be acked but ignored, in order to end consuming all inflight messages. The DataUpdater also performs an RPC to the broker to forcefully disconnect the device and prevent it from reconnecting. The last message to be processed by the DataUpdater is by construction received on the internal `"/f"` path (see astarte-platform/astarte_vmq_plugin#75). After that, the DUP deletion end ack is written to the database and the DataUpdater process is terminated. In order to allow deletion of offline (or old) devices, the RemovalScheduler periodically checks the `deletion_in_progress` table and makes the device process start just to perform the deletion check. Signed-off-by: Arnaldo Cesco <[email protected]>
Add the Mox library in order to mock the Astarte RPC client. Signed-off-by: Arnaldo Cesco <[email protected]>
Signed-off-by: Arnaldo Cesco <[email protected]>
Now the Astarte VerneMQ Plugin needs access to the astarte database. Add the relevant env var in the docker-compose.yml file to allow it to connect. Signed-off-by: Arnaldo Cesco <[email protected]>
23cde71
to
a47a2a8
Compare
A device may be deleted with a DELETE request on the
/:realm_name/devices/:device_id
endpoint of Realm Management API. Deletion is then performed asynchronously.When a device is being deleted, it is also disconnected from Astarte. Already stored data may still be available until deletion is complete. All inflight data coming from the device are discarded. A synchronization mechanism makes sure that data deletion is performed only when all inflight messages have been handled (i.e. discarded) and the device is not connected to Astarte.
In detail, the deletion procedure follows the steps outlined below:
deletion_in_progress
tabledeletion_in_progress
flag to true and writes thedup_start_ack
in thedeletion_in_progress
table. From now on, all received messages will be acked but ignored.deletion_in_progress
table and starts the DataUpdater process of offline (or old) devices with a:start_device_deletion
message.deletion_in_progress
table. Then the broker writes thevmq_ack
in thedeletion_in_progress
table and sends an empty message on the"/internal"
topic related to the device to signal that messages from the device are finished.dup_end_ack
in thedeletion_in_progress
table and terminates itselfdeletion_in_progress
table and spawns a task for deleting data from devices whose deletion has been acked both by the broker and the DataUpdater.Related to astarte-platform/astarte_vmq_plugin#75 and based, based on astarte-platform/astarte_rpc#63.
Closes #392.