diff --git a/src/_canary/session.py b/src/_canary/session.py
index 5849adf5..9232d118 100644
--- a/src/_canary/session.py
+++ b/src/_canary/session.py
@@ -28,6 +28,7 @@
 from .third_party.lock import LockError
 from .util import cpu_count
 from .util import logging
+from .util import serialize
 from .util.filesystem import find_work_tree
 from .util.filesystem import force_remove
 from .util.filesystem import mkdirp
@@ -283,9 +284,9 @@ def dump_testcases(self) -> None:
             [case.save() for case in self.cases]
         else:
             cpus = cpu_count()
-            args = ((case.getstate(), case.lockfile) for case in self.cases)
+            args = ((case.lockfile, case.getstate()) for case in self.cases)
             pool = multiprocessing.Pool(cpus)
-            pool.imap_unordered(json_dump, args, chunksize=len(self.cases) // cpus or 1)
+            pool.imap_unordered(dump_obj, args, chunksize=len(self.cases) // cpus or 1)
             pool.close()
             pool.join()
         index = {case.id: [dep.id for dep in case.dependencies] for case in self.cases}
@@ -326,12 +327,7 @@ def load_testcases(self, ids: list[str] | None = None) -> list[TestCase]:
                 continue
             # see TestCase.lockfile for file pattern
             file = self.db.join_path("cases", id[:2], id[2:], TestCase._lockfile)
-            with self.db.open(file) as fh:
-                try:
-                    state = json.load(fh)
-                except json.JSONDecodeError:
-                    logger.warning(f"Unable to load {file}!")
-                    continue
+            state = serialize.load(file)
             state["properties"]["work_tree"] = self.work_tree
             for i, dep_state in enumerate(state["properties"]["dependencies"]):
                 # assign dependencies from existing dependencies
@@ -677,11 +673,9 @@ def handle_signal(sig, frame):
     raise SystemExit(sig)
 
 
-def json_dump(args):
-    data, filename = args
-    mkdirp(os.path.dirname(filename))
-    with open(filename, "w") as fh:
-        json.dump(data, fh, indent=2)
+def dump_obj(args):
+    filename, obj = args
+    serialize.dump(filename, obj)
 
 
 def prefix_bits(byte_array: bytes, bits: int) -> int:
diff --git a/src/_canary/testcase.py b/src/_canary/testcase.py
index d9b05d3d..3d932893 100644
--- a/src/_canary/testcase.py
+++ b/src/_canary/testcase.py
@@ -41,8 +41,7 @@
 from .status import Status
 from .util import filesystem as fs
 from .util import logging
-from .util._json import safeload
-from .util._json import safesave
+from .util import serialize
 from .util.compression import compress_str
 from .util.executable import Executable
 from .util.filesystem import copyfile
@@ -1419,13 +1418,13 @@ def copy_sources_to_workdir(self) -> None:
 
     def save(self):
         lockfile = self.lockfile
-        safesave(lockfile, self.getstate())
+        serialize.dump(lockfile, self.getstate())
         file = os.path.join(self.working_directory, self._lockfile)
         mkdirp(os.path.dirname(file))
         fs.force_symlink(lockfile, file)
 
     def _load_lockfile(self) -> dict[str, Any]:
-        return safeload(self.lockfile)
+        return serialize.load(self.lockfile)
 
     def refresh(self, propagate: bool = True) -> None:
         try:
@@ -1612,19 +1611,6 @@ def teardown(self) -> None:
         else:
             fs.force_remove(file)
 
-    def dump(self, fname: str | IO[Any]) -> None:
-        file: IO[Any]
-        own_fh = False
-        if isinstance(fname, str):
-            file = open(fname, "w")
-            own_fh = True
-        else:
-            file = fname
-        state = self.getstate()
-        json.dump(state, file, indent=2)
-        if own_fh:
-            file.close()
-
     def getstate(self) -> dict[str, Any]:
         """Return a serializable dictionary from which the test case can be later loaded"""
         state: dict[str, Any] = {"type": self.__class__.__name__}
@@ -1924,8 +1910,7 @@ def from_state(state: dict[str, Any]) -> TestCase | TestMultiCase:
 
 
 def from_lockfile(lockfile: str) -> TestCase | TestMultiCase:
-    with open(lockfile) as fh:
-        state = json.load(fh)
+    state = serialize.load(lockfile)
     return from_state(state)
 
 
diff --git a/src/_canary/testinstance.py b/src/_canary/testinstance.py
index 1b874aa0..c20f909c 100644
--- a/src/_canary/testinstance.py
+++ b/src/_canary/testinstance.py
@@ -14,8 +14,7 @@
 from .testcase import TestCase
 from .testcase import TestMultiCase
 from .testcase import from_lockfile as testcase_from_lockfile
-from .util._json import safeload
-from .util._json import safesave
+from .util import serialize
 
 key_type = tuple[str, ...] | str
 index_type = tuple[int, ...] | int
@@ -271,10 +270,10 @@ def gpus(self) -> int:
         return len(self.gpu_ids)
 
     def set_attribute(self, **kwargs: Any) -> None:
-        state = safeload(self.lockfile)
+        state = serialize.load(self.lockfile)
         self.attributes.update(kwargs)
-        state["properties"].setdefault("instance_attributes").update(self.attributes)
+        state["properties"].setdefault("instance_attributes", {}).update(self.attributes)
-        safesave(self.lockfile, state)
+        serialize.dump(self.lockfile, state)
 
     def get_dependency(self, **params: Any) -> "TestInstance | None":
         for dep in self.dependencies:
diff --git a/src/_canary/util/_json.py b/src/_canary/util/_json.py
deleted file mode 100644
index 5b5d78d0..00000000
--- a/src/_canary/util/_json.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import json
-import os
-import time
-from typing import Any
-
-from .filesystem import mkdirp
-from .string import pluralize
-
-
-def safesave(file: str, state: dict[str, Any]) -> None:
-    dirname, basename = os.path.split(file)
-    tmp = os.path.join(dirname, f".{basename}.tmp")
-    mkdirp(dirname)
-    try:
-        with open(tmp, "w") as fh:
-            json.dump(state, fh, indent=2)
-        os.replace(tmp, file)
-    finally:
-        if os.path.exists(tmp):
-            os.remove(tmp)
-
-
-def safeload(file: str, attempts: int = 8) -> dict[str, Any]:
-    delay = 0.5
-    attempt = 0
-    while attempt <= attempts:
-        # Guard against race condition when multiple batches are running at once
-        attempt += 1
-        try:
-            with open(file, "r") as fh:
-                return json.load(fh)
-        except Exception:
-            time.sleep(delay)
-            delay *= 2
-    raise FailedToLoadError(
-        f"Failed to load {file} after {attempts} {pluralize('attempt', attempts)}"
-    )
-
-
-class FailedToLoadError(Exception):
-    pass
diff --git a/src/_canary/util/serialize.py b/src/_canary/util/serialize.py
new file mode 100644
index 00000000..226ed31e
--- /dev/null
+++ b/src/_canary/util/serialize.py
@@ -0,0 +1,62 @@
+import json
+import pickle  # nosec B403
+import time
+from pathlib import Path
+from typing import Any
+
+from .string import pluralize
+
+PICKLE = 0
+JSON = 1
+
+protocol = JSON
+
+
+def dump(file: str | Path, obj: Any) -> None:
+    """Atomically serialize ``obj`` to ``file`` using the active protocol."""
+    file = Path(file)
+    file.parent.mkdir(parents=True, exist_ok=True)
+    # Stage into a hidden sibling, then atomically replace so concurrent
+    # readers never observe a partially written file.  Use with_name, not
+    # with_suffix: with_suffix would map "case.json" and "case.lock" to the
+    # same "case.tmp" and the two dumps would clobber each other.
+    tmp = file.with_name(f".{file.name}.tmp")
+    try:
+        if protocol == JSON:
+            with open(tmp, "w") as fh:
+                json.dump(obj, fh, indent=2)
+        else:
+            with open(tmp, "wb") as fh:
+                pickle.dump(obj, fh)
+        tmp.replace(file)
+    finally:
+        # Remove the stage file if the write or the replace failed midway
+        if tmp.exists():
+            tmp.unlink()
+
+
+def load(file: str | Path, attempts: int = 8) -> Any:
+    """Deserialize ``file``, retrying with backoff; raise FailedToLoadError."""
+    file = Path(file)
+    delay = 0.5
+    attempt = 0
+    while attempt < attempts:
+        # Guard against race condition when multiple batches are running at once
+        attempt += 1
+        try:
+            if protocol == JSON:
+                with open(file, "r") as fh:
+                    return json.load(fh)
+            else:
+                with open(file, "rb") as fh:
+                    return pickle.load(fh)  # nosec B301
+        except Exception:
+            time.sleep(delay)
+            delay *= 2
+    raise FailedToLoadError(
+        f"Failed to load {file} after {attempts} {pluralize('attempt', attempts)}"
+    )
+
+
+class FailedToLoadError(Exception):
+    pass