|
12 | 12 |
|
13 | 13 | import io |
14 | 14 | import logging |
| 15 | +import os |
15 | 16 | import os as os_lib |
16 | 17 | import random |
17 | 18 | import string |
18 | 19 | import time |
19 | 20 | import zipfile |
20 | | -from collections import defaultdict |
| 21 | +from pathlib import Path |
21 | 22 |
|
22 | 23 | import boto3 |
23 | 24 | import pytest |
24 | 25 | from assertpy import assert_that |
25 | 26 | from cfn_stacks_factory import CfnStack, CfnVpcStack |
| 27 | +from framework.fixture_utils import SharedFixture, xdist_session_fixture |
26 | 28 | from paramiko import Ed25519Key |
27 | 29 | from remote_command_executor import RemoteCommandExecutor |
28 | 30 | from retrying import retry |
29 | 31 | from time_utils import seconds |
30 | 32 | from utils import find_stack_by_tag, generate_stack_name, is_directory_supported, random_alphanumeric |
| 33 | +from xdist import get_xdist_worker_id |
31 | 34 |
|
32 | 35 | from tests.ad_integration.cluster_user import ClusterUser |
33 | 36 | from tests.common.utils import run_system_analyzer |
@@ -226,55 +229,122 @@ def _check_ssm_success(ssm_client, command_id, instance_id): |
226 | 229 | ).is_true() |
227 | 230 |
|
228 | 231 |
|
@xdist_session_fixture(autouse=True)
def directory_shared_namespace(request):
    """
    Provide a session-scoped, cross-worker shared namespace (a directory path) for per-key SharedFixtures.

    IMPORTANT: no AD stacks are created here; this only exposes a filesystem location that every
    xdist worker can reach. The return value is a plain string so it stays picklable for SharedFixture.
    """
    output_dir = request.config.getoption("output_dir", "")
    shared_dir = Path(f"{output_dir}/tmp/shared_fixtures")
    # Create the namespace directory up front; exist_ok makes this safe when workers race.
    shared_dir.mkdir(parents=True, exist_ok=True)
    return str(shared_dir)
| 243 | + |
| 244 | + |
def _directory_stack_resource_generator(
    existing_directory_stack_name,
    directory_type,
    region,
    vpc_stack,
    cfn_stacks_factory,
    request,
):
    """
    Generator used by SharedFixture to provide a shared value (stack info) and run cleanup.

    Resolution order: an explicitly provided stack name wins, then a pre-existing tagged stack
    found in the region, and only as a last resort a brand-new stack is created (and flagged as
    managed so it is the only kind this generator will delete on teardown).

    Yields a picklable dict: {"name": <stack_name>, "managed": <bool>}
    """
    managed = False
    if existing_directory_stack_name:
        name = existing_directory_stack_name
    else:
        # Try to reuse an existing tagged stack; if not found, create a new one.
        stack_prefix = f"integ-tests-MultiUserInfraStack{directory_type}"
        name = find_stack_by_tag("parallelcluster:integ-tests-ad-stack", region, stack_prefix)
        if not name:
            directory_stack = _create_directory_stack(cfn_stacks_factory, request, directory_type, region, vpc_stack)
            name = directory_stack.name
            managed = True
            if request.config.getoption("retain_ad_stack"):
                # Keep the supporting VPC around too when the AD stack is retained, so the
                # retained directory does not end up referencing a deleted network.
                add_tag_to_stack(vpc_stack.name, "DO-NOT-DELETE", "Retained for integration testing")

    try:
        yield {"name": name, "managed": managed}
    finally:
        # Only delete stacks created by this fixture, and only if not retained/no-delete.
        if managed and not (request.config.getoption("no_delete") or request.config.getoption("retain_ad_stack")):
            try:
                cfn_stacks_factory.delete_stack(name, region)
                logging.info("Deleted directory stack %s in %s", name, region)
            except Exception as e:
                # Best-effort cleanup: a failed delete must not fail the test session teardown.
                logging.warning("Failed deleting directory stack %s: %s", name, e)
| 279 | + |
| 280 | + |
@pytest.fixture(scope="class")
def directory_factory(directory_shared_namespace, vpc_stacks_shared, cfn_stacks_factory, request):
    """
    Class-scoped factory: build a per-(region, directory_type) SharedFixture and acquire it on call.

    Every successful acquire() is recorded and released once in teardown, so the cross-worker
    reference counting stays correct even when a single worker invokes this factory multiple
    times (e.g. for different regions or directory types).
    """
    base_dir = Path(directory_shared_namespace)  # shared path provided by the session fixture
    acquired_fixtures = []  # all SharedFixtures acquired by this worker; each released in teardown

    def _factory(existing_directory_stack_name: str, directory_type: str, region: str) -> str:
        xdist_worker_id = get_xdist_worker_id(request)
        nodeid = getattr(request.node, "nodeid", "N/A")
        logging.info(
            "directory_factory invoked: region=%s, directory_type=%s, existing_provided=%s, worker=%s, nodeid=%s",
            region,
            directory_type,
            bool(existing_directory_stack_name),
            xdist_worker_id,
            nodeid,
        )

        # Use-only path: explicit stack name, no sharing/cleanup.
        if existing_directory_stack_name:
            return existing_directory_stack_name

        if not is_directory_supported(region, directory_type):
            raise RuntimeError(f"Directory type {directory_type} not supported in {region}")

        pid = os.getpid()
        xdist_worker_id_and_pid = f"{xdist_worker_id}: {pid}"

        vpc_stack = vpc_stacks_shared[region]  # one VPC per region (from vpc_stacks_shared)

        shared_fixture = SharedFixture(
            name=f"directory_stack_{region}_{directory_type}",
            shared_save_location=base_dir,
            fixture_func=_directory_stack_resource_generator,  # generator does find-or-create + cleanup
            fixture_func_args=(
                existing_directory_stack_name,
                directory_type,
                region,
                vpc_stack,
                cfn_stacks_factory,
                request,
            ),
            fixture_func_kwargs={},
            xdist_worker_id_and_pid=xdist_worker_id_and_pid,
            log_file=request.config.getoption("tests_log_file"),
        )

        data = shared_fixture.acquire()
        # Record AFTER a successful acquire: only fixtures actually holding a reference get released.
        acquired_fixtures.append(shared_fixture)
        payload = data.fixture_return_value or {}
        return payload.get("name")

    yield _factory

    # Release every acquisition this worker made, newest first; one failed release
    # must not prevent the remaining fixtures from being released.
    for shared_fixture in reversed(acquired_fixtures):
        try:
            shared_fixture.release()
        except Exception as e:
            logging.warning("Error releasing shared fixture: %s", e)
278 | 348 |
|
279 | 349 |
|
280 | 350 | def _run_user_workloads(users, test_datadir, shared_storage_mount_dirs): |
|
0 commit comments