Skip to content

Commit

Permalink
Merge pull request #1046 from osmanhadzic/feature/astarte-export-by-d…
Browse files Browse the repository at this point in the history
…evice-id

Astarte export: astarte export by device_id
  • Loading branch information
Annopaolo authored Jan 16, 2025
2 parents e6a0933 + c711396 commit 5926acc
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
the `detailed=true` parameter
- [astarte_import] Added support for data types: `doublearray`, `integerarray`,
`booleanarray`, `longintegerarray`, `stringarray`, `datetimearray`, `binaryblobarray`.
- [astarte_export] Added a new command for exporting by device_id.
`mix astarte.export $REALM $FILE_XML $DEVICE_ID`

## [1.2.1] - Unreleased
### Changed
Expand Down
53 changes: 49 additions & 4 deletions tools/astarte_export/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# astarte_export

## ⚠ Warning

This tool is still in alpha phase, don't rely on it for critical migrations.

Astarte Export is an easy to use tool that allows to exporting all the devices and data from an existing Astarte realm to XML format.

```iex
Expand All @@ -15,6 +11,55 @@ level=info ts=2020-02-03T03:57:21.437+00:00 msg="Export Completed." module=Astar
{:ok, :export_completed}
iex([email protected])7>
```

## Environment variables

``` bash
export export CASSANDRA_DB_HOST="127.0.0.1"
export CASSANDRA_DB_PORT=9042
export CASSANDRA_NODES="localhost:9042"
```
## Exporting Data with Astarte

You can export data from an Astarte realm using the following commands.

Export Data for All Devices in a Realm
To export data for all devices in a realm:

```bash
mix astarte.export <REALM> <FILE_XML>
```
- `<REALM>`: The name of the Astarte realm.
- `<FILE_XML>`: The output file path where the exported data will be saved.

Export Data for All Devices in a Realm
To export data for all devices in a realm:

## Export Data for a Specific Device

To export data for a single device in a realm:

```bash
mix astarte.export <REALM> <FILE_XML> <DEVICE_ID>
```

- `<REALM>`: The name of the Astarte realm.
- `<FILE_XML>`: The output file path where the exported data will be saved.
- `<DEVICE_ID>`: The unique identifier of the device (e.g., "ogmcilZpRDeDWwuNfJr0yA").

## Example Commands

Export all devices in the realm example_realm:

``` bash
mix astarte.export example_realm devices_data.xml
```
Export data for a specific device yKA3CMd07kWaDyj6aMP4Dg in the realm example_realm:

``` bash
mix astarte.export example_realm device_data.xml yKA3CMd07kWaDyj6aMP4Dg
```

The exported realm data is captured in xml_format as below.

```xml
Expand Down
21 changes: 11 additions & 10 deletions tools/astarte_export/lib/astarte/export.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,28 @@ defmodule Astarte.Export do
the arguments are
- realm-name -> This is a string format of input
- file -> file where to export the realm data.
- options -> options to export the realm data.
"""

@spec export_realm_data(String.t(), String.t()) ::
@spec export_realm_data(String.t(), String.t(), keyword()) ::
:ok | {:error, :invalid_parameters} | {:error, any()}

def export_realm_data(realm, file) do
def export_realm_data(realm, file, opts \\ []) do
file = Path.expand(file) |> Path.absname()

with {:ok, fd} <- File.open(file, [:write]) do
generate_xml(realm, fd)
generate_xml(realm, fd, opts)
end
end

defp generate_xml(realm, fd) do
defp generate_xml(realm, fd, opts \\ []) do
Logger.info("Export started.", realm: realm, tag: "export_started")

with {:ok, state} <- XMLGenerate.xml_write_default_header(fd),
{:ok, state} <- XMLGenerate.xml_write_start_tag(fd, {"astarte", []}, state),
{:ok, state} <- XMLGenerate.xml_write_start_tag(fd, {"devices", []}, state),
{:ok, conn} <- FetchData.db_connection_identifier(),
{:ok, state} <- process_devices(conn, realm, fd, state),
{:ok, state} <- process_devices(conn, realm, fd, state, opts),
{:ok, state} <- XMLGenerate.xml_write_end_tag(fd, state),
{:ok, _state} <- XMLGenerate.xml_write_end_tag(fd, state),
:ok <- File.close(fd) do
Expand All @@ -67,20 +68,20 @@ defmodule Astarte.Export do
end
end

defp process_devices(conn, realm, fd, state) do
defp process_devices(conn, realm, fd, state, opts \\ []) do
tables_page_configs = Application.get_env(:xandra, :cassandra_table_page_sizes, [])
page_size = Keyword.get(tables_page_configs, :device_table_page_size, 100)
options = [page_size: page_size]
process_devices(conn, realm, fd, state, options)
process_devices(conn, realm, fd, state, options, opts)
end

defp process_devices(conn, realm, fd, state, options) do
defp process_devices(conn, realm, fd, state, options, opts) do
with {:more_data, device_list, updated_options} <-
FetchData.fetch_device_data(conn, realm, options),
FetchData.fetch_device_data(conn, realm, options, opts),
{:ok, state} <- process_device_list(conn, realm, device_list, fd, state),
{:ok, paging_state} when paging_state != nil <-
Keyword.fetch(updated_options, :paging_state) do
process_devices(conn, realm, fd, state, updated_options)
process_devices(conn, realm, fd, state, updated_options, opts)
else
{:ok, nil} -> {:ok, state}
{:ok, :completed} -> {:ok, state}
Expand Down
6 changes: 3 additions & 3 deletions tools/astarte_export/lib/astarte/fetchdata/fetchdata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ defmodule Astarte.Export.FetchData do
end
end

def fetch_device_data(conn, realm, opts) do
case Queries.stream_devices(conn, realm, opts) do
def fetch_device_data(conn, realm, options, device_options \\ []) do
case Queries.stream_devices(conn, realm, options, device_options) do
{:ok, result} ->
result_list = Enum.to_list(result)

if result_list == [] do
{:ok, :completed}
else
updated_options = Keyword.put(opts, :paging_state, result.paging_state)
updated_options = Keyword.put(options, :paging_state, result.paging_state)
{:more_data, result_list, updated_options}
end

Expand Down
41 changes: 35 additions & 6 deletions tools/astarte_export/lib/astarte/fetchdata/queries/queries.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
defmodule Astarte.Export.FetchData.Queries do
require IEx
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping
alias Astarte.Core.Device

require Logger

def get_connection() do
Expand Down Expand Up @@ -57,12 +60,27 @@ defmodule Astarte.Export.FetchData.Queries do
end
end

def stream_devices(conn, realm, options) do
devices_statement = """
SELECT * from #{realm}.devices
"""
def stream_devices(conn, realm, options, device_options \\ []) do
device_id = device_id_to_uuid(device_options[:device_id])

params = []
{devices_statement, params} =
case device_id do
nil ->
{
"""
SELECT * from #{realm}.devices
""",
[]
}

device_uuid ->
{
"""
SELECT * from #{realm}.devices WHERE device_id=?
""",
[{"uuid", device_id}]
}
end

options = options ++ [uuid_format: :binary, timestamp_format: :datetime]

Expand Down Expand Up @@ -259,7 +277,9 @@ defmodule Astarte.Export.FetchData.Queries do
rows = Enum.map(result, fn row -> row[:path] end)

if rows == [] do
Logger.info("No paths found for interface_id: #{inspect(interface_id)}", tag: "no_paths_found")
Logger.info("No paths found for interface_id: #{inspect(interface_id)}",
tag: "no_paths_found"
)
else
{:ok, rows}
end
Expand All @@ -285,4 +305,13 @@ defmodule Astarte.Export.FetchData.Queries do
{:error, :database_connection_error}
end
end

defp device_id_to_uuid(device_id) when is_nil(device_id) do
nil
end

defp device_id_to_uuid(device_id) do
{:ok, device_uuid, _} = Device.decode_extended_device_id(device_id)
device_uuid
end
end
32 changes: 26 additions & 6 deletions tools/astarte_export/lib/mix/tasks/astarte_export.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,33 @@ defmodule Mix.Tasks.Astarte.Export do

@impl Mix.Task
@shortdoc "export data from an existing Astarte realm"
def run([realm, filename]) do
case Application.ensure_all_started(:astarte_export) do
{:ok, _} ->
Export.export_realm_data(realm, filename)
def run(args) do
case args do
[realm, file_name] ->
Logger.info("Exporting data from realm #{realm} to file #{file_name}")

{:error, reason} ->
Logger.error("Cannot start applications: #{inspect(reason)}")
case Application.ensure_all_started(:astarte_export) do
{:ok, _} ->
Export.export_realm_data(realm, file_name)

{:error, reason} ->
Logger.error("Cannot start applications: #{inspect(reason)}")
end

[realm, file_name, device_id] ->
Logger.info(
"Exporting data for device #{device_id} from realm #{realm} to file #{file_name}"
)

options = [device_id: device_id]

case Application.ensure_all_started(:astarte_export) do
{:ok, _} ->
Export.export_realm_data(realm, file_name, options)

{:error, reason} ->
Logger.error("Cannot start applications: #{inspect(reason)}")
end
end
end
end

0 comments on commit 5926acc

Please sign in to comment.