diff --git a/tests/core/grpc_client.py b/tests/core/grpc_client.py index 22ae555..6aeac54 100644 --- a/tests/core/grpc_client.py +++ b/tests/core/grpc_client.py @@ -179,3 +179,27 @@ def list_public_ip_ids(self) -> list[str]: def delete_public_ip(self, *, public_ip_id: str) -> None: self.call(service=f"{PUBLIC_API}.PublicIPs/Delete", data={"id": public_ip_id}) + + # PublicIPAttachment operations (public API) + + def create_public_ip_attachment(self, *, name: str, public_ip: str, compute_instance: str) -> str: + response: dict[str, Any] = self.call( + service=f"{PUBLIC_API}.PublicIPAttachments/Create", + data={ + "object": { + "metadata": {"name": name}, + "spec": {"public_ip": public_ip, "compute_instance": compute_instance}, + } + }, + ) + return response["object"]["id"] + + def get_public_ip_attachment(self, *, attachment_id: str) -> dict[str, Any]: + return self.call(service=f"{PUBLIC_API}.PublicIPAttachments/Get", data={"id": attachment_id}) + + def list_public_ip_attachment_ids(self) -> list[str]: + response: dict[str, Any] = self.call(service=f"{PUBLIC_API}.PublicIPAttachments/List") + return [item["id"] for item in response.get("items", [])] + + def delete_public_ip_attachment(self, *, attachment_id: str) -> None: + self.call(service=f"{PUBLIC_API}.PublicIPAttachments/Delete", data={"id": attachment_id}) diff --git a/tests/core/helpers.py b/tests/core/helpers.py index 146b19b..85ec32e 100644 --- a/tests/core/helpers.py +++ b/tests/core/helpers.py @@ -1,10 +1,24 @@ from __future__ import annotations +import re +import subprocess + +import pytest + from tests.core.grpc_client import GRPCClient from tests.core.k8s_client import K8sClient from tests.core.runner import poll_until, run_unchecked +def assert_grpc_rejected( + exc_info: pytest.ExceptionInfo[subprocess.CalledProcessError], + code: str, +) -> None: + exc = exc_info.value + combined: str = (exc.stderr or "") + (exc.stdout or "") + assert re.search(rf"Code:\s*{code}", combined), f"Expected gRPC {code}, got: {combined.strip()}" + + def wait_for_cr(*, k8s: K8sClient, uuid: str) -> str: return poll_until( fn=lambda: k8s.get_compute_instance_name(uuid=uuid, checked=False), @@ -197,6 +211,41 @@ def wait_for_public_ip_deletion(*, k8s: K8sClient, name: str) -> None: ) +def wait_for_public_ip_attachment_cr(*, k8s: K8sClient, uuid: str) -> str: + return poll_until( + fn=lambda: k8s.get_public_ip_attachment_name(uuid=uuid, checked=False), + until=lambda v: v != "", + retries=30, + delay=1, + description=f"PublicIPAttachment CR for {uuid}", + ) + + +def wait_for_public_ip_attachment_ready(*, k8s: K8sClient, name: str) -> None: + def _check_phase() -> str: + phase: str = k8s.get_public_ip_attachment_phase(name=name, checked=False) + assert phase != "Failed", f"{name} PublicIPAttachment entered Failed phase" + return phase + + poll_until( + fn=_check_phase, + until=lambda v: v == "Ready", + retries=60, + delay=5, + description=f"{name} PublicIPAttachment Ready", + ) + + +def wait_for_public_ip_attachment_deletion(*, k8s: K8sClient, name: str) -> None: + poll_until( + fn=lambda: not k8s.is_present(resource="publicipattachment", name=name), + until=lambda v: v is True, + retries=120, + delay=5, + description=f"{name} PublicIPAttachment deletion", + ) + + def wait_for_cluster_order_cr(*, k8s: K8sClient, uuid: str) -> str: return poll_until( fn=lambda: k8s.get_cluster_order_name(uuid=uuid, checked=False), diff --git a/tests/core/k8s_client.py b/tests/core/k8s_client.py index 9e04803..0cb6eea 100644 --- a/tests/core/k8s_client.py +++ b/tests/core/k8s_client.py @@ -241,6 +241,28 @@ def get_public_ip_state(self, *, name: str, checked: bool = True) -> str: ) return output if rc == 0 else "" + # PublicIPAttachment queries + + def get_public_ip_attachment_name(self, *, uuid: str, checked: bool = True) -> str: + output, rc = self._get( + "get", + "publicipattachment", + "-n", + self.namespace, + "-l", + f"osac.openshift.io/publicipattachment-uuid={uuid}", + "-o", + "jsonpath={.items[0].metadata.name}", + checked=checked, + ) + return output if rc == 0 else "" + + def get_public_ip_attachment_phase(self, *, name: str, checked: bool = True) -> str: + output, rc = self._get( + "get", "publicipattachment", name, "-n", self.namespace, "-o", "jsonpath={.status.phase}", checked=checked + ) + return output if rc == 0 else "" + # ClusterOrder queries def get_cluster_order_name(self, *, uuid: str, checked: bool = True) -> str: diff --git a/tests/vmaas/public_ip/conftest.py b/tests/vmaas/public_ip/conftest.py index ef6f111..f091ba4 100644 --- a/tests/vmaas/public_ip/conftest.py +++ b/tests/vmaas/public_ip/conftest.py @@ -9,12 +9,16 @@ from tests.core.grpc_client import GRPCClient from tests.core.helpers import ( + wait_for_cr, + wait_for_deletion, wait_for_public_ip_deletion, wait_for_public_ip_pool_cr, wait_for_public_ip_pool_deletion, wait_for_public_ip_pool_ready, + wait_for_running, ) from tests.core.k8s_client import K8sClient +from tests.core.osac_cli import OsacCLI from tests.vmaas.public_ip.helpers import create_ip, get_random_subnet logger = logging.getLogger(__name__) @@ -90,3 +94,39 @@ def public_ip( except subprocess.CalledProcessError as exc: logger.warning("PublicIP %s gRPC delete failed in teardown: %s", ip_id, (exc.stderr or "").strip()) wait_for_public_ip_deletion(k8s=k8s_hub_client, name=ip_cr_name) + + +@pytest.fixture(scope="class") +def make_compute_instances( + cli: OsacCLI, + k8s_hub_client: K8sClient, + vm_template: str, + default_subnet: str, +) -> Generator[Callable[..., tuple[tuple[str, str], ...]], None, None]: + created: list[tuple[str, str]] = [] + + def _make(count: int = 2) -> tuple[tuple[str, str], ...]: + instances: list[tuple[str, str]] = [] + for _ in range(count): + uuid = cli.create_compute_instance( + template=vm_template, + network_attachments=[{"subnet": default_subnet}], + ) + name = wait_for_cr(k8s=k8s_hub_client, uuid=uuid) + created.append((uuid, name)) + instances.append((uuid, name)) + for _, name in instances: + wait_for_running(k8s=k8s_hub_client, name=name) + return tuple(instances) + + yield _make + + for ci_uuid, ci_name in reversed(created): + if not k8s_hub_client.is_present(resource="computeinstance", name=ci_name): + continue + try: + cli.delete_compute_instance(uuid=ci_uuid) + except subprocess.CalledProcessError as exc: + logger.warning("ComputeInstance %s teardown failed: %s", ci_uuid, (exc.stderr or "").strip()) + continue + wait_for_deletion(k8s=k8s_hub_client, name=ci_name) diff --git a/tests/vmaas/public_ip/test_public_ip_pool_capacity.py b/tests/vmaas/public_ip/test_public_ip_pool_capacity.py index a50683d..065ba8b 100644 --- a/tests/vmaas/public_ip/test_public_ip_pool_capacity.py +++ b/tests/vmaas/public_ip/test_public_ip_pool_capacity.py @@ -6,7 +6,7 @@ import pytest from tests.core.grpc_client import GRPCClient -from tests.core.helpers import wait_for_public_ip_pool_deletion +from tests.core.helpers import assert_grpc_rejected, wait_for_public_ip_pool_deletion from tests.core.k8s_client import K8sClient from tests.core.runner import poll_until from tests.vmaas.public_ip.helpers import create_ip, delete_ip, pool_status @@ -60,13 +60,9 @@ def test_exhaustion_rejects_creation( status = pool_status(private_grpc, pool_id) assert status["available"] == 0, f"Pool should be full, available={status['available']}" - try: - ip_id = grpc.create_public_ip(name=f"test-ip-{uuid4().hex[:8]}", pool=pool_id) - created_ips.append((ip_id, "")) - pytest.fail("create_public_ip should have been rejected on a full pool") - except subprocess.CalledProcessError as exc: - combined = (exc.stderr or "") + (exc.stdout or "") - assert "FailedPrecondition" in combined + with pytest.raises(subprocess.CalledProcessError) as exc_info: + grpc.create_public_ip(name=f"test-ip-{uuid4().hex[:8]}", pool=pool_id) + assert_grpc_rejected(exc_info, "FailedPrecondition") def test_release_restores_capacity( self, @@ -101,8 +97,7 @@ def test_pool_deletion_blocked_while_ips_allocated( pool_id, _ = small_pool with pytest.raises(subprocess.CalledProcessError) as exc_info: private_grpc.delete_public_ip_pool(pool_id=pool_id) - combined = (exc_info.value.stderr or "") + (exc_info.value.stdout or "") - assert "FailedPrecondition" in combined + assert_grpc_rejected(exc_info, "FailedPrecondition") def test_pool_deletion_succeeds_after_all_ips_released( self, diff --git a/tests/vmaas/public_ip/test_public_ip_pool_lifecycle.py b/tests/vmaas/public_ip/test_public_ip_pool_lifecycle.py index 047f066..1d528a2 100644 --- a/tests/vmaas/public_ip/test_public_ip_pool_lifecycle.py +++ b/tests/vmaas/public_ip/test_public_ip_pool_lifecycle.py @@ -1,8 +1,18 @@ from __future__ import annotations +import subprocess +from collections.abc import Callable +from uuid import uuid4 + +import pytest + from tests.core.grpc_client import GRPCClient from tests.core.helpers import ( + assert_grpc_rejected, wait_for_public_ip_allocated, + wait_for_public_ip_attachment_cr, + wait_for_public_ip_attachment_deletion, + wait_for_public_ip_attachment_ready, wait_for_public_ip_deletion, wait_for_public_ip_pool_deletion, ) @@ -10,40 +20,138 @@ from tests.core.runner import poll_until -def test_public_ip_pool_lifecycle( - public_ip_pool: tuple[str, str], - public_ip: tuple[str, str], - grpc: GRPCClient, - private_grpc: GRPCClient, - k8s_hub_client: K8sClient, -) -> None: - pool_id, pool_cr_name = public_ip_pool - ip_id, ip_cr_name = public_ip - - assert pool_id in private_grpc.list_public_ip_pool_ids() - - assert ip_id in grpc.list_public_ip_ids() - wait_for_public_ip_allocated(k8s=k8s_hub_client, name=ip_cr_name) - - # Delete the PublicIP first, then the pool - grpc.delete_public_ip(public_ip_id=ip_id) - wait_for_public_ip_deletion(k8s=k8s_hub_client, name=ip_cr_name) - poll_until( - fn=lambda: ip_id not in grpc.list_public_ip_ids(), - until=lambda v: v is True, - retries=30, - delay=5, - description=f"PublicIP {ip_id} removal from API", - ) - - # TODO Attach the PublicIP to a ComputeInstance and verify attach -> detach lifecycle - - private_grpc.delete_public_ip_pool(pool_id=pool_id) - wait_for_public_ip_pool_deletion(k8s=k8s_hub_client, name=pool_cr_name) - poll_until( - fn=lambda: pool_id not in private_grpc.list_public_ip_pool_ids(), - until=lambda v: v is True, - retries=30, - delay=5, - description=f"PublicIPPool {pool_id} removal from API", - ) +class TestPublicIPPoolLifecycle: + def test_attach_detach_reattach( + self, + public_ip_pool: tuple[str, str], + public_ip: tuple[str, str], + make_compute_instances: Callable[..., tuple[tuple[str, str], ...]], + grpc: GRPCClient, + private_grpc: GRPCClient, + k8s_hub_client: K8sClient, + ) -> None: + pool_id, pool_cr_name = public_ip_pool + ip_id, ip_cr_name = public_ip + (ci1_uuid, _ci1_name), (ci2_uuid, _ci2_name) = make_compute_instances(2) + + assert pool_id in private_grpc.list_public_ip_pool_ids() + assert ip_id in grpc.list_public_ip_ids() + wait_for_public_ip_allocated(k8s=k8s_hub_client, name=ip_cr_name) + + # --- Attach PublicIP to ComputeInstance 1 --- + att_id: str = grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=ip_id, + compute_instance=ci1_uuid, + ) + att_cr_name: str = wait_for_public_ip_attachment_cr(k8s=k8s_hub_client, uuid=att_id) + wait_for_public_ip_attachment_ready(k8s=k8s_hub_client, name=att_cr_name) + + ip_obj = grpc.get_public_ip(public_ip_id=ip_id) + assert ip_obj["object"]["status"].get("attached") is True + attached_ip_address: str = ip_obj["object"]["status"]["address"] + assert attached_ip_address, "PublicIP should have an allocated address" + + # --- Detach (delete attachment) --- + grpc.delete_public_ip_attachment(attachment_id=att_id) + wait_for_public_ip_attachment_deletion(k8s=k8s_hub_client, name=att_cr_name) + + poll_until( + fn=lambda: grpc.get_public_ip(public_ip_id=ip_id)["object"]["status"].get("attached"), + until=lambda v: v is not True, + retries=30, + delay=5, + description=f"PublicIP {ip_id} detached", + ) + + # --- Re-attach same IP to ComputeInstance 2 --- + att2_id: str = grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=ip_id, + compute_instance=ci2_uuid, + ) + att2_cr_name: str = wait_for_public_ip_attachment_cr(k8s=k8s_hub_client, uuid=att2_id) + wait_for_public_ip_attachment_ready(k8s=k8s_hub_client, name=att2_cr_name) + + ip_obj = grpc.get_public_ip(public_ip_id=ip_id) + assert ip_obj["object"]["status"]["address"] == attached_ip_address, ( + "Re-attached PublicIP should keep the same address" + ) + + # --- Cleanup: delete attachment, PublicIP, then pool --- + grpc.delete_public_ip_attachment(attachment_id=att2_id) + wait_for_public_ip_attachment_deletion(k8s=k8s_hub_client, name=att2_cr_name) + + grpc.delete_public_ip(public_ip_id=ip_id) + wait_for_public_ip_deletion(k8s=k8s_hub_client, name=ip_cr_name) + poll_until( + fn=lambda: ip_id not in grpc.list_public_ip_ids(), + until=lambda v: v is True, + retries=30, + delay=5, + description=f"PublicIP {ip_id} removal from API", + ) + + private_grpc.delete_public_ip_pool(pool_id=pool_id) + wait_for_public_ip_pool_deletion(k8s=k8s_hub_client, name=pool_cr_name) + poll_until( + fn=lambda: pool_id not in private_grpc.list_public_ip_pool_ids(), + until=lambda v: v is True, + retries=30, + delay=5, + description=f"PublicIPPool {pool_id} removal from API", + ) + + def test_validation_rejections( + self, + public_ip_pool: tuple[str, str], + public_ip: tuple[str, str], + make_compute_instances: Callable[..., tuple[tuple[str, str], ...]], + grpc: GRPCClient, + k8s_hub_client: K8sClient, + ) -> None: + _pool_id, _pool_cr_name = public_ip_pool + ip_id, ip_cr_name = public_ip + ci1_uuid, _ = make_compute_instances(1)[0] + + wait_for_public_ip_allocated(k8s=k8s_hub_client, name=ip_cr_name) + + # Nonexistent ComputeInstance + fake_ci_uuid: str = str(uuid4()) + with pytest.raises(subprocess.CalledProcessError) as exc_info: + grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=ip_id, + compute_instance=fake_ci_uuid, + ) + assert_grpc_rejected(exc_info, "InvalidArgument") + + # Nonexistent PublicIP + fake_ip_uuid: str = str(uuid4()) + with pytest.raises(subprocess.CalledProcessError) as exc_info: + grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=fake_ip_uuid, + compute_instance=ci1_uuid, + ) + assert_grpc_rejected(exc_info, "InvalidArgument") + + # Duplicate attachment — attach same PublicIP twice + att_id: str = grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=ip_id, + compute_instance=ci1_uuid, + ) + att_cr_name: str = wait_for_public_ip_attachment_cr(k8s=k8s_hub_client, uuid=att_id) + wait_for_public_ip_attachment_ready(k8s=k8s_hub_client, name=att_cr_name) + + with pytest.raises(subprocess.CalledProcessError) as exc_info: + grpc.create_public_ip_attachment( + name=f"test-att-{uuid4().hex[:8]}", + public_ip=ip_id, + compute_instance=ci1_uuid, + ) + assert_grpc_rejected(exc_info, "AlreadyExists") + + grpc.delete_public_ip_attachment(attachment_id=att_id) + wait_for_public_ip_attachment_deletion(k8s=k8s_hub_client, name=att_cr_name)