Skip to content

Commit 3e21bdb

Browse files
hehe7318gmarciani
authored andcommitted
Return a picklable string in shared resources instead of a object to avoid _pickle.PicklingError error
1 parent e20d12b commit 3e21bdb

File tree

1 file changed

+40
-59
lines changed

1 file changed

+40
-59
lines changed

tests/integration-tests/tests/ad_integration/test_ad_integration.py

Lines changed: 40 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,19 @@ def _check_ssm_success(ssm_client, command_id, instance_id):
229229
).is_true()
230230

231231

232+
@xdist_session_fixture(autouse=True)
233+
def directory_shared_namespace(request):
234+
"""
235+
Session-scoped, cross-worker shared namespace (a directory path) for per-key SharedFixtures.
236+
IMPORTANT: This fixture does NOT create any AD stacks. It only returns a path that all workers can use.
237+
Returning a plain string keeps it picklable for SharedFixture.
238+
"""
239+
base_dir = Path(f"{request.config.getoption('output_dir', '')}/tmp/shared_fixtures")
240+
base_dir.mkdir(parents=True, exist_ok=True)
241+
# Return a simple, picklable value (string path)
242+
return str(base_dir)
243+
244+
232245
def _directory_stack_resource_generator(
233246
existing_directory_stack_name,
234247
directory_type,
@@ -239,13 +252,13 @@ def _directory_stack_resource_generator(
239252
):
240253
"""
241254
Generator used by SharedFixture to provide a shared value (stack info) and run cleanup.
242-
Yields: {"name": <stack_name>, "managed": <bool>} where managed=True iff we created it here.
255+
Yields a picklable dict: {"name": <stack_name>, "managed": <bool>}
243256
"""
244-
# Resolve name and whether we own its lifecycle (managed)
245257
managed = False
246258
if existing_directory_stack_name:
247259
name = existing_directory_stack_name
248260
else:
261+
# Try to reuse an existing tagged stack; if not found, create a new one.
249262
stack_prefix = f"integ-tests-MultiUserInfraStack{directory_type}"
250263
name = find_stack_by_tag("parallelcluster:integ-tests-ad-stack", region, stack_prefix)
251264
if not name:
@@ -265,97 +278,65 @@ def _directory_stack_resource_generator(
265278
logging.warning("Failed deleting directory stack %s: %s", name, e)
266279

267280

268-
class _LazyDirectoryRegistry:
281+
@pytest.fixture(scope="class")
282+
def directory_factory(directory_shared_namespace, vpc_stacks_shared, cfn_stacks_factory, request):
269283
"""
270-
Session-shared registry that lazily creates/acquires per-(region, directory_type) SharedFixture.
271-
Each worker acquires exactly once per key and caches the stack name.
284+
Class-scoped factory that lazily creates per-(region, directory_type) SharedFixtures on first use.
285+
For each key we 'acquire()' exactly once per worker, cache the stack name, and 'release()' at teardown.
272286
"""
287+
base_dir = Path(directory_shared_namespace) # shared path provided by the session fixture
288+
local_cache = {} # key -> (shared_fixture_obj, stack_name)
273289

274-
def __init__(self, shared_dir: Path, cfn_stacks_factory, request, vpc_stacks_shared):
275-
self._dir = shared_dir
276-
self._dir.mkdir(parents=True, exist_ok=True)
277-
self._cfn = cfn_stacks_factory
278-
self._request = request
279-
self._vpcs = vpc_stacks_shared
280-
self._local = {} # key -> (SharedFixture, stack_name)
281-
282-
def get_stack_name(self, existing_directory_stack_name: str, directory_type: str, region: str) -> str:
290+
def _factory(existing_directory_stack_name: str, directory_type: str, region: str) -> str:
291+
# Use-only path: explicit stack name, no sharing/cleanup
283292
if existing_directory_stack_name:
284293
return existing_directory_stack_name
285294

286295
if not is_directory_supported(region, directory_type):
287296
raise RuntimeError(f"Directory type {directory_type} not supported in {region}")
288297

289298
key = f"{region}:{directory_type}"
290-
if key in self._local:
291-
return self._local[key][1]
299+
if key in local_cache:
300+
return local_cache[key][1]
292301

293-
# Build one SharedFixture per key and acquire exactly once
294-
xdist_worker_id = get_xdist_worker_id(self._request)
302+
# Build a per-key SharedFixture with a stable name so all workers rendezvous on the same files.
303+
xdist_worker_id = get_xdist_worker_id(request)
295304
pid = os.getpid()
296305
xdist_worker_id_and_pid = f"{xdist_worker_id}: {pid}"
297306

298-
vpc_stack = self._vpcs[region] # one VPC per region (from vpc_stacks_shared)
307+
vpc_stack = vpc_stacks_shared[region] # one VPC per region (from vpc_stacks_shared)
299308

300309
shared_fixture = SharedFixture(
301310
name=f"directory_stack_{region}_{directory_type}",
302-
shared_save_location=self._dir,
311+
shared_save_location=base_dir,
303312
fixture_func=_directory_stack_resource_generator,
304313
fixture_func_args=(
305314
existing_directory_stack_name,
306315
directory_type,
307316
region,
308317
vpc_stack,
309-
self._cfn,
310-
self._request,
318+
cfn_stacks_factory,
319+
request,
311320
),
312321
fixture_func_kwargs={},
313322
xdist_worker_id_and_pid=xdist_worker_id_and_pid,
314-
log_file=self._request.config.getoption("tests_log_file"),
323+
log_file=request.config.getoption("tests_log_file"),
315324
)
316325

317326
data = shared_fixture.acquire()
318327
payload = data.fixture_return_value or {}
319328
stack_name = payload.get("name")
320-
self._local[key] = (shared_fixture, stack_name)
329+
local_cache[key] = (shared_fixture, stack_name)
321330
return stack_name
322331

323-
def release_all(self):
324-
# Release once per key so the last releaser triggers generator cleanup (deletion if managed=True).
325-
for shared_fixture, _ in self._local.values():
326-
try:
327-
shared_fixture.release()
328-
except Exception as e:
329-
logging.warning("Error releasing shared fixture: %s", e)
330-
331-
332-
@xdist_session_fixture(autouse=True)
333-
def directory_shared_registry(cfn_stacks_factory, request, vpc_stacks_shared):
334-
"""
335-
Session-scoped, cross-worker shared registry for directory stacks (lazy).
336-
Each (region, directory_type) is provisioned on first request.
337-
"""
338-
base_dir = Path(f"{request.config.getoption('output_dir', '')}/tmp/shared_fixtures")
339-
registry = _LazyDirectoryRegistry(
340-
shared_dir=base_dir,
341-
cfn_stacks_factory=cfn_stacks_factory,
342-
request=request,
343-
vpc_stacks_shared=vpc_stacks_shared,
344-
)
345-
try:
346-
yield registry
347-
finally:
348-
registry.release_all()
349-
350-
351-
@pytest.fixture(scope="class")
352-
def directory_factory(directory_shared_registry):
353-
"""Thin adapter to match existing call signature in tests."""
354-
355-
def _factory(existing_directory_stack_name: str, directory_type: str, region: str) -> str:
356-
return directory_shared_registry.get_stack_name(existing_directory_stack_name, directory_type, region)
332+
yield _factory
357333

358-
return _factory
334+
# release once per key so the last releaser triggers generator cleanup when managed=True
335+
for shared_fixture, _ in local_cache.values():
336+
try:
337+
shared_fixture.release()
338+
except Exception as e:
339+
logging.warning("Error releasing shared fixture: %s", e)
359340

360341

361342
def _run_user_workloads(users, test_datadir, shared_storage_mount_dirs):

0 commit comments

Comments
 (0)