From 0495a2b2eaedb73174d783d37df6f77bccdf31a2 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Mon, 23 Dec 2024 10:43:57 +0100 Subject: [PATCH] Astarte export: Add support for parametric interfaces Fetch endpoint paths for data mappings and update related processing logic. Closes #1028 Signed-off-by: Osman Hadzic --- tools/astarte_export/lib/astarte/export.ex | 6 +- .../lib/astarte/fetchdata/fetchdata.ex | 33 +++++++++- .../lib/astarte/fetchdata/queries/queries.ex | 64 ++++++++++++++++++- 3 files changed, 94 insertions(+), 9 deletions(-) diff --git a/tools/astarte_export/lib/astarte/export.ex b/tools/astarte_export/lib/astarte/export.ex index f50b28f33..6c6d03cba 100644 --- a/tools/astarte_export/lib/astarte/export.ex +++ b/tools/astarte_export/lib/astarte/export.ex @@ -185,9 +185,7 @@ defmodule Astarte.Export do defp process_object_streams(conn, realm, mappings, interface_info, fd, state, opts) do [h | _t] = mappings - fullpath = h.endpoint - [_, endpointprefix, _] = String.split(fullpath, "/") - path = "/" <> endpointprefix + path = "" <> h.path sub_paths_info = Enum.reduce(mappings, [], fn mapping, acc1 -> @@ -215,7 +213,7 @@ defmodule Astarte.Export do defp process_individual_streams(conn, realm, [h | t], interface_info, fd, state, opts) do with {:ok, state} <- - XMLGenerate.xml_write_start_tag(fd, {"datastream", [path: h.endpoint]}, state), + XMLGenerate.xml_write_start_tag(fd, {"datastream", [path: h.path]}, state), {:ok, state} <- do_process_individual_streams(conn, realm, h, interface_info, fd, state, opts), {:ok, state} <- XMLGenerate.xml_write_end_tag(fd, state) do diff --git a/tools/astarte_export/lib/astarte/fetchdata/fetchdata.ex b/tools/astarte_export/lib/astarte/fetchdata/fetchdata.ex index 50bb4a331..dca48fdf7 100644 --- a/tools/astarte_export/lib/astarte/fetchdata/fetchdata.ex +++ b/tools/astarte_export/lib/astarte/fetchdata/fetchdata.ex @@ -168,6 +168,21 @@ defmodule Astarte.Export.FetchData do {:ok, mappings} = Queries.fetch_interface_mappings(conn, realm, interface_id, []) mappings = Enum.sort_by(mappings, fn mapping -> mapping.endpoint end) + mappings = + Enum.map(mappings, fn mapping -> + path = + fetch_all_endpoint_paths( + conn, + realm, + interface_id, + device_id, + mapping.endpoint_id, + aggregation + ) + + Map.put(mapping, :path, path |> Enum.at(0) || mapping.endpoint) + end) + interface_attributes = [ interface_name: interface_name, major_version: to_string(major_version), @@ -203,6 +218,20 @@ defmodule Astarte.Export.FetchData do {:ok, mapped_interfaces} end + defp fetch_all_endpoint_paths(conn, realm, interface_id, device_id, endpoint_id, aggregation) do + with {:ok, result} <- + Queries.retrieve_all_endpoint_paths( + conn, + realm, + interface_id, + device_id, + endpoint_id, + aggregation + ) do + result + end + end + def fetch_individual_datastreams(conn, realm, mapping, interface_info, options) do %{ device_id: device_id, @@ -210,7 +239,7 @@ defmodule Astarte.Export.FetchData do } = interface_info endpoint_id = mapping.endpoint_id - path = mapping.endpoint + path = mapping.path data_type = mapping.value_type data_field = CQLUtils.type_to_db_column_name(data_type) @@ -300,7 +329,7 @@ defmodule Astarte.Export.FetchData do interface_id: interface_id } = interface_info - path = mapping.endpoint + path = mapping.path endpoint_id = mapping.endpoint_id data_type = mapping.value_type data_field = CQLUtils.type_to_db_column_name(data_type) diff --git a/tools/astarte_export/lib/astarte/fetchdata/queries/queries.ex b/tools/astarte_export/lib/astarte/fetchdata/queries/queries.ex index 2d493a6bf..17d406dec 100644 --- a/tools/astarte_export/lib/astarte/fetchdata/queries/queries.ex +++ b/tools/astarte_export/lib/astarte/fetchdata/queries/queries.ex @@ -13,7 +13,9 @@ defmodule Astarte.Export.FetchData.Queries do {:ok, xandra_conn} else {:error, reason} -> - Logger.error("DB connection setup failed: #{inspect(reason)}", tag: "db_connection_failed") + Logger.error("DB connection setup failed: #{inspect(reason)}", + tag: "db_connection_failed" + ) end end @@ -124,8 +126,8 @@ defmodule Astarte.Export.FetchData.Queries do options ) do properties_statement = """ - SELECT #{data_type}, reception_timestamp from #{realm}.individual_properties - where device_id=? AND interface_id=? AND endpoint_id=? AND path=? + SELECT #{data_type}, reception_timestamp from #{realm}.individual_properties + where device_id=? AND interface_id=? AND endpoint_id=? AND path=? """ params = [{"uuid", device_id}, {"uuid", interface_id}, {"uuid", endpoint_id}, {"text", path}] @@ -227,4 +229,60 @@ defmodule Astarte.Export.FetchData.Queries do {:error, :database_connection_error} end end + + def retrieve_all_endpoint_paths(conn, realm, interface_id, device_id, endpoint_id, aggregation) do + {all_paths_statement, params} = + case aggregation do + :object -> + { + """ + SELECT path + FROM #{realm}.individual_properties + WHERE device_id=? AND interface_id=? + """, + [{"uuid", device_id}, {"uuid", interface_id}] + } + + :individual -> + { + """ + SELECT path + FROM #{realm}.individual_properties + WHERE device_id=? AND interface_id=? AND endpoint_id=? + """, + [{"uuid", device_id}, {"uuid", interface_id}, {"uuid", endpoint_id}] + } + end + + with {:ok, result} <- + Xandra.execute(conn, all_paths_statement, params) do + rows = Enum.map(result, fn row -> row[:path] end) + + if rows == [] do + Logger.info("No paths found for interface_id: #{inspect(interface_id)}", tag: "no_paths_found") + else + {:ok, rows} + end + + {:ok, rows} + else + {:error, %Xandra.Error{message: message}} -> + Logger.error("database error: #{inspect(message)}.", + realm: realm, + device_id: device_id, + tag: "database_error" + ) + + {:error, :database_error} + + {:error, %Xandra.ConnectionError{} = err} -> + Logger.error("database connection error: #{inspect(err)}.", + realm: realm, + device_id: device_id, + tag: "database_connection_error" + ) + + {:error, :database_connection_error} + end + end end