Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions tests/core/grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,27 @@ def list_public_ip_ids(self) -> list[str]:

def delete_public_ip(self, *, public_ip_id: str) -> None:
self.call(service=f"{PUBLIC_API}.PublicIPs/Delete", data={"id": public_ip_id})

# PublicIPAttachment operations (public API)

def create_public_ip_attachment(self, *, name: str, public_ip: str, compute_instance: str) -> str:
response: dict[str, Any] = self.call(
service=f"{PUBLIC_API}.PublicIPAttachments/Create",
data={
"object": {
"metadata": {"name": name},
"spec": {"public_ip": public_ip, "compute_instance": compute_instance},
}
},
)
return response["object"]["id"]

def get_public_ip_attachment(self, *, attachment_id: str) -> dict[str, Any]:
return self.call(service=f"{PUBLIC_API}.PublicIPAttachments/Get", data={"id": attachment_id})

def list_public_ip_attachment_ids(self) -> list[str]:
response: dict[str, Any] = self.call(service=f"{PUBLIC_API}.PublicIPAttachments/List")
return [item["id"] for item in response.get("items", [])]

def delete_public_ip_attachment(self, *, attachment_id: str) -> None:
self.call(service=f"{PUBLIC_API}.PublicIPAttachments/Delete", data={"id": attachment_id})
49 changes: 49 additions & 0 deletions tests/core/helpers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
from __future__ import annotations

import re
import subprocess

import pytest

from tests.core.grpc_client import GRPCClient
from tests.core.k8s_client import K8sClient
from tests.core.runner import poll_until, run_unchecked


def assert_grpc_rejected(
exc_info: pytest.ExceptionInfo[subprocess.CalledProcessError],
code: str,
) -> None:
exc = exc_info.value
combined: str = (exc.stderr or "") + (exc.stdout or "")
assert re.search(rf"Code:\s*{code}", combined), f"Expected gRPC {code}, got: {combined.strip()}"


def wait_for_cr(*, k8s: K8sClient, uuid: str) -> str:
return poll_until(
fn=lambda: k8s.get_compute_instance_name(uuid=uuid, checked=False),
Expand Down Expand Up @@ -197,6 +211,41 @@ def wait_for_public_ip_deletion(*, k8s: K8sClient, name: str) -> None:
)


def wait_for_public_ip_attachment_cr(*, k8s: K8sClient, uuid: str) -> str:
return poll_until(
fn=lambda: k8s.get_public_ip_attachment_name(uuid=uuid, checked=False),
until=lambda v: v != "",
retries=30,
delay=1,
description=f"PublicIPAttachment CR for {uuid}",
)


def wait_for_public_ip_attachment_ready(*, k8s: K8sClient, name: str) -> None:
def _check_phase() -> str:
phase: str = k8s.get_public_ip_attachment_phase(name=name, checked=False)
assert phase != "Failed", f"{name} PublicIPAttachment entered Failed phase"
return phase

poll_until(
fn=_check_phase,
until=lambda v: v == "Ready",
retries=60,
delay=5,
description=f"{name} PublicIPAttachment Ready",
)


def wait_for_public_ip_attachment_deletion(*, k8s: K8sClient, name: str) -> None:
poll_until(
fn=lambda: not k8s.is_present(resource="publicipattachment", name=name),
until=lambda v: v is True,
retries=120,
delay=5,
description=f"{name} PublicIPAttachment deletion",
)


def wait_for_cluster_order_cr(*, k8s: K8sClient, uuid: str) -> str:
return poll_until(
fn=lambda: k8s.get_cluster_order_name(uuid=uuid, checked=False),
Expand Down
22 changes: 22 additions & 0 deletions tests/core/k8s_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,28 @@ def get_public_ip_state(self, *, name: str, checked: bool = True) -> str:
)
return output if rc == 0 else ""

# PublicIPAttachment queries

def get_public_ip_attachment_name(self, *, uuid: str, checked: bool = True) -> str:
output, rc = self._get(
"get",
"publicipattachment",
"-n",
self.namespace,
"-l",
f"osac.openshift.io/publicipattachment-uuid={uuid}",
"-o",
"jsonpath={.items[0].metadata.name}",
checked=checked,
)
return output if rc == 0 else ""

def get_public_ip_attachment_phase(self, *, name: str, checked: bool = True) -> str:
output, rc = self._get(
"get", "publicipattachment", name, "-n", self.namespace, "-o", "jsonpath={.status.phase}", checked=checked
)
return output if rc == 0 else ""

# ClusterOrder queries

def get_cluster_order_name(self, *, uuid: str, checked: bool = True) -> str:
Expand Down
40 changes: 40 additions & 0 deletions tests/vmaas/public_ip/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@

from tests.core.grpc_client import GRPCClient
from tests.core.helpers import (
wait_for_cr,
wait_for_deletion,
wait_for_public_ip_deletion,
wait_for_public_ip_pool_cr,
wait_for_public_ip_pool_deletion,
wait_for_public_ip_pool_ready,
wait_for_running,
)
from tests.core.k8s_client import K8sClient
from tests.core.osac_cli import OsacCLI
from tests.vmaas.public_ip.helpers import create_ip, get_random_subnet

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,3 +94,39 @@ def public_ip(
except subprocess.CalledProcessError as exc:
logger.warning("PublicIP %s gRPC delete failed in teardown: %s", ip_id, (exc.stderr or "").strip())
wait_for_public_ip_deletion(k8s=k8s_hub_client, name=ip_cr_name)


@pytest.fixture(scope="class")
def make_compute_instances(
cli: OsacCLI,
k8s_hub_client: K8sClient,
vm_template: str,
default_subnet: str,
) -> Generator[Callable[..., tuple[tuple[str, str], ...]], None, None]:
created: list[tuple[str, str]] = []

def _make(count: int = 2) -> tuple[tuple[str, str], ...]:
instances: list[tuple[str, str]] = []
for _ in range(count):
uuid = cli.create_compute_instance(
template=vm_template,
network_attachments=[{"subnet": default_subnet}],
)
name = wait_for_cr(k8s=k8s_hub_client, uuid=uuid)
created.append((uuid, name))
instances.append((uuid, name))
for _, name in instances:
wait_for_running(k8s=k8s_hub_client, name=name)
return tuple(instances)

yield _make

for ci_uuid, ci_name in reversed(created):
if not k8s_hub_client.is_present(resource="computeinstance", name=ci_name):
continue
try:
cli.delete_compute_instance(uuid=ci_uuid)
except subprocess.CalledProcessError as exc:
logger.warning("ComputeInstance %s teardown failed: %s", ci_uuid, (exc.stderr or "").strip())
continue
wait_for_deletion(k8s=k8s_hub_client, name=ci_name)
15 changes: 5 additions & 10 deletions tests/vmaas/public_ip/test_public_ip_pool_capacity.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from tests.core.grpc_client import GRPCClient
from tests.core.helpers import wait_for_public_ip_pool_deletion
from tests.core.helpers import assert_grpc_rejected, wait_for_public_ip_pool_deletion
from tests.core.k8s_client import K8sClient
from tests.core.runner import poll_until
from tests.vmaas.public_ip.helpers import create_ip, delete_ip, pool_status
Expand Down Expand Up @@ -60,13 +60,9 @@ def test_exhaustion_rejects_creation(
status = pool_status(private_grpc, pool_id)
assert status["available"] == 0, f"Pool should be full, available={status['available']}"

try:
ip_id = grpc.create_public_ip(name=f"test-ip-{uuid4().hex[:8]}", pool=pool_id)
created_ips.append((ip_id, ""))
pytest.fail("create_public_ip should have been rejected on a full pool")
except subprocess.CalledProcessError as exc:
combined = (exc.stderr or "") + (exc.stdout or "")
assert "FailedPrecondition" in combined
with pytest.raises(subprocess.CalledProcessError) as exc_info:
grpc.create_public_ip(name=f"test-ip-{uuid4().hex[:8]}", pool=pool_id)
assert_grpc_rejected(exc_info, "FailedPrecondition")

def test_release_restores_capacity(
self,
Expand Down Expand Up @@ -101,8 +97,7 @@ def test_pool_deletion_blocked_while_ips_allocated(
pool_id, _ = small_pool
with pytest.raises(subprocess.CalledProcessError) as exc_info:
private_grpc.delete_public_ip_pool(pool_id=pool_id)
combined = (exc_info.value.stderr or "") + (exc_info.value.stdout or "")
assert "FailedPrecondition" in combined
assert_grpc_rejected(exc_info, "FailedPrecondition")

def test_pool_deletion_succeeds_after_all_ips_released(
self,
Expand Down
Loading
Loading