Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,27 @@ def service_account() -> str:


@pytest.fixture(scope="session")
def grpc(fulfillment_address: str, namespace: str, service_account: str) -> GRPCClient:
token: str = run(
"oc", "create", "token", service_account, "-n", namespace, "--duration", "1h", "--as", "system:admin"
def grpc(fulfillment_address: str, keycloak_url: str, jwt_password: str) -> GRPCClient:
return GRPCClient(
address=fulfillment_address,
token_factory=lambda: get_jwt(
keycloak_url=keycloak_url, realm="osac", client_id="osac-cli",
username="tenant1_admin", password=jwt_password,
),
)
return GRPCClient(address=fulfillment_address, token=token)


@pytest.fixture(scope="session")
def private_grpc(fulfillment_private_address: str, namespace: str, service_account: str) -> GRPCClient:
token: str = run(
"oc", "create", "token", service_account, "-n", namespace, "--duration", "1h", "--as", "system:admin"
def private_grpc(
fulfillment_private_address: str, namespace: str, service_account: str
) -> GRPCClient:
return GRPCClient(
address=fulfillment_private_address,
token_factory=lambda: run(
"oc", "create", "token", service_account, "-n", namespace,
"--duration", "1h", "--as", "system:admin",
),
)
return GRPCClient(address=fulfillment_private_address, token=token)


@pytest.fixture(scope="session", autouse=True)
Expand All @@ -62,11 +70,11 @@ def k8s_hub_client(namespace: str) -> K8sClient:


@pytest.fixture(scope="session")
def cli(namespace: str, fulfillment_address: str, service_account: str) -> OsacCLI:
def cli(namespace: str, fulfillment_address: str, keycloak_url: str, jwt_password: str) -> OsacCLI:
return OsacCLI(
binary=env("OSAC_CLI_PATH", "osac"),
address=f"https://{fulfillment_address.rsplit(':', 1)[0]}",
token_script=f"oc create token -n {namespace} {service_account} --as system:admin",
token_script=_make_jwt_token_script(keycloak_url, "tenant1_admin", jwt_password),
namespace=namespace,
)

Expand Down Expand Up @@ -112,15 +120,21 @@ def jwt_cli_admin(namespace: str, fulfillment_address: str, keycloak_url: str, j

@pytest.fixture(scope="session")
def jwt_grpc_tenant1(fulfillment_address: str, keycloak_url: str, jwt_password: str) -> GRPCClient:
token: str = get_jwt(
keycloak_url=keycloak_url, realm="osac", client_id="osac-cli", username="tenant1_user", password=jwt_password
return GRPCClient(
address=fulfillment_address,
token_factory=lambda: get_jwt(
keycloak_url=keycloak_url, realm="osac", client_id="osac-cli",
username="tenant1_user", password=jwt_password,
),
)
return GRPCClient(address=fulfillment_address, token=token)


@pytest.fixture(scope="session")
def jwt_grpc_tenant2(fulfillment_address: str, keycloak_url: str, jwt_password: str) -> GRPCClient:
token: str = get_jwt(
keycloak_url=keycloak_url, realm="osac", client_id="osac-cli", username="tenant2_user", password=jwt_password
return GRPCClient(
address=fulfillment_address,
token_factory=lambda: get_jwt(
keycloak_url=keycloak_url, realm="osac", client_id="osac-cli",
username="tenant2_user", password=jwt_password,
),
)
return GRPCClient(address=fulfillment_address, token=token)
29 changes: 26 additions & 3 deletions tests/core/grpc_client.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,46 @@
from __future__ import annotations

import base64
import json
import re
import subprocess
import time
from collections.abc import Callable
from typing import Any

from tests.core.runner import run

PUBLIC_API: str = "osac.public.v1"
PRIVATE_API: str = "osac.private.v1"

_TOKEN_REFRESH_MARGIN_SECONDS: int = 30


def _jwt_exp(token: str) -> float:
"""Extract the ``exp`` claim from a JWT without verifying the signature."""
payload = token.split(".")[1]
# Pad base64url to a multiple of 4
padded = payload + "=" * (-len(payload) % 4)
claims: dict[str, Any] = json.loads(base64.urlsafe_b64decode(padded))
return float(claims["exp"])
Comment thread
coderabbitai[bot] marked this conversation as resolved.


class GRPCClient:
def __init__(self, *, address: str, token: str) -> None:
def __init__(self, *, address: str, token_factory: Callable[[], str]) -> None:
self.address: str = address
self.token: str = token
self._token_factory: Callable[[], str] = token_factory
self._cached_token: str | None = None
self._token_exp: float = 0.0

@property
def _token(self) -> str:
if self._cached_token is None or time.time() >= self._token_exp - _TOKEN_REFRESH_MARGIN_SECONDS:
self._cached_token = self._token_factory()
self._token_exp = _jwt_exp(self._cached_token)
return self._cached_token
Comment on lines +35 to +40

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | ⚖️ Poor tradeoff

Thread-safety concern in token refresh logic - Risk: Low

The _token property performs a check-then-act pattern without synchronization. In a multi-threaded test scenario, concurrent calls could trigger multiple simultaneous token refreshes.

Impact: Not critical for test code, but could cause unnecessary Keycloak requests if tests run with threading. Since this is test infrastructure with session-scoped fixtures, the practical risk is minimal.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/core/grpc_client.py` around lines 35 - 40, The _token property in the
token refresh logic performs a check-then-act pattern without synchronization.
Multiple threads could simultaneously evaluate the token expiration condition
and all trigger a refresh via _token_factory(), causing unnecessary concurrent
requests. Add thread synchronization (such as a lock) around the token refresh
logic between the expiration check and the _cached_token assignment to ensure
only one thread refreshes the token at a time when the expiration condition is
detected.


def call(self, *, service: str, data: dict[str, Any] | None = None) -> dict[str, Any]:
args: list[str] = ["grpcurl", "-insecure", "-H", f"Authorization: Bearer {self.token}"]
args: list[str] = ["grpcurl", "-insecure", "-H", f"Authorization: Bearer {self._token}"]
if data is not None:
args.extend(["-d", json.dumps(data)])
args.extend([self.address, service])
Expand Down
4 changes: 2 additions & 2 deletions tests/vmaas/public_ip/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def pool_status(private_grpc: GRPCClient, pool_id: str) -> dict[str, Any]:


def create_ip(
grpc: GRPCClient, k8s: K8sClient, pool_id: str
grpc: GRPCClient, k8s: K8sClient, pool: str
) -> tuple[str, str]:
ip_name: str = f"test-ip-{uuid4().hex[:8]}"
ip_id: str = grpc.create_public_ip(name=ip_name, pool=pool_id)
ip_id: str = grpc.create_public_ip(name=ip_name, pool=pool)
ip_cr_name: str = wait_for_public_ip_cr(k8s=k8s, uuid=ip_id)
wait_for_public_ip_allocated(k8s=k8s, name=ip_cr_name)
return ip_id, ip_cr_name
Expand Down
Loading