Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queries edit for keyspace separation feature #85

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
disconnected and not allowed to reconnect until deletion ends.
Inflight messages are discarded. After deletion, a device must be
registered again in order to connect to Astarte.
- Added support for multiple Astarte instances sharing the same database,
the following env variable has been added:
- `DOCKER_VERNEMQ_ASTARTE_VMQ_PLUGIN__ASTARTE_INSTANCE_ID`
(defaults to ``)

### Changed
- Update Elixir to 1.15.7.
Expand Down
4 changes: 4 additions & 0 deletions lib/astarte_vmq_plugin/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ defmodule Astarte.VMQ.Plugin.Config do
Application.get_env(:astarte_vmq_plugin, :registry_mfa)
end

def astarte_instance_id do
Application.get_env(:astarte_vmq_plugin, :astarte_instance_id, "") |> to_string()
end

def device_heartbeat_interval_ms do
Application.get_env(
:astarte_vmq_plugin,
Expand Down
7 changes: 6 additions & 1 deletion lib/astarte_vmq_plugin/queries.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule Astarte.VMQ.Plugin.Queries do

alias Astarte.Core.Device
alias Astarte.Core.Realm
alias Astarte.Core.CQLUtils
alias Astarte.VMQ.Plugin.Config

@doc """
Checks whether a device row exists in Astarte database (i.e. it has at least been registered).
Expand Down Expand Up @@ -130,8 +132,11 @@ defmodule Astarte.VMQ.Plugin.Queries do
end

defp use_realm(conn, realm) when is_binary(realm) do
keyspace_name =
CQLUtils.realm_name_to_keyspace_name(realm, Config.astarte_instance_id())

with :ok <- verify_realm(realm),
{:ok, %Xandra.SetKeyspace{}} <- Xandra.execute(conn, "USE #{realm}") do
{:ok, %Xandra.SetKeyspace{}} <- Xandra.execute(conn, "USE #{keyspace_name}") do
:ok
end
end
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"},
"amqp_client": {:hex, :amqp_client, "3.12.10", "dcc0d5d0037fa2b486c6eb8b52695503765b96f919e38ca864a7b300b829742d", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.10", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "16a23959899a82d9c2534ed1dcf1fa281d3b660fb7f78426b880647f0a53731f"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "685ca10c7a07cc9806f2c6fc7ec2ed1b4d23cbec", []},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "dc964b7d9b3a3a4e20127b763705d9e53bd88890", []},
"astarte_rpc": {:git, "https://github.com/astarte-platform/astarte_rpc.git", "5adf50beffa0bac18d99ebe378bc677c7669a767", []},
"castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
Expand Down
6 changes: 6 additions & 0 deletions priv/astarte_vmq_plugin.schema
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,9 @@
Splitted = string:split(Nodes, ",", all),
lists:map(fun(E) -> string:trim(E, both) end, Splitted)
end}.

{mapping, "astarte_vmq_plugin.astarte_instance_id", "astarte_vmq_plugin.astarte_instance_id", [
{default, ""},
{datatype, string}
]}.

20 changes: 12 additions & 8 deletions test/support/database_test_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,53 @@ defmodule Astarte.VMQ.Plugin.DatabaseTestHelper do
require Logger
alias Astarte.Core.Device
import ExUnit.Assertions
alias Astarte.Core.CQLUtils
alias Astarte.VMQ.Plugin.Config

@test_keyspace CQLUtils.realm_name_to_keyspace_name("test", Config.astarte_instance_id())

@create_test_keyspace """
CREATE KEYSPACE test
CREATE KEYSPACE #{@test_keyspace}
WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND
durable_writes = true;
"""

@create_devices_table """
CREATE TABLE test.devices (
CREATE TABLE #{@test_keyspace}.devices (
device_id uuid,
PRIMARY KEY (device_id)
);
"""

@create_deletion_in_progress_table """
CREATE TABLE test.deletion_in_progress (
CREATE TABLE #{@test_keyspace}.deletion_in_progress (
device_id uuid,
vmq_ack boolean,
PRIMARY KEY (device_id)
);
"""

@insert_device_into_devices """
INSERT INTO test.devices (device_id)
INSERT INTO #{@test_keyspace}.devices (device_id)
VALUES (:device_id);
"""

@insert_device_into_deletion_in_progress """
INSERT INTO test.deletion_in_progress (device_id, vmq_ack)
INSERT INTO #{@test_keyspace}.deletion_in_progress (device_id, vmq_ack)
VALUES (:device_id, :vmq_ack);
"""

@truncate_devices_table """
TRUNCATE test.devices;
TRUNCATE #{@test_keyspace}.devices;
"""

@truncate_deletion_in_progress_table """
TRUNCATE test.deletion_in_progress;
TRUNCATE #{@test_keyspace}.deletion_in_progress;
"""

@drop_test_keyspace """
DROP KEYSPACE test;
DROP KEYSPACE #{@test_keyspace};
"""

def setup_db!() do
Expand Down
Loading