diff --git a/.coveragerc b/.coveragerc index 7091fa23..43f1bd27 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,11 @@ +[run] +# Trace only odev/ so temp plugin files (e.g. test_16 cycle manifests) are not recorded. +source = odev + [report] omit = */tests/* +fail_under = 60 +show_missing = true exclude_lines = pragma: no cover raise NotImplementedError diff --git a/.github/workflows/odev.yml b/.github/workflows/odev.yml index a3ed95a9..48f4f7f7 100644 --- a/.github/workflows/odev.yml +++ b/.github/workflows/odev.yml @@ -6,6 +6,10 @@ on: - opened - reopened - synchronize + push: + branches: + - beta + workflow_dispatch: jobs: @@ -36,6 +40,14 @@ jobs: uses: actions/setup-python@v5.0.0 with: python-version: '3.10' + cache: pip + cache-dependency-path: .pre-commit-config.yaml + + - name: cache-pre-commit + uses: actions/cache@v4 + with: + path: ~/.cache/pre-commit + key: pre-commit-${{ runner.os }}-${{ hashFiles('.pre-commit-config.yaml') }} - name: run-pre-commit uses: pre-commit/action@v3.0.0 @@ -54,19 +66,33 @@ jobs: - "3.10" - "3.11" - "3.12" + - "3.13" steps: + - name: checkout-repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ github.event.pull_request.head.sha }} + - name: setup-python + id: setup-python uses: actions/setup-python@v5.0.0 with: + # Matrix Python runs pytest. Extra interpreters are available if tests or + # tooling need them; database command tests mock odoo-bin and do not clone Odoo. python-version: | - 3.10 ${{ matrix.python-version }} + 3.10 3.12 architecture: x64 + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt - name: setup-system-dependencies - uses: awalsh128/cache-apt-pkgs-action@latest + uses: awalsh128/cache-apt-pkgs-action@v1.5.3 with: packages: postgresql postgresql-client python3-pip libldap2-dev libpq-dev libsasl2-dev build-essential python3-dev libffi-dev version: 1.1 @@ -75,46 +101,36 @@ jobs: run: | sudo service postgresql start sudo -u postgres createuser -s $USER + for i in {1..10}; do pg_isready -h localhost -p 5432 && exit 0; sleep 1; done + exit 1 - - name: checkout-repository - uses: actions/checkout@v4 - with: - fetch-depth: 0 - ref: ${{ github.event.pull_request.head.sha }} - - - name: restore-odoo-repositories - id: restore-odoo-repositories - uses: actions/cache/restore@v4 - with: - path: ~/odoo/repositories - key: odoo-repositories-${{ matrix.python-version }} - - - name: clone-odoo-repositories - if: steps.restore-odoo-repositories.outputs.cache-hit != 'true' - run: | - git clone --depth 1 https://github.com/odoo/odoo ~/odoo/repositories/odoo/odoo --branch master - cd ~/odoo/repositories/odoo/odoo - git config remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*" - git fetch --depth 1 origin 18.0 - - - name: save-odoo-repositories - if: steps.restore-odoo-repositories.outputs.cache-hit != 'true' - id: save-odoo-repositories - uses: actions/cache/save@v4 + - name: cache-python-venv + id: cache-venv + uses: actions/cache@v4 with: - path: ~/odoo/repositories - key: odoo-repositories-${{ matrix.python-version }} + path: .venv-ci + key: ${{ runner.os }}-py${{ steps.setup-python.outputs.python-version }}-venv-${{ hashFiles('requirements.txt', 'requirements-dev.txt') }} - name: setup-python-requirements run: | - python -m ensurepip --upgrade - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi + if [ "${{ steps.cache-venv.outputs.cache-hit }}" != 'true' ]; then + python -m venv .venv-ci + .venv-ci/bin/python -m pip install --upgrade pip + if [ -f requirements.txt ]; then .venv-ci/bin/pip install -r requirements.txt; fi + if [ -f requirements-dev.txt ]; then .venv-ci/bin/pip install -r requirements-dev.txt; fi + fi + echo "${{ github.workspace }}/.venv-ci/bin" >> "$GITHUB_PATH" - name: run-unit-tests id: unit-tests env: - POSTGRES_HOST: postgres + POSTGRES_HOST: localhost POSTGRES_PORT: 5432 + PYTHONPATH: ${{ github.workspace }} run: | - coverage run -m pytest ./tests --exitfirst + if [ "${{ matrix.python-version }}" = "3.12" ]; then + coverage run -m pytest + coverage report + else + pytest + fi diff --git a/.gitignore b/.gitignore index 9ed155b4..30a1c6aa 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,7 @@ tests/plugins/* # --- Temporary and testing files _*/ tmp/ + +# --- AI files +.cursor/ +.agents/ diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md index 91e5c4e2..a5cc2369 100644 --- a/docs/CONTRIBUTING.md +++ b/docs/CONTRIBUTING.md @@ -65,11 +65,11 @@ Start with your changes! ### Test your changes Happy with your modifications to the odev codebase? Then it's time to test it and make sure everything still works as -expected! Run `coverage run -m pytest tests` in your terminal, if any of the tests fails you will need to correct your -code until it passes. +expected! Run `coverage run -m pytest tests && coverage report` in your terminal, if any of the tests fails you will need +to correct your code until it passes. You implemented a brand new feature? Then it's probably good to implement new tests for it! Check what's inside the -[tests](./_tests/) directory for examples. +[tests](../tests/) directory for examples. If you want to check the coverage of your code, you can now run `coverage html` and open the file `./htmlcov/index.html` in your favorite browser. diff --git a/odev/__main__.py b/odev/__main__.py index 5df45d48..78a1afbd 100644 --- a/odev/__main__.py +++ b/odev/__main__.py @@ -35,7 +35,7 @@ def main(): odev = init_framework() odev.start(start_time) logger.debug(f"Framework started in {monotonic() - start_time:.3f} seconds") - odev.dispatch() + sys.exit(0 if odev.dispatch() else 1) except OdevError as error: logger.error(error) diff --git a/odev/_version.py b/odev/_version.py index 1c533a3d..73bc621f 100644 --- a/odev/_version.py +++ b/odev/_version.py @@ -22,4 +22,4 @@ # or merged change. # ------------------------------------------------------------------------------ -__version__ = "4.24.0" +__version__ = "4.27.1" diff --git a/odev/commands/database/cloc.py b/odev/commands/database/cloc.py index 1c4d4bba..7facb90b 100644 --- a/odev/commands/database/cloc.py +++ b/odev/commands/database/cloc.py @@ -40,7 +40,7 @@ def run(self): process = self.odoobin.run(args=self.args.odoo_args, subcommand=self._name, stream=False) - if process is None: + if process is None or process.returncode: raise self.error("Failed to fetch cloc result.") headers = [ diff --git a/odev/commands/database/create.py b/odev/commands/database/create.py index aae9971d..413fe74f 100644 --- a/odev/commands/database/create.py +++ b/odev/commands/database/create.py @@ -39,6 +39,7 @@ class CreateCommand(OdoobinTemplateCommand): ) version_argument = args.String( name="version", + aliases=["-V", "--version"], description="""The Odoo version to use for the new database. If not specified and a template is provided, the version of the template database will be used. Otherwise, the version will default to "master". @@ -184,7 +185,7 @@ def initialize_database(self) -> None: process.with_worktree(self.worktree) try: - run_process = process.run(args=args, progress=self.odoobin_progress) + run_process = process.run(args=args, progress=self.odoobin_progress, prepare=True) self.console.print() except OdevError as error: logger.error(str(error)) diff --git a/odev/commands/database/delete.py b/odev/commands/database/delete.py index 5509d366..548ec6a4 100644 --- a/odev/commands/database/delete.py +++ b/odev/commands/database/delete.py @@ -76,7 +76,7 @@ def run(self): databases_list: str = string.join_and([f"{db!r}" for db in databases]) logger.warning(f"You are about to delete the following databases: {databases_list}") - if not self.console.confirm("Are you sure?", default=False): + if not self.console.confirm("Are you sure?", default=self.args.bypass_prompt): raise self.error("Command aborted") tracker = progress.Progress() @@ -84,7 +84,7 @@ def run(self): tracker.start() for database in databases: - with silence_loggers(__name__): + with silence_loggers(__name__), self.console.force_bypass_prompt(force=True): self.delete_one(LocalDatabase(database)) tracker.update(task, advance=1) diff --git a/odev/commands/database/run.py b/odev/commands/database/run.py index 764a3348..3ae8a81d 100644 --- a/odev/commands/database/run.py +++ b/odev/commands/database/run.py @@ -76,4 +76,7 @@ def run(self): if self.odoobin.is_running: raise self.error(f"Database {self._database.name!r} is already running") - self.odoobin.run(args=self.args.odoo_args, progress=self.odoobin_progress) + process = self.odoobin.run(args=self.args.odoo_args, progress=self.odoobin_progress) + + if process and process.returncode: + raise self.error("Odoo process failed") diff --git a/odev/commands/database/test.py b/odev/commands/database/test.py index 4edae165..cc96bc61 100644 --- a/odev/commands/database/test.py +++ b/odev/commands/database/test.py @@ -38,6 +38,11 @@ class TestCommand(OdoobinCommand): description="Comma-separated list of modules to install for testing. If not set, install the base module.", ) + @property + def _database_exists_required(self) -> bool: + """Return True if a database has to exist for the command to work.""" + return not bool(self.args.version) + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.test_files: list[str] = [] @@ -84,8 +89,8 @@ def create_test_database(self): """Return the arguments to pass to the create command.""" args = ["--bare"] - if self._database.version is not None: - args.extend(["--version", str(self._database.version)]) + if self.version is not None: + args.extend(["--version", str(self.version)]) args.append(self.test_database.name) self.odev.run_command("create", *args) @@ -109,8 +114,14 @@ def run_test_database(self): self.create_test_database() odoobin = self.test_database.process or OdoobinProcess(self.test_database) - odoobin.with_version(self._database.version) - odoobin.with_edition(self._database.edition) + odoobin.with_version(self.version) + + edition = ( + "enterprise" + if self.args.enterprise or (self._database.exists and self._database.edition == "enterprise") + else "community" + ) + odoobin.with_edition(edition) odoobin.with_venv(self.venv) odoobin.with_worktree(self.worktree) @@ -126,16 +137,20 @@ def run_test_database(self): def odoobin_progress(self, line: str): """Handle odoo-bin output and fetch information real-time.""" - if re.match(r"^(i?pu?db)?>+", line): + if re.match(r"^(?:ipdb|pudb|pdb)>+|^\(Pdb\)|(?:^>\s+.*\.(?:py|js)\(\d+\))", line): raise self.error("Debugger detected in odoo-bin output, remove breakpoints and try again") problematic_test_levels = ("warning", "error", "critical") match = self._parse_progress_log_line(line) - if match is None: - if self.last_level in problematic_test_levels: + if match is None or not self.args.pretty: + if match is None and self.last_level in problematic_test_levels: self.test_buffer.append(line) + if not self.args.pretty: + self.print(line, highlight=False, soft_wrap=False) + return + color = f"logging.level.{self.last_level}" if self.last_level in problematic_test_levels else "color.black" self.print(string.stylize(line, color), highlight=False, soft_wrap=False) return diff --git a/odev/commands/git/fetch.py b/odev/commands/git/fetch.py index 846fb55b..02598e5a 100644 --- a/odev/commands/git/fetch.py +++ b/odev/commands/git/fetch.py @@ -64,6 +64,10 @@ def grouped_changes(self) -> dict[str, list[tuple[str, int, int]]]: changes: dict[str, list[tuple[str, int, int]]] = {} for repository in self.repositories: + try: + repository.prune_worktrees() + except Exception as e: # noqa: BLE001 + logger.debug(f"Failed to prune worktrees for {repository.name!r}: {e}") repository.fetch(detached=False) for name, worktrees in self.grouped_worktrees.items(): @@ -75,7 +79,7 @@ def grouped_changes(self) -> dict[str, list[tuple[str, int, int]]]: if not changes: if self.args.worktree: - raise self.error(f"Worktree with name {self.args.name!r} does not exist") + raise self.error(f"Worktree with name {self.args.worktree!r} does not exist") raise self.error("No worktrees found") return changes diff --git a/odev/commands/git/worktree.py b/odev/commands/git/worktree.py index 9711c68b..1fda8292 100644 --- a/odev/commands/git/worktree.py +++ b/odev/commands/git/worktree.py @@ -90,7 +90,8 @@ def create_worktree(self): self.__check_name() if self.args.name in self.grouped_worktrees: - raise self.error(f"Worktree with name '{self.args.name}' already exists") + logger.info(f"Worktree with name '{self.args.name}' already exists") + return with progress.spinner(f"Creating worktree {self.args.name}"): for repository in self.repositories: diff --git a/odev/common/bash.py b/odev/common/bash.py index 1546fa7f..2e952eeb 100644 --- a/odev/common/bash.py +++ b/odev/common/bash.py @@ -36,7 +36,9 @@ # --- Helpers ------------------------------------------------------------------ -def __run_command(command: str, capture: bool = True, sudo_password: str | None = None) -> CompletedProcess[bytes]: +def __run_command( + command: str, capture: bool = True, sudo_password: str | None = None, env: dict[str, str] | None = None +) -> CompletedProcess[bytes]: """Execute a command as a subprocess. If `sudo_password` is provided and not `None`, the command will be executed with elevated privileges. @@ -45,6 +47,7 @@ def __run_command(command: str, capture: bool = True, sudo_password: str | None :param bool capture: Whether to capture the output of the command. :param str sudo_password: The password to use when executing the command with elevated privileges. + :param dict env: The environment variables to use when executing the command. :return: The result of the command execution. :rtype: CompletedProcess """ @@ -58,6 +61,7 @@ def __run_command(command: str, capture: bool = True, sudo_password: str | None check=True, capture_output=capture, input=sudo_password.encode() if sudo_password is not None else None, + env=env, ) @@ -80,7 +84,9 @@ def __raise_or_log(exception: CalledProcessError, do_raise: bool) -> None: # --- Public API --------------------------------------------------------------- -def execute(command: str, sudo: bool = False, raise_on_error: bool = True) -> CompletedProcess[bytes] | None: +def execute( + command: str, sudo: bool = False, raise_on_error: bool = True, env: dict[str, str] | None = None +) -> CompletedProcess[bytes] | None: """Execute a command in the operating system and wait for it to complete. Output of the command will be captured and returned after the execution completes. @@ -97,7 +103,7 @@ def execute(command: str, sudo: bool = False, raise_on_error: bool = True) -> Co """ try: logger.debug(f"Running process: {shlex.quote(command)}") - process_result = __run_command(command) + process_result = __run_command(command, env=env) except CalledProcessError as exception: # If already running as root, sudo will not work if not sudo or not os.geteuid(): @@ -112,7 +118,7 @@ def execute(command: str, sudo: bool = False, raise_on_error: bool = True) -> Co return None try: - process_result = __run_command(command, sudo_password=sudo_password) + process_result = __run_command(command, sudo_password=sudo_password, env=env) except CalledProcessError as exception: sudo_password = None __raise_or_log(exception, raise_on_error) @@ -121,15 +127,16 @@ def execute(command: str, sudo: bool = False, raise_on_error: bool = True) -> Co return process_result -def run(command: str) -> CompletedProcess: +def run(command: str, env: dict[str, str] | None = None) -> CompletedProcess: """Execute a command in the operating system and wait for it to complete. Output of the command will not be captured and will be printed to the console in real-time. :param str command: The command to execute. + :param dict env: The environment variables to use when executing the command. """ logger.debug(f"Running process: {shlex.quote(command)}") - return __run_command(command, capture=False) + return __run_command(command, capture=False, env=env) def detached(command: str) -> Popen[bytes]: @@ -141,15 +148,16 @@ def detached(command: str) -> Popen[bytes]: return Popen(command, shell=True, start_new_session=True, stdout=DEVNULL, stderr=DEVNULL) # noqa: S602 - intentional use of shell=True -def stream(command: str) -> Generator[str, None, None]: # noqa: PLR0912 +def stream(command: str, env: dict[str, str] | None = None) -> Generator[str, None, None]: # noqa: PLR0912 """Execute a command in the operating system and stream its output line by line. :param str command: The command to execute. + :param dict env: The environment variables to use when executing the command. """ logger.debug(f"Streaming process: {shlex.quote(command)}") if not sys.stdin.isatty(): logger.warning("STDIN is not a TTY, running command in non-interactive mode") - exec_process = execute(command) + exec_process = execute(command, env=env) if not exec_process: yield "" @@ -163,14 +171,17 @@ def stream(command: str) -> Generator[str, None, None]: # noqa: PLR0912 tty.setraw(sys.stdin.fileno()) master, slave = pty.openpty() + process: Popen | None = None + try: - process = Popen( # noqa: S603 - shlex.split(command), + process = Popen( # noqa: S602 + command, stdout=slave, stderr=slave, stdin=slave, start_new_session=True, - universal_newlines=True, + shell=True, + env=env, ) received_buffer: bytes = b"" @@ -217,5 +228,6 @@ def stream(command: str) -> Generator[str, None, None]: # noqa: PLR0912 os.close(slave) os.close(master) termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_tty) - if process.returncode: + + if process is not None and process.returncode: raise CalledProcessError(process.returncode, command) diff --git a/odev/common/cache.py b/odev/common/cache.py new file mode 100644 index 00000000..c981f1e0 --- /dev/null +++ b/odev/common/cache.py @@ -0,0 +1,32 @@ +"""Caching implementations for various purposes.""" + +from datetime import datetime +from typing import Any + + +class TTLCache: + def __init__(self, ttl: int): + self.ttl = ttl + self.cache = {} + + def get(self, key: str) -> Any | None: + if key not in self.cache: + return None + + if (datetime.now() - self.cache[key]["timestamp"]).total_seconds() > self.ttl: + del self.cache[key] + return None + + return self.cache[key]["value"] + + def set(self, key: str, value: Any) -> None: + self.cache[key] = {"value": value, "timestamp": datetime.now()} + + def __contains__(self, key: str) -> bool: + return key in self.cache + + def __len__(self) -> int: + return len(self.cache) + + def __repr__(self) -> str: + return f"TTLCache(ttl={self.ttl}, cached={len(self.cache)})" diff --git a/odev/common/commands/git.py b/odev/common/commands/git.py index 811562c9..c6570509 100644 --- a/odev/common/commands/git.py +++ b/odev/common/commands/git.py @@ -4,8 +4,11 @@ from odev.common import args from odev.common.commands import Command from odev.common.connectors import GitConnector, GitWorktree +from odev.common.logging import logging from odev.common.odoobin import odoo_repositories -from odev.common.version import OdooVersion + + +logger = logging.getLogger(__name__) class GitCommand(Command, ABC): @@ -23,10 +26,12 @@ def worktrees(self) -> Generator[GitWorktree, None, None]: """Iterate over worktrees in Odoo repositories.""" for repository in self.repositories: for worktree in repository.worktrees(): - if not worktree.detached and ( - not self.args.version or OdooVersion(worktree.branch) == OdooVersion(self.args.version) - ): - yield worktree + if not worktree.path.exists(): + logger.debug(f"Skipping missing worktree {worktree.name!r} at {worktree.path!s}") + continue + if hasattr(self, "args") and self.args.version and worktree.name != self.args.version: + continue + yield worktree @property def grouped_worktrees(self) -> dict[str, list[GitWorktree]]: diff --git a/odev/common/commands/odoobin.py b/odev/common/commands/odoobin.py index b3f88405..51409df1 100644 --- a/odev/common/commands/odoobin.py +++ b/odev/common/commands/odoobin.py @@ -142,6 +142,12 @@ def version(self) -> OdooVersion: if self._database.version: return self._database.version + if self._database.exists and not self._database.is_odoo: + logger.warning( + f"Database {self._database.name!r} is not an Odoo database. Defaulting to 'master'. " + f"Consider using 'odev create -V {self._database.name}' to initialize it properly." + ) + return OdooVersion("master") @property diff --git a/odev/common/config.py b/odev/common/config.py index 806cbf23..b17bdc37 100644 --- a/odev/common/config.py +++ b/odev/common/config.py @@ -83,6 +83,17 @@ def dumps(self) -> Path: def dumps(self, value: str | Path): self.set("dumps", value.as_posix() if isinstance(value, Path) else value) + @property + def upgrade(self) -> Path: + """Path to the directory where Odoo Enterprise migration scripts are stored. + Defaults to ~/odoo/repositories/odoo/upgrade. + """ + return Path(cast(str, self.get("upgrade", "~/odoo/repositories/odoo/upgrade"))).expanduser() + + @upgrade.setter + def upgrade(self, value: str | Path): + self.set("upgrade", value.as_posix() if isinstance(value, Path) else value) + class UpdateSection(Section): """Configuration for odev auto-updates.""" diff --git a/odev/common/connectors/git.py b/odev/common/connectors/git.py index 4401021d..3c805672 100644 --- a/odev/common/connectors/git.py +++ b/odev/common/connectors/git.py @@ -213,10 +213,18 @@ def pending_changes(self) -> tuple[int, int]: :return: A tuple of commits behind and ahead. :rtype: Tuple[int, int] """ + if self.detached: + return 0, 0 repo = Repo(self.path) - rev_list: str = repo.git.rev_list("--left-right", "--count", "@{u}...HEAD") - commits_behind, commits_ahead = (int(commits) for commits in rev_list.split("\t")) - return commits_behind, commits_ahead + try: + rev_list: str = repo.git.rev_list("--left-right", "--count", "@{u}...HEAD") + commits_behind, commits_ahead = (int(commits) for commits in rev_list.split("\t")) + except GitCommandError as e: + if "no upstream configured" in str(e) or "does not point to a branch" in str(e): + return 0, 0 + raise + else: + return commits_behind, commits_ahead class GitConnector(Connector): @@ -244,10 +252,13 @@ def __init__(self, repo: str, path: Path | None = None): """ super().__init__() + # If repo looks like an absolute path, use it as the path + if not path and repo and Path(repo).is_absolute(): + path = Path(repo) + self._path: Path | None = path - """Forced path to the git repository on the local system.""" - if path: + if path and path.joinpath(".git").exists(): repo_url = Repo(path).remote().url self._organization, self._repository = repo_url.removesuffix(".git").split("/")[-2:] @@ -363,6 +374,16 @@ def branch(self) -> str | None: return self.repository.active_branch.name.split("/")[-1] + @property + def is_dirty(self) -> bool: + """Whether the repository has uncommitted changes.""" + return self.repository.is_dirty(untracked_files=True) if self.exists and self.repository else False + + @property + def is_protected_branch(self) -> bool: + """Whether the current branch is a protected branch (main or master).""" + return self.branch in ("main", "master") + @property def requirements_path(self) -> Path: """Path to the requirements.txt path of the repo, if present.""" diff --git a/odev/common/connectors/postgres.py b/odev/common/connectors/postgres.py index 73c7073a..92904bfa 100644 --- a/odev/common/connectors/postgres.py +++ b/odev/common/connectors/postgres.py @@ -41,7 +41,7 @@ def transaction(self): except Exception as e: self.execute("ROLLBACK") raise e from e - finally: + else: self.execute("COMMIT") @@ -107,8 +107,10 @@ def invalidate_cache(self, database_name: str | None = None): def nocache(self): """Context manager to disable caching of SQL queries.""" self.__class__._nocache = True - yield - self.__class__._nocache = False + try: + yield + finally: + self.__class__._nocache = False def query( self, diff --git a/odev/common/connectors/rest.py b/odev/common/connectors/rest.py index 75c63e06..2fd2eb4b 100644 --- a/odev/common/connectors/rest.py +++ b/odev/common/connectors/rest.py @@ -44,6 +44,9 @@ class RestConnector(Connector, ABC): _bypass_cache: ClassVar[bool] = False """Whether to bypass the cache for the current request.""" + _default_timeout: ClassVar[float] = 30.0 + """Default timeout in seconds for outbound HTTP requests.""" + def __init__(self, url: str): """Initialize the connector. :param url: The URL of the endpoint. @@ -133,6 +136,19 @@ def _load_cookies(self): if cookie: self._connection.cookies.set(key, cookie, domain=domain) + def _get_cookie_header(self) -> str: + """Return the cookies as a string suitable for the 'Cookie' header.""" + if self._connection is None: + return "" + + cookies = [] + for domain in self.session_domains: + for key in self.session_cookies: + value = self._connection.cookies.get(key, domain=domain) + if value: + cookies.append(f"{key}={value}") + return "; ".join(cookies) + def _save_cookies(self): """Save session cookies to the secrets vault.""" if self._connection is None: @@ -202,8 +218,10 @@ def nocache(self): """Context manager to disable caching of HTTP requests.""" bypass_cache = RestConnector._bypass_cache RestConnector._bypass_cache = True - yield - RestConnector._bypass_cache = bypass_cache + try: + yield + finally: + RestConnector._bypass_cache = bypass_cache def _request( self, @@ -244,6 +262,7 @@ def _request( url = self.url + path kwargs.setdefault("allow_redirects", True) + kwargs.setdefault("timeout", self._default_timeout) params = kwargs.pop("params", {}) obfuscate_params = obfuscate_params or [] obfuscated = {k: "xxxxx" for k in obfuscate_params if k in params} @@ -269,7 +288,7 @@ def _request( logger_message + f" -> [{response.status_code}] {response.reason} ({response.elapsed.total_seconds():.3f} seconds)" ) - except RequestsConnectionError as error: + except (RequestsConnectionError, ConnectionResetError) as error: if retry_on_error: logger.debug(error) return self._request( @@ -283,17 +302,18 @@ def _request( raise ConnectorError(f"Could not connect to {self.name}", self) from error + self.cache(cache_key, response) + self._save_cookies() + if raise_for_status: response.raise_for_status() - self.cache(cache_key, response) - self._save_cookies() return response @abstractmethod def request( self, - method: Literal["GET"] | Literal["POST"], + method: Literal["GET", "POST", "HEAD"], path: str, authenticate: bool = True, params: dict | None = None, @@ -334,6 +354,18 @@ def post(self, path: str, params: dict | None = None, authenticate: bool = True, """ return self.request("POST", path, params=params, authenticate=authenticate, **kwargs) + def head(self, path: str, params: dict | None = None, authenticate: bool = True, **kwargs) -> Response: + """Perform a HEAD request to the endpoint. + Authentication is handled automatically using the Odoo credentials stored in the secrets vault. + + :param path: The path to the resource. + :param params: The parameters to pass to the request. + :param kwargs: Additional keyword arguments to pass to the request. + :return: The response from the endpoint. + :rtype: requests.Response + """ + return self.request("HEAD", path, params=params, authenticate=authenticate, **kwargs) + def download(self, path: str, file_path: Path, progress_message: str = "Downloading", **kwargs) -> Path: """Download a file from the endpoint. diff --git a/odev/common/console.py b/odev/common/console.py index a39b7f31..0a37d4aa 100644 --- a/odev/common/console.py +++ b/odev/common/console.py @@ -28,6 +28,7 @@ from rich.theme import Theme from odev.common import string +from odev.common.deprecation import deprecated __all__ = ["Colors", "console"] @@ -270,11 +271,18 @@ def is_live(self, value: bool): """Set the is_live property.""" Console._is_live = value + @deprecated("Use `force_bypass_prompt` instead.") @contextmanager def no_bypass_prompt(self): """Context manager to temporarily disable prompt bypassing.""" + with self.force_bypass_prompt(force=False): + yield + + @contextmanager + def force_bypass_prompt(self, force: bool = False): + """Context manager to temporarily force prompt bypassing.""" original_bypass = self.bypass_prompt - self.bypass_prompt = False + self.bypass_prompt = force yield self.bypass_prompt = original_bypass diff --git a/odev/common/databases/local.py b/odev/common/databases/local.py index b574839b..6e83a506 100644 --- a/odev/common/databases/local.py +++ b/odev/common/databases/local.py @@ -125,7 +125,7 @@ def venv(self) -> PythonEnv: if self._venv is None: info = self.store.databases.get(self) - if info is None: + if info is None or not info.virtualenv: if self.version is None: return PythonEnv() diff --git a/odev/common/deprecation.py b/odev/common/deprecation.py new file mode 100644 index 00000000..78909e92 --- /dev/null +++ b/odev/common/deprecation.py @@ -0,0 +1,35 @@ +"""Compatibility helpers for deprecation APIs that vary by Python version.""" + +from __future__ import annotations + +import functools +import sys +import warnings +from collections.abc import Callable +from typing import Any, TypeVar + + +__all__ = ["deprecated"] + +if sys.version_info >= (3, 13): + from warnings import deprecated +else: + _DeprecatedFunc = TypeVar("_DeprecatedFunc", bound=Callable[..., Any]) + + def deprecated( + message: str, + /, + *, + category: type[Warning] = DeprecationWarning, + ) -> Callable[[_DeprecatedFunc], _DeprecatedFunc]: + """Shim for ``warnings.deprecated`` (Python 3.13+).""" + + def decorator(func: _DeprecatedFunc) -> _DeprecatedFunc: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + warnings.warn(message, category, stacklevel=2) + return func(*args, **kwargs) + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/odev/common/odev.py b/odev/common/odev.py index fb2a73d8..0f4e737f 100644 --- a/odev/common/odev.py +++ b/odev/common/odev.py @@ -28,12 +28,11 @@ cast, ) -import networkx as nx from git import GitCommandError, NoSuchPathError, Repo +from networkx import DiGraph, NetworkXUnfeasible, simple_cycles, topological_sort from packaging import version from odev._version import __version__ -from odev.commands.database.delete import DeleteCommand from odev.common import progress, string from odev.common.commands import CommandType from odev.common.commands.database import DatabaseType @@ -259,8 +258,10 @@ def start(self, start_time: float | None = None) -> None: self.start_time = start_time self.plugins_path.mkdir(parents=True, exist_ok=True) - self.check_release() - self.update() + + if self.__should_update_now(): + self.check_release() + self.update() with progress.spinner("Loading commands"): self.register_commands() @@ -274,15 +275,15 @@ def update(self, restart: bool = True, upgrade: bool = False) -> bool: :param restart: Whether to restart the framework after updating. :param upgrade: Whether to force the upgrade process. """ - upgrade |= self.check_upgrade() - logger.debug(f"Checking for updates in {self.name!r}") - upgrade |= self._update(self.path) + repo_updated = self._update(self.path) logger.debug("Checking for updates in plugins") plugins_upgrade = any(self._update(path, plugin) for plugin, path, _ in self.plugins) - if upgrade or plugins_upgrade: + updated = repo_updated or plugins_upgrade or upgrade + + if updated: self.config.update.date = datetime.now(UTC) self._set_version_after_update() self.upgrade() @@ -290,7 +291,7 @@ def update(self, restart: bool = True, upgrade: bool = False) -> bool: if restart: self.restart() - return upgrade + return updated def _update(self, path: Path, plugin: str | None = None) -> bool: """Check for updates in the odev repository and download them if necessary. @@ -316,9 +317,8 @@ def _update(self, path: Path, plugin: str | None = None) -> bool: if git.repository is None: raise OdevError(f"Repository for {self.name!r} not found at {path.as_posix()}") - if not self.__date_check_interval() or not self.__git_branch_behind(git.repository): - git.fetch() - return False + if not self.__git_branch_behind(git.repository): + git.fetch(detached=False) prompt_name = f"plugin {plugin}" if plugin else self.name logger.debug(f"Checking for updates in {git.name!r}") @@ -446,6 +446,7 @@ def prune_databases(self) -> None: logger.debug(f"Last pruning of databases was {last_pruning} days ago") if last_pruning >= PRUNING_INTERVAL: + from odev.commands.database.delete import DeleteCommand # noqa: PLC0415 from odev.common.databases.local import LocalDatabase # noqa: PLC0415 delete_command_cls = cast(type[CommandType], self.commands.get("delete")) @@ -587,8 +588,6 @@ def _register_plugin_commands(self) -> None: f"Loading plugin {plugin.name!r} version {string.stylize(plugin.manifest['version'], 'repr.version')}" ) - self._install_plugin_requirements(plugin.path) - try: importlib.import_module(f"odev.plugins.{plugin.path.name}") except ImportError as error: @@ -597,13 +596,13 @@ def _register_plugin_commands(self) -> None: for command_class in self.import_commands(plugin.path.glob("commands/**")): command_names = [command_class._name] + (list(command_class._aliases) or []) base_command_class = self.commands.get(command_class._name) + action = ( + "Registering" + if base_command_class is None or issubclass(base_command_class, command_class) + else "Patching" + ) - if base_command_class is None or issubclass(base_command_class, command_class): - action = "Registering new command" - else: - action = "Patching existing command" - - logger.debug(f"{action} {command_class._name!r} from plugin {plugin.name!r}") + logger.debug(f"{action} command {command_class._name!r}") if ( command_class._name in self.commands @@ -669,7 +668,7 @@ def install_plugin(self, plugin: str, as_dependency: bool = False) -> None: plugin_path.symlink_to(repository.path, target_is_directory=True) self._load_config() - self._install_plugin_requirements(plugin_path) + PythonEnv().install_requirements(plugin_path) self._setup_plugin(plugin_path, plugin) except Exception as error: plugin_path.unlink(missing_ok=True) @@ -725,16 +724,6 @@ def uninstall_plugin(self, plugin: str) -> None: self._load_config() self._plugins_dependency_tree.cache_clear() - def _install_plugin_requirements(self, plugin_path: Path) -> None: - """Install the requirements of a plugin. - - :param plugin_path: Path to the plugin to install requirements for - """ - python = PythonEnv() - - if any(python.missing_requirements(plugin_path, raise_if_error=False)): - python.install_requirements(plugin_path) - def _setup_plugin(self, plugin_path: Path, plugin: str | None = None) -> None: """Run the setup script of a plugin if it exists. @@ -789,7 +778,7 @@ def _plugins_dependency_tree(self) -> list[str]: """Order plugins by mutual dependencies, the first one in the returned list being the first one that needs to be imported to respect the dependency graph. """ - graph = nx.DiGraph() + graph = DiGraph() for plugin_path in self.plugins_path.iterdir(): manifest = self._load_plugin_manifest(plugin_path) @@ -799,9 +788,18 @@ def _plugins_dependency_tree(self) -> list[str]: graph.add_edge(dependency, manifest["name"]) try: - resolved_graph: list[str] = list(nx.topological_sort(graph)) + resolved_graph: list[str] = list(topological_sort(graph)) logger.debug(f"Resolved plugins dependency tree:\n{join_bullet(resolved_graph)}") - except nx.NetworkXUnfeasible as exception: + except NetworkXUnfeasible as exception: + cycles = list(simple_cycles(graph))[:20] + if cycles: + parts: list[str] = [] + for c in cycles: + if len(c) == 1: + parts.append(f"{c[0]} depends on itself") + else: + parts.append(" → ".join([*c, c[0]])) + raise OdevError("Circular dependency detected in plugins: " + "; ".join(parts)) from exception raise OdevError("Circular dependency detected in plugins") from exception return resolved_graph @@ -886,9 +884,10 @@ def run_command( return not command_errored - def dispatch(self, argv: list[str] | None = None) -> None: + def dispatch(self, argv: list[str] | None = None) -> bool: """Handle commands and arguments as received from the terminal. :param argv: Optional list of command-line arguments used to override arguments received from the CLI. + :return: True if the command were executed successfully, False otherwise. """ argv = (argv or sys.argv)[1:] @@ -906,7 +905,7 @@ def dispatch(self, argv: list[str] | None = None) -> None: logger.debug("Help argument or no command provided, falling back to help command") argv.insert(0, "help") - self.run_command(argv[0], *argv[1:], history=True) + return self.run_command(argv[0], *argv[1:], history=True) def check_release(self) -> None: """Check if a new release is available.""" @@ -957,7 +956,7 @@ def __checkout_release_channel(self, repo: GitConnector, branch: str) -> None: with contextlib.suppress(GitCommandError): repo.checkout(branch) - self._install_plugin_requirements(repo.path) + PythonEnv().install_requirements(repo.path) def __filter_commands(self, attribute: Any) -> bool: """Filter module attributes to extract commands. @@ -1059,7 +1058,7 @@ def __requirements_changed(self, repository: Repo) -> bool: return bool(diff) - def __date_check_interval(self) -> bool: + def __should_update_now(self) -> bool: """Check whether the last check date is older than today minus the check interval. :return: Whether the last check date is older than today minus the check interval diff --git a/odev/common/odoobin.py b/odev/common/odoobin.py index bf8372ad..980c3fe7 100644 --- a/odev/common/odoobin.py +++ b/odev/common/odoobin.py @@ -10,14 +10,15 @@ from subprocess import CalledProcessError, CompletedProcess from typing import ( TYPE_CHECKING, + ClassVar, Literal, cast, ) -from cachetools.func import ttl_cache from packaging.version import Version from odev.common import bash, string +from odev.common.cache import TTLCache from odev.common.connectors import GitConnector, GitWorktree from odev.common.databases import Branch, Repository from odev.common.databases.remote import RemoteDatabase @@ -48,6 +49,9 @@ ODOO_ENTERPRISE_REPOSITORIES: list[str] = ["odoo/enterprise"] +ODOO_UPGRADE_REPOSITORY: str = "odoo/upgrade" + + ODOO_PYTHON_VERSIONS: Mapping[int, str] = { 19: "3.12", 16: "3.10", @@ -72,6 +76,8 @@ def odoo_repositories(enterprise: bool = True) -> Generator[GitConnector, None, class OdoobinProcess(OdevFrameworkMixin): """Class to manage an odoo-bin process.""" + cache_ps_process: ClassVar[TTLCache] = TTLCache(ttl=1) + def __init__( self, database: "LocalDatabase", @@ -273,7 +279,7 @@ def odoo_addons_paths(self) -> list[Path]: """Return the list of Odoo addons paths.""" return [ worktree.path / addon - for addon in ["", "addons", "odoo/addons", "openerp/addons"] + for addon in ["", "addons"] for worktree in self.odoo_worktrees if OdoobinProcess.check_addons_path(worktree.path / addon) ] @@ -329,17 +335,29 @@ def with_worktree(self, worktree: str) -> "OdoobinProcess": self._forced_worktree_name = worktree return self - @ttl_cache(ttl=1) def _get_ps_process(self) -> str | None: """Return the process currently running odoo, if any. Grep-ed `ps aux` output. """ - process = bash.execute( - f"ps aux | grep -E 'odoo-bin\\s+(-d|--database)(\\s+|=){self.database.name}(\\s+|$)' || echo -n ''" - ) + if (output := self.cache_ps_process.get("ps aux")) is not None: + return self._parse_ps_process(output) + + process = bash.execute("ps aux | grep -E 'odoo-bin\\s+(-d|--database)(\\s+|=)' || echo -n ''") if process is not None: - return process.stdout.decode() + output = process.stdout.decode() + self.cache_ps_process.set("ps aux", output) + return self._parse_ps_process(output) + + return None + + def _parse_ps_process(self, output: str) -> str | None: + """Parse the output of `ps aux` to find the process currently running odoo, if any.""" + regex = re.compile(rf"\s+(-d|--database)(\s+|=){self.database.name}(\s+|$)") + + for line in output.splitlines(): + if regex.search(line): + return line return None @@ -516,7 +534,7 @@ def deploy( except CalledProcessError as error: error_message: str = error.stderr.strip().decode().rstrip(".").replace("ERROR: ", "") logger.error(f"Odoo exited with an error: {error_message}") - return None + return CompletedProcess(error.cmd, error.returncode, error.stdout, error.stderr) else: return process @@ -527,7 +545,7 @@ def run( # noqa: PLR0913 subcommand_input: str | None = None, stream: bool = True, progress: Callable[[str], None] | None = None, - prepare: bool = True, + prepare: bool = False, ) -> CompletedProcess | None: """Run Odoo on the current database. @@ -536,14 +554,14 @@ def run( # noqa: PLR0913 :param subcommand_input: Input to pipe to the subcommand. :param stream: Whether to stream the output of the process. :param progress: Callback to call on each line outputted by the process. Ignored if `stream` is False. - :param prepare: Whether to prepare the environment before running the process. + :param prepare: Whether to prepare the environment before running. A missing venv is always prepared. :return: The return result of the process after completion. :rtype: subprocess.CompletedProcess """ if self.is_running and subcommand is None: raise OdevError("Odoo is already running on this database") - if prepare: + if prepare or not self.venv.exists: with spinner(f"Preparing odoo-bin version {str(self.version)!r} for database {self.database.name!r}"): self.prepare_odoobin() @@ -585,7 +603,7 @@ def run( # noqa: PLR0913 logger.error(f"STDERR: {error.stderr.decode()}") logger.error("Odoo exited with an error, check the output above for more information") - return None + return CompletedProcess(error.cmd, error.returncode, error.stdout, error.stderr) else: return process diff --git a/odev/common/postgres.py b/odev/common/postgres.py index 3c7afd9d..4f21f79b 100644 --- a/odev/common/postgres.py +++ b/odev/common/postgres.py @@ -158,25 +158,24 @@ def __init__(self, database: PostgresDatabase, name: str | None = None): self.name: str = name or self.name """Name of the table in which data is stored, must be set in subclass.""" - with self.database: - self.prepare_database_table() - self.database.tables[self.name] = self def prepare_database_table(self): """Prepare the table and ensures it has the correct definition and constraints applied.""" if self._columns is not None: - if not self.database.table_exists(self.name): - logger.debug(f"Creating table {self.name!r} in database {self.database!r}") - self.database.create_table(self.name, self._columns) - elif missing_columns := self.database.columns_exist(self.name, list(self._columns.keys())): + logger.debug(f"Creating table {self.name!r} in database {self.database!r}") + self.database.create_table(self.name, self._columns) + + if missing_columns := self.database.columns_exist(self.name, list(self._columns.keys())): for column in missing_columns: self.__add_missing_column(column) - if self._constraints is not None: - for name, definition in self._constraints.items(): - logger.debug(f"Creating constraint {name!r} on table {self.name!r} in database {self.database.name!r}") - self.database.constraint(self.name, name, definition) + if self._constraints is not None: + for name, definition in self._constraints.items(): + logger.debug( + f"Creating constraint {name!r} on table {self.name!r} in database {self.database.name!r}" + ) + self.database.constraint(self.name, name, definition) def clear(self): """Clear the table.""" diff --git a/odev/common/progress.py b/odev/common/progress.py index 6bcab3a1..899ea7f0 100644 --- a/odev/common/progress.py +++ b/odev/common/progress.py @@ -176,7 +176,7 @@ def spinner(message: str) -> StackedStatus: :param message: The message to display. :type message: str """ - if DEBUG_MODE: + if DEBUG_MODE or not console.is_interactive: logger.info(message) status = StackedStatus(console.render_str(message), console=console, spinner="arc") diff --git a/odev/common/python.py b/odev/common/python.py index 2f26f2ff..5ca3b42e 100644 --- a/odev/common/python.py +++ b/odev/common/python.py @@ -8,12 +8,13 @@ from functools import lru_cache from pathlib import Path from subprocess import CalledProcessError, CompletedProcess +from typing import ClassVar import virtualenv -from cachetools.func import ttl_cache from packaging.version import InvalidVersion, Version, parse as parse_version from odev.common import bash, progress, string +from odev.common.cache import TTLCache from odev.common.console import console from odev.common.errors import OdevError from odev.common.logging import logging, silence_loggers @@ -82,6 +83,9 @@ class PythonEnv: a virtual environment. """ + pip_freeze_cache: ClassVar[TTLCache] = TTLCache(ttl=60) + """Cache for pip freeze output.""" + def __init__(self, path: Path | str | None = None, version: str | None = None): """Initialize a python environment. @@ -114,7 +118,7 @@ def __str__(self) -> str: if self._global: return "Global Interpreter" - return f"{self.name} - Python {self.version}" if self.exists else "" + return f"{self.name}" if self.exists else "" def __repr__(self) -> str: return f"PythonEnv(name={self.name!r}, version={self.version})" @@ -262,6 +266,9 @@ def install_requirements(self, path: Path | str): """Install packages from a requirements.txt file. :param path: Path to the requirements.txt file or the containing directory. """ + if not any(self.missing_requirements(path, raise_if_error=False)): + return + requirements_path = self.__check_requirements_path(path) self.__pip_install_progress( options=f"-r '{requirements_path}'", @@ -348,14 +355,17 @@ def __pip_install_progress(self, options: str, message: str = "Installing packag else: logger.info("All python packages are already installed and up-to-date") - @ttl_cache(ttl=60) def __pip_freeze_all(self) -> CompletedProcess: """Run pip freeze to list all installed packages.""" + if (packages := self.pip_freeze_cache.get("pip freeze")) is not None: + return packages + packages = bash.execute(f"{self.pip} freeze --all") if packages is None: raise OdevError("Failed to run pip freeze") + self.pip_freeze_cache.set("pip freeze", packages) return packages def __package_spec(self, package: str) -> tuple[str, str]: @@ -541,7 +551,7 @@ def run_script( command = f"{self.python} {script_path} {' '.join(args)}" if script_input is not None: - command = f"{script_input} | {command}" + command = f"printf '%s' {shlex.quote(script_input)} | {command}" if not stream: return bash.execute(command) @@ -549,10 +559,17 @@ def run_script( if progress is None: return bash.run(command) - for line in bash.stream(command): - progress(line) - - return CompletedProcess(command, 0) + output = [] + returncode = 0 + try: + for line in bash.stream(command): + output.append(line) + progress(line) + except CalledProcessError as error: + returncode = error.returncode + + # Note: bash.stream mixes stdout and stderr via PTY, so we return the combined output as stdout. + return CompletedProcess(command, returncode, stdout="\n".join(output).encode()) def run(self, command: str) -> CompletedProcess | None: """Run a python command. diff --git a/odev/common/store/datastore.py b/odev/common/store/datastore.py index 83296381..33dc23ef 100644 --- a/odev/common/store/datastore.py +++ b/odev/common/store/datastore.py @@ -27,8 +27,14 @@ def __init__(self, name: str = "odev"): self.databases = DatabaseStore(self) self.history = HistoryStore(self) self.secrets = SecretStore(self) + self.__load_tables() self.__load_plugins_tables() + def __load_tables(self): + for table in self.tables.values(): + if not self.table_exists(table.name): + table.prepare_database_table() + def __load_plugins_tables(self): odev_path = Path(__file__).parents[2] plugins = [path for path in (odev_path / "plugins").glob("*/datastore") if path.is_dir()] diff --git a/odev/common/store/tables/secrets.py b/odev/common/store/tables/secrets.py index 97d4f4f0..614aa5d1 100644 --- a/odev/common/store/tables/secrets.py +++ b/odev/common/store/tables/secrets.py @@ -1,3 +1,4 @@ +import os from base64 import b64decode, b64encode from collections.abc import Sequence from dataclasses import dataclass @@ -71,7 +72,7 @@ def _list_ssh_keys(cls) -> list[AgentKey]: """List all SSH keys available in the ssh-agent.""" keys = list(SSHAgent().get_keys()) - if not keys: + if not keys and not os.environ.get("ODEV_NO_SSH_AGENT"): raise OdevError("No SSH keys found in ssh-agent, or ssh-agent is not running.") fingerprint = cls.config.security.encryption_key @@ -93,8 +94,12 @@ def encrypt(cls, plaintext: str) -> str: :rtype: str """ ciphered: str | None = None + keys = cls._list_ssh_keys() + + if not keys: + return plaintext - for key in cls._list_ssh_keys(): + for key in keys: try: ciphered = str(b64encode(ssh_encrypt(plaintext, ssh_key=key)).decode()) if plaintext else "" except SSHException as e: @@ -117,8 +122,12 @@ def decrypt(cls, ciphertext: str) -> str: :rtype: str """ deciphered: str | None = None + keys = cls._list_ssh_keys() - for key in cls._list_ssh_keys(): + if not keys: + return ciphertext + + for key in keys: key_desc = f"{key.fingerprint} ({key.name}, {key.comment})" try: @@ -200,7 +209,7 @@ def get( # noqa: PLR0913 if field == "password": value = console.secret(prompt_label) else: - with console.no_bypass_prompt(): + with console.force_bypass_prompt(): value = console.text(prompt_label, default=current_value) setattr(secret, field, value) @@ -266,7 +275,13 @@ def _get( return None logger.debug(f"Secret '{name}:{scope}:{platform}' retrieved from storage") - return Secret(name, result[0][0], SecretStore.decrypt(result[0][1]), scope, platform) + try: + password = SecretStore.decrypt(result[0][1]) + except OdevError: + logger.debug(f"Failed to decrypt secret '{name}:{scope}:{platform}', treating as missing") + return None + + return Secret(name, result[0][0], password, scope, platform) def _set(self, secret: Secret): """Save a secret to the vault. diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..e6a6d3c5 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +testpaths = tests +addopts = --durations=25 +markers = + integration: tests requiring external services or full non-mocked stacks (unused by default) diff --git a/tests/fixtures/capture.py b/tests/fixtures/capture.py index fd103633..37297bd2 100644 --- a/tests/fixtures/capture.py +++ b/tests/fixtures/capture.py @@ -74,5 +74,5 @@ def stderr(self): if self._stderr and not self._stderr.closed: self._stderr_value = self._stderr.getvalue() - self._stderr_value = re.sub(r"\x1b[^m]*m", "", self._stdout_value) + self._stderr_value = re.sub(r"\x1b[^m]*m", "", self._stderr_value) return self._stderr_value diff --git a/tests/fixtures/case.py b/tests/fixtures/case.py index 7ca8d5a8..7334ff2e 100644 --- a/tests/fixtures/case.py +++ b/tests/fixtures/case.py @@ -207,6 +207,7 @@ def __patch_odev(cls): ("upgrades_path", cls.odev.tests_path / "resources" / "upgrades"), ("setup_path", cls.odev.tests_path / "resources" / "setup"), ("scripts_path", cls.odev.tests_path / "resources" / "scripts"), + ("plugins_path", cls.run_path / "plugins"), ], ) diff --git a/tests/fixtures/odoobin_run_mock.py b/tests/fixtures/odoobin_run_mock.py new file mode 100644 index 00000000..f0d06870 --- /dev/null +++ b/tests/fixtures/odoobin_run_mock.py @@ -0,0 +1,148 @@ +"""Helpers to mock Odoo script execution and assert on argv / interpreter paths.""" + +from __future__ import annotations + +import sys +from pathlib import Path +from subprocess import CompletedProcess +from unittest.mock import patch + +from odev.common import odev as odev_module +from odev.common.databases import LocalDatabase +from odev.common.python import PythonEnv + + +TESTS_ROOT = Path(__file__).resolve().parent.parent +FAKE_ODOO_ROOT = (TESTS_ROOT / "resources" / "fake_odoo").resolve() +FAKE_ODOOBIN_PATH = FAKE_ODOO_ROOT / "odoo-bin" + + +def ensure_fake_venvs() -> None: + """Symlink bin/python under odev HOME_PATH/virtualenvs/ so versioned venvs exist for tests.""" + home_path = Path(odev_module.HOME_PATH).resolve() + real_python = Path(sys.executable).resolve() + for ver in ("18.0", "17.0", "master"): + bindir = home_path / "virtualenvs" / ver / "bin" + bindir.mkdir(parents=True, exist_ok=True) + py = bindir / "python" + if not py.exists(): + py.symlink_to(real_python) + + +def stub_minimal_odoo_pg_metadata(database: LocalDatabase, odoo_version: str) -> None: + """Create minimal ir_module_module rows so LocalDatabase.is_odoo / version work without running odoo-bin.""" + database.query( + """CREATE TABLE IF NOT EXISTS ir_module_module ( + id SERIAL PRIMARY KEY, + name VARCHAR, + latest_version VARCHAR, + state VARCHAR, + license VARCHAR + )""" + ) + database.query("DELETE FROM ir_module_module WHERE name = 'base'") + database.query( + f"INSERT INTO ir_module_module (name, latest_version, state) VALUES ('base', '{odoo_version}', 'installed')" + ) + database.query( + """CREATE TABLE IF NOT EXISTS ir_config_parameter ( + id SERIAL PRIMARY KEY, + key VARCHAR, + value TEXT + )""" + ) + database.query( + """CREATE TABLE IF NOT EXISTS res_users_log ( + id SERIAL PRIMARY KEY, + create_date TIMESTAMP + )""" + ) + + +def _stdout_for_odoo_argv(argv: list[str]) -> bytes: + """Minimal stdout so command code paths (e.g. cloc.parse) succeed without a real odoo-bin.""" + if argv and argv[0] == "cloc": + # Matches ClocCommand.parse: skip first two lines, body lines, last line is total (see re_line_details). + return b"header1\nheader2\nstub 1 1 1\n 1 1 1\n" + return b"" + + +def _recording_run_script( # noqa: PLR0913 + call_log: list, self_pyenv, script, args=None, stream=False, progress=None, script_input=None +): + script_path = Path(script).resolve() + argv = list(args or []) + call_log.append((self_pyenv.python.resolve(), script_path, argv, stream, script_input)) + cmd = f"{self_pyenv.python} {script_path} {' '.join(argv)}" + out = _stdout_for_odoo_argv(argv) + return CompletedProcess(cmd, 0, out, b"") + + +def start_run_script_recorder(call_log: list): + """Patch PythonEnv.run_script to record calls and return success without subprocess.""" + + def fake_run_script( # noqa: PLR0913 + self, script, args=None, stream=False, progress=None, script_input=None + ): + return _recording_run_script(call_log, self, script, args, stream, progress, script_input) + + p = patch.object(PythonEnv, "run_script", fake_run_script) + p.start() + return p + + +def iter_odoobin_calls(call_log: list): + """Yield recorded (python_path, script_path, argv, stream, script_input) for odoo-bin / odoo.py only.""" + for row in call_log: + name = row[1].name + if name in ("odoo-bin", "odoo.py"): + yield row + + +def assert_argv_contains(test_case, argv: list[str], fragments: list[str], msg: str = ""): + for frag in fragments: + test_case.assertIn(frag, argv, msg) + + +def assert_last_odoobin_invocation( # noqa: PLR0913 + test_case, + call_log: list, + *, + database_name: str | None = None, + argv_contains: list[str] | None = None, + subcommand: str | None = None, + interpreter_under_virtualenvs: bool = False, +): + odoos = list(iter_odoobin_calls(call_log)) + test_case.assertTrue(odoos, "expected at least one odoo-bin run_script call") + _interp, _script, argv, _stream, _inp = odoos[-1] + test_case.assertEqual(_script.resolve(), FAKE_ODOOBIN_PATH.resolve()) + interp = _interp.resolve() + sys_py = Path(sys.executable).resolve() + if interpreter_under_virtualenvs: + test_case.assertIn("virtualenvs", interp.as_posix()) + else: + test_case.assertTrue( + "virtualenvs" in interp.as_posix() or interp == sys_py, + f"unexpected interpreter {interp} (expected test venv or {sys_py})", + ) + if database_name is not None: + test_case.assertIn("--database", argv) + idx = argv.index("--database") + test_case.assertEqual(argv[idx + 1], database_name) + if subcommand is not None: + test_case.assertEqual(argv[0], subcommand) + assert_argv_contains(test_case, argv, argv_contains or []) + + +def assert_any_odoobin_invocation( + test_case, + call_log: list, + *, + predicate, +): + """Assert at least one odoo call matches predicate(argv).""" + for _i, _s, argv, _st, _in in iter_odoobin_calls(call_log): + if predicate(argv): + return + test_case.fail("no odoo-bin invocation matched predicate") diff --git a/tests/resources/fake_odoo/addons/z_stub/__init__.py b/tests/resources/fake_odoo/addons/z_stub/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/resources/fake_odoo/addons/z_stub/__manifest__.py b/tests/resources/fake_odoo/addons/z_stub/__manifest__.py new file mode 100644 index 00000000..73a401b4 --- /dev/null +++ b/tests/resources/fake_odoo/addons/z_stub/__manifest__.py @@ -0,0 +1 @@ +{"name": "z_stub", "version": "18.0.1.0"} diff --git a/tests/resources/fake_odoo/odoo-bin b/tests/resources/fake_odoo/odoo-bin new file mode 100644 index 00000000..b4bf43b8 --- /dev/null +++ b/tests/resources/fake_odoo/odoo-bin @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 +# Placeholder for tests; real execution is mocked via PythonEnv.run_script. diff --git a/tests/tests/commands/test_database.py b/tests/tests/commands/test_database.py index 75fa4ca2..1606b2d7 100644 --- a/tests/tests/commands/test_database.py +++ b/tests/tests/commands/test_database.py @@ -1,11 +1,21 @@ +from typing import cast +from unittest.mock import PropertyMock, patch + from odev.common.commands.odoobin import TEMPLATE_SUFFIX as ODOO_DB_TEMPLATE_SUFFIX -from odev.common.config import Config from odev.common.connectors.git import GitConnector -from odev.common.databases import LocalDatabase +from odev.common.databases import LocalDatabase, Repository from odev.common.odoobin import OdoobinProcess from tests.fixtures import OdevCommandTestCase -from tests.fixtures.matchers import OdoobinMatch +from tests.fixtures.odoobin_run_mock import ( + FAKE_ODOO_ROOT, + assert_any_odoobin_invocation, + assert_last_odoobin_invocation, + ensure_fake_venvs, + iter_odoobin_calls, + start_run_script_recorder, + stub_minimal_odoo_pg_metadata, +) ODOO_DB_VERSION = "18.0" @@ -13,15 +23,37 @@ class TestDatabaseCommands(OdevCommandTestCase): - """Set up a test database for database-related command tests.""" + """Database-related command tests (PostgreSQL real; odoo-bin execution mocked).""" + + _odoobin_run_script_calls: list = [] @classmethod def setUpClass(cls): super().setUpClass() cls.__patch_odoobin_prep() + ensure_fake_venvs() + cls._odoobin_run_script_calls = [] + cls._patches.append(start_run_script_recorder(cls._odoobin_run_script_calls)) + + def _noop(_self): + return None + + for _name in ("prepare_odoobin", "update_worktrees"): + pr = patch.object(OdoobinProcess, _name, _noop) + cls._patches.append(pr) + pr.start() + + prp = patch.object(OdoobinProcess, "odoo_path", new_callable=PropertyMock, return_value=FAKE_ODOO_ROOT) + cls._patches.append(prp) + prp.start() + cls.database_name = cls.run_name cls.template_name = cls.database_name + ODOO_DB_TEMPLATE_SUFFIX + def setUp(self): + super().setUp() + self._odoobin_run_script_calls.clear() + @classmethod def tearDownClass(cls): for database_name in (cls.database_name, cls.template_name): @@ -41,7 +73,9 @@ def tearDownClass(cls): @classmethod def __patch_odoobin_prep(cls): """Patch methods to allow the preparation of odoo-bin in the test environment.""" - cls.odev.config.paths.repositories = Config().paths.repositories + repos = cls.run_path / "repositories" + repos.mkdir(parents=True, exist_ok=True) + cls.odev.config.paths.repositories = repos cls._patch_object(GitConnector, [("_get_clone_options", ["--depth", "1", "--no-single-branch"])]) cls._patch_object(OdoobinProcess, [], [("odoo_repositories", [GitConnector("odoo/odoo")])]) cls._patch_object(LocalDatabase, [("pg_vector", True)]) @@ -118,21 +152,8 @@ def assertDatabaseVersionEqual(self, name: str, version: str): # noqa: N802 """Assert that a database has a specific Odoo version.""" self.__assertDatabaseVersion(name, version) - def assertCalledWithOdoobin( # noqa: N802 - self, - stream_mock, - database_name: str, - args: list[str] | None = None, - subcommand: str | None = None, - ): - """Assert that a mock was called with the expected odoo-bin command. - - :param stream_mock: The mock to assert. - :param database_name: The name of the database. - :param args: Additional arguments passed to odoo-bin. - :param subcommand: The subcommand passed to odoo-bin. - """ - stream_mock.assert_called_with(OdoobinMatch(database_name, args, subcommand)) + def assertNoOdoobinRun(self): # noqa: N802 + self.assertFalse(list(iter_odoobin_calls(self._odoobin_run_script_calls))) # -------------------------------------------------------------------------- # Test cases @@ -142,32 +163,32 @@ def test_01_create_bare(self): """Command `odev create --bare` should create a new database but should not initialize it with Odoo.""" self.assertDatabaseNotExist(self.database_name) - with self.wrap("odev.common.bash", "stream") as stream: - self.dispatch_command("create", "--bare", self.database_name) - stream.assert_not_called() + self.dispatch_command("create", "--bare", self.database_name) + self.assertNoOdoobinRun() self.assertDatabaseExist(self.database_name) self.assertDatabaseIsNotOdoo(self.database_name) def test_02_create_odoo(self): - """Command `odev create` should create a new database and initialize it with Odoo.""" - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command( - "create", - "--version", - ODOO_DB_VERSION, - self.database_name, - "--without-demo", - "all", - ) - self.assertCalledWithOdoobin( - stream, - self.database_name, - ["--without-demo", "all", "--init", "base", "--stop-after-init"], - ) + """Command `odev create` should create a new database and invoke odoo-bin with init arguments.""" + stdout, _ = self.dispatch_command( + "create", + "--version", + ODOO_DB_VERSION, + self.database_name, + "--without-demo", + "all", + ) + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--without-demo", "all", "--init", "base", "--stop-after-init"], + ) self.assertIn(f"Running 'odoo-bin' in version '{ODOO_DB_VERSION}' on database", stdout) self.assertDatabaseExist(self.database_name) + stub_minimal_odoo_pg_metadata(LocalDatabase(self.database_name), ODOO_DB_VERSION) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) @@ -178,9 +199,8 @@ def test_03_create_new_template(self): self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) self.assertDatabaseNotExist(self.template_name) - with self.wrap("odev.common.bash", "stream") as stream: - self.dispatch_command("create", "--create-template", self.database_name) - stream.assert_not_called() + self.dispatch_command("create", "--create-template", self.database_name) + self.assertNoOdoobinRun() self.assertDatabaseExist(self.template_name) self.assertDatabaseIsOdoo(self.template_name) @@ -197,14 +217,13 @@ def test_04_create_from_template(self): self.assertDatabaseNotExist(self.database_name) - with self.wrap("odev.common.bash", "stream") as stream: - self.dispatch_command( - "create", - "--from-template", - self.template_name, - self.database_name, - ) - stream.assert_not_called() + self.dispatch_command( + "create", + "--from-template", + self.template_name, + self.database_name, + ) + self.assertNoOdoobinRun() self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) @@ -223,23 +242,26 @@ def test_05_create_from_template_no_value(self): self.assertDatabaseNotExist(self.database_name) - with self.wrap("odev.common.bash", "stream") as stream: - self.dispatch_command("create", "--from-template", "", self.database_name) - stream.assert_not_called() + self.dispatch_command("create", "--from-template", "", self.database_name) + self.assertNoOdoobinRun() self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) def test_06_run(self): - """Command `odev run` should run Odoo in a database.""" + """Command `odev run` should invoke odoo-bin for a database.""" self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command("run", self.database_name, "--stop-after-init") - self.assertCalledWithOdoobin(stream, self.database_name, ["--stop-after-init"]) + stdout, _ = self.dispatch_command("run", self.database_name, "--stop-after-init") + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--stop-after-init"], + ) self.assertIn(f"Running 'odoo-bin' in version '{ODOO_DB_VERSION}' on database '{self.database_name}'", stdout) @@ -255,15 +277,19 @@ def test_07_run_from_template(self): database.query("CREATE TABLE test_table (id SERIAL PRIMARY KEY);") self.assertTrue(database.table_exists("test_table")) - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command( - "run", - "--from-template", - self.template_name, - self.database_name, - "--stop-after-init", - ) - self.assertCalledWithOdoobin(stream, self.database_name, ["--stop-after-init"]) + stdout, _ = self.dispatch_command( + "run", + "--from-template", + self.template_name, + self.database_name, + "--stop-after-init", + ) + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--stop-after-init"], + ) self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) @@ -287,9 +313,13 @@ def test_08_run_from_template_no_value(self): database.query("CREATE TABLE test_table (id SERIAL PRIMARY KEY);") self.assertTrue(database.table_exists("test_table")) - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command("run", "--from-template", "", self.database_name, "--stop-after-init") - self.assertCalledWithOdoobin(stream, self.database_name, ["--stop-after-init"]) + stdout, _ = self.dispatch_command("run", "--from-template", "", self.database_name, "--stop-after-init") + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--stop-after-init"], + ) self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) @@ -310,15 +340,14 @@ def test_09_run_from_invalid_template(self): database.query("CREATE TABLE test_table (id SERIAL PRIMARY KEY);") self.assertTrue(database.table_exists("test_table")) - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command( - "run", - "--from-template", - invalid_name, - self.database_name, - "--stop-after-init", - ) - stream.assert_not_called() + stdout, _ = self.dispatch_command( + "run", + "--from-template", + invalid_name, + self.database_name, + "--stop-after-init", + ) + self.assertNoOdoobinRun() self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) @@ -329,18 +358,17 @@ def test_09_run_from_invalid_template(self): self.assertTrue(database.table_exists("test_table")) def test_10_run_tests(self): - """Command `odev test` should run tests on a database.""" + """Command `odev test` should invoke odoo-bin with test arguments on a scratch database.""" self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) - with self.wrap("odev.common.bash", "stream") as stream: - stdout, _ = self.dispatch_command("test", "--tags", ":TestSafeEval.test_expr", self.database_name) - self.assertCalledWithOdoobin( - stream, - self.database_name, - ["--stop-after-init", "--test-enable", "--test-tags", ":TestSafeEval.test_expr", "--init", "base"], - ) + stdout, _ = self.dispatch_command("test", "--tags", ":TestSafeEval.test_expr", self.database_name) + + def _is_test_run(argv: list[str]) -> bool: + return "--test-enable" in argv and ":TestSafeEval.test_expr" in argv + + assert_any_odoobin_invocation(self, self._odoobin_run_script_calls, predicate=_is_test_run) self.assertRegex(stdout, rf"Created database '{self.database_name}-[a-z0-9]{{8}}'") self.assertRegex(stdout, rf"Dropped database '{self.database_name}-[a-z0-9]{{8}}'") @@ -348,21 +376,25 @@ def test_10_run_tests(self): self.assertIn("No failing tests", stdout) def test_11_cloc(self): - """Command `odev cloc` should print line of codes count for modules installed in a database.""" + """Command `odev cloc` should invoke odoo-bin cloc for a database.""" self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) stdout, _ = self.dispatch_command("cloc", self.database_name) + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + subcommand="cloc", + ) self.assertIn( f"Running 'odoo-bin cloc' in version '{ODOO_DB_VERSION}' on database '{self.database_name}'", stdout, ) def test_12_run_with_addons_path(self): - """Command `odev run` should run Odoo in a database, recursively detect additional addons paths - and store the value of the repository for future usage. - """ + """Command `odev run` should pass detected addons paths and persist the repository on the database.""" self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) database = LocalDatabase(self.database_name) @@ -382,27 +414,136 @@ def test_12_run_with_addons_path(self): ): stdout, _ = self.dispatch_command("run", self.database_name, "--stop-after-init") + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--stop-after-init"], + ) + _interp, _script, argv, _st, _inp = list(iter_odoobin_calls(self._odoobin_run_script_calls))[-1] + joined = " ".join(argv) + self.assertIn(addons_path_end, joined) + self.assertGreaterEqual(joined.count(addons_path_end), 2) + self.assertIn(f"Running 'odoo-bin' in version '{ODOO_DB_VERSION}' on database '{self.database_name}'", stdout) - self.assertRegex(stdout, rf"--addons-path [^\s]+?{addons_path_end},[^\s]+?{addons_path_end}/submodule") - self.assertEqual(database.repository.full_name, addon) + repository = cast(Repository, database.repository) + self.assertEqual(repository.full_name, addon) def test_13_run_with_version(self): - """Command `odev run` should run Odoo in a database with a specific version.""" + """Command `odev run` should invoke odoo-bin using the requested Odoo version.""" self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) version = "17.0" - with self.wrap("odev.common.bash", "stream") as stream: + with self.patch(OdoobinProcess, "_get_python_version", return_value=None): stdout, _ = self.dispatch_command("run", "--version", version, self.database_name, "--stop-after-init") - self.assertCalledWithOdoobin(stream, self.database_name, ["--stop-after-init"]) + + assert_last_odoobin_invocation( + self, + self._odoobin_run_script_calls, + database_name=self.database_name, + argv_contains=["--stop-after-init"], + ) self.assertIn(f"Running 'odoo-bin' in version '{version}' on database '{self.database_name}'", stdout) self.assertDatabaseExist(self.database_name) self.assertDatabaseIsOdoo(self.database_name) self.assertDatabaseVersionEqual(self.database_name, ODOO_DB_VERSION) + def test_14_run_empty_db_warning(self): + """Command `odev run` should warn the user when running on an empty database without a version.""" + empty_db = f"{self.database_name}-empty" + LocalDatabase(empty_db).create() + try: + with ( + self.patch("odev.common.bash", "stream", return_value=iter([])), + self.patch("odev.common.bash", "run", return_value=None), + ): + # odoo-bin would fail on an empty DB, but we've mocked bash to avoid the error + stdout, stderr = self.dispatch_command("run", empty_db, "--stop-after-init") + + # Logging goes to stdout in these tests + combined_output = stdout + stderr + self.assertIn(f"Database {empty_db!r} is not an Odoo database. Defaulting to 'master'.", combined_output) + self.assertIn( + f"Consider using 'odev create -V {empty_db}' to initialize it properly.", combined_output + ) + finally: + LocalDatabase(empty_db).drop() + + def test_15_test_non_existent_db(self): + """Command `odev test` should work even if the target database does not exist, provided a version is given.""" + non_existent_db = f"{self.database_name}-non-existent" + self.assertDatabaseNotExist(non_existent_db) + + # Mock run_command to avoid actually running 'create' or 'test' + with ( + self.patch("odev.common.bash", "stream", return_value=iter([])), + self.patch("odev.common.odev.Odev", "run_command"), + ): + # This should not raise SystemExit or any exception + self.dispatch_command("test", "-V", ODOO_DB_VERSION, "--tags", ":base", non_existent_db) + + def test_16_info(self): + """Command `odev info` should print details about a local Odoo database.""" + stdout, _ = self.dispatch_command("info", self.database_name) + self.assertIn("Database Information", stdout) + self.assertIn("Local Process", stdout) + + def test_17_neutralize(self): + """Command `odev neutralize` should neutralize the target database.""" + with self.patch(LocalDatabase, "neutralize") as neutralize: + stdout, _ = self.dispatch_command("neutralize", self.database_name) + + neutralize.assert_called_once_with() + self.assertIn("has been neutralized", stdout) + + def test_16_dump(self): + """Command `odev dump` should report where the dump was saved.""" + dump_file = self.run_path / f"{self.database_name}.zip" + with self.patch(LocalDatabase, "dump", return_value=dump_file): + stdout, _ = self.dispatch_command("dump", self.database_name, "--filestore") + + self.assertIn(f"dumped to {dump_file}", stdout) + + # -------------------------------------------------------------------------- + # Test cases - additional database commands (error / guard paths) + # -------------------------------------------------------------------------- + + def test_90_restore_invalid_dump_file(self): + """`odev restore` should reject a missing backup path without touching PostgreSQL.""" + missing = self.run_path / "does-not-exist.zip" + stdout, stderr = self.dispatch_command("restore", self.database_name, str(missing)) + self.assertIn("Invalid dump file", stdout + stderr) + + def test_91_kill_not_running(self): + """`odev kill` should fail when the database process is not running.""" + _, stderr = self.dispatch_command("kill", self.database_name) + self.assertIn("is not running", stderr) + + def test_92_deploy_requires_running_database(self): + """`odev deploy` should refuse when the local database is not running.""" + module_root = self.run_path / "fake_module" + module_root.mkdir(parents=True, exist_ok=True) + (module_root / "__manifest__.py").write_text("{'name': 'fake', 'version': '1.0'}", encoding="utf-8") + stdout, stderr = self.dispatch_command("deploy", self.database_name, str(module_root)) + self.assertIn("must be running", stdout + stderr) + + def test_93_standardize_requires_odoo_database(self): + """`odev standardize` should reject a non-Odoo (e.g. bare) database.""" + bare_name = f"{self.run_name}-bare-std" + try: + self.dispatch_command("create", "--bare", bare_name) + self.assertDatabaseExist(bare_name) + _, stderr = self.dispatch_command("standardize", bare_name) + self.assertIn("must be an Odoo database", stderr) + finally: + db = LocalDatabase(bare_name) + if db.exists: + db.drop() + # -------------------------------------------------------------------------- # Test cases - delete # Keep at the end to avoid interference with other tests and to cleanup @@ -428,7 +569,12 @@ def test_99_delete_expression(self): self.assertDatabaseExist(self.database_name) with self.patch(self.odev.console, "confirm", return_value=True): - stdout, _ = self.dispatch_command("delete", "--expression", "^odev-test-[a-z0-9]{8}") + stdout, _ = self.dispatch_command( + "delete", + "--expression", + "^odev-test-[a-z0-9]{8}", + "--include-whitelisted", + ) self.assertDatabaseNotExist(self.database_name) self.assertDatabaseNotExist(self.template_name) diff --git a/tests/tests/commands/test_git_and_scripts.py b/tests/tests/commands/test_git_and_scripts.py new file mode 100644 index 00000000..25b3864a --- /dev/null +++ b/tests/tests/commands/test_git_and_scripts.py @@ -0,0 +1,46 @@ +from argparse import Namespace + +from odev.commands.scripts.assets import PathfinderCommand as AssetsCommand +from odev.commands.scripts.pathfinder import PathfinderCommand +from odev.common.connectors.git import GitConnector + +from tests.fixtures import OdevCommandTestCase, OdevTestCase + + +class TestGitCommands(OdevCommandTestCase): + def test_01_clone_requires_target(self): + _, stderr = self.dispatch_command("clone") + self.assertIn("You must specify a database or repository to clone", stderr) + + def test_02_fetch_missing_worktree(self): + with self.patch(GitConnector, "fetch", return_value=None): + _, stderr = self.dispatch_command("fetch", "--worktree", "missing") + self.assertIn("does not exist", stderr) + + def test_03_pull_missing_worktree(self): + with self.patch(GitConnector, "fetch", return_value=None): + _, stderr = self.dispatch_command("pull", "--worktree", "missing") + self.assertIn("does not exist", stderr) + + def test_04_worktree_name_required_for_create(self): + _, stderr = self.dispatch_command("worktree", "--create") + self.assertIn("provide a name for the worktree", stderr) + + +class TestScriptCommands(OdevTestCase): + def test_01_assets_script_run_after(self): + command = AssetsCommand.__new__(AssetsCommand) + self.assertEqual(command.script_run_after, "regenerate_assets(env)") + + def test_02_pathfinder_formats_output(self): + command = PathfinderCommand.__new__(PathfinderCommand) + command.args = Namespace(origin="res.partner", destination="sale.order") + command._framework = self.odev + table_calls = [] + command.table = lambda headers, rows, title=None: table_calls.append((headers, rows, title)) + + with self.patch(self.odev.console, "clear_line"): + command.run_script_handle_result("[[('res.partner','partner','many2one'),('sale.order','order','')]]") + + self.assertEqual(len(table_calls), 1) + self.assertIn("partner.order", table_calls[0][2]) diff --git a/tests/tests/commands/test_utilities.py b/tests/tests/commands/test_utilities.py index d1ac9a68..7974c2a7 100644 --- a/tests/tests/commands/test_utilities.py +++ b/tests/tests/commands/test_utilities.py @@ -11,15 +11,13 @@ GIT_PATH = "odev.common.connectors.git.GitConnector" -class TestCommandUtilitiesVersion(OdevCommandTestCase): - def test_01_no_argument(self): +class TestCommandUtilities(OdevCommandTestCase): + def test_version_01_no_argument(self): """Command `odev version` should print the version of the application.""" stdout, _ = self.dispatch_command("version") self.assertIn(f"Odev-test version {__version__}", stdout) - -class TestCommandUtilitiesConfig(OdevCommandTestCase): - def test_01_no_argument(self): + def test_config_01_no_argument(self): """Run the command without arguments.""" stdout, _ = self.dispatch_command("config") self.assertIn(" mode ", stdout) @@ -29,7 +27,7 @@ def test_01_no_argument(self): self.assertIn(" dumps ", stdout) self.assertIn(" repositories ", stdout) - def test_02_print_section(self): + def test_config_02_print_section(self): """Run the command with a specific section and no key.""" stdout, _ = self.dispatch_command("config", "paths") self.assertNotIn(" mode ", stdout) @@ -39,7 +37,7 @@ def test_02_print_section(self): self.assertIn(" dumps ", stdout) self.assertIn(" repositories ", stdout) - def test_03_print_key(self): + def test_config_03_print_key(self): """Run the command with a specific section and key.""" stdout, _ = self.dispatch_command("config", "update.mode") self.assertIn(" mode ", stdout) @@ -50,41 +48,39 @@ def test_03_print_key(self): self.assertNotIn(" repositories ", stdout) self.assertRegex(stdout, rf"mode\s+{self.odev.config.update.mode}") - def test_04_invalid_key(self): + def test_config_04_invalid_key(self): """Run the command with an invalid section and key combination.""" _, stderr = self.dispatch_command("config", "invalid.test") self.assertIn(f"'invalid' is not a valid section in config {self.odev.config.name!r}", stderr) - def test_05_set_value(self): + def test_config_05_set_value(self): """Run the command with a section and key combination, set a value.""" new_update_mode = "always" stdout, _ = self.dispatch_command("config", "update.mode", new_update_mode) self.assertIn(" mode ", stdout) self.assertEqual(self.odev.config.update.mode, new_update_mode) - def test_06_set_invalid_value(self): + def test_config_06_set_invalid_value(self): """Run the command with a section and no key.""" _, stderr = self.dispatch_command("config", "update", "invalid") self.assertIn("You must specify a key to set a value", stderr) - -class TestCommandUtilitiesHelp(OdevCommandTestCase): - def test_01_no_argument(self): + def test_help_01_no_argument(self): """Run the command without arguments, display all available commands.""" stdout, _ = self.dispatch_command("help") self.assertIn("The following commands are provided:", stdout) - def test_02_command(self): + def test_help_02_command(self): """Run the command with a command argument, display detailed help for a specific command.""" stdout, _ = self.dispatch_command("help", "version") self.assertIn(self.odev.commands["version"]._help, stdout) - def test_03_invalid_command(self): + def test_help_03_invalid_command(self): """Run the command with an invalid command argument, print an error message.""" _, stderr = self.dispatch_command("help", "invalid") self.assertIn("Cannot display help for inexistent command 'invalid'", stderr) - def test_04_names_only(self): + def test_help_04_names_only(self): """Run the command with the `--names-only` flag, display only the names of the available commands.""" stdout, _ = self.dispatch_command("help", "--names-only") self.assertNotIn("The following commands are provided:", stdout) @@ -92,9 +88,7 @@ def test_04_names_only(self): for command in {c._name for c in self.odev.commands.values()}: self.assertIn(f"{command}\n", stdout) - -class TestCommandUtilitiesHistory(OdevCommandTestCase): - def test_01_clear(self): + def test_history_01_clear(self): """Run the command with the `--clear` flag.""" with self.patch(self.odev.store.history, "clear") as patched_clear: stdout, _ = self.dispatch_command("history", "--clear") @@ -102,32 +96,30 @@ def test_01_clear(self): self.assertIn("Clearing history", stdout) patched_clear.assert_called_once_with() - def test_02_no_argument(self): + def test_history_02_no_argument(self): """Run the command without arguments, print th history of commands without filter.""" stdout, _ = self.dispatch_command("history") self.assertRegex(stdout, r"ID\s+Command\s+Date") self.assertGreater(len(stdout.splitlines()), 4, "there should be at least one line in the history") - def test_03_filter_command(self): + def test_history_03_filter_command(self): """Run the command with a command argument, print the history of a specific command.""" stdout, _ = self.dispatch_command("history", "--command", "history") self.assertIn(" history --clear ", stdout) - def test_04_no_history(self): + def test_history_04_no_history(self): """Run the command when there is no history, print an error message.""" self.odev.store.history.clear() _, stderr = self.dispatch_command("history") self.assertIn("No history available for all commands", stderr) - -class TestCommandUtilitiesList(OdevCommandTestCase): - def test_01_list_all(self): + def test_list_01_list_all(self): """Run the command, list all existing databases.""" stdout, _ = self.dispatch_command("list", "--all") self.assertRegex(stdout, r"^Listing databases") self.assertGreater(len(stdout.splitlines()), 7, "there should be at least 1 lines in the list") - def test_02_names_only(self): + def test_list_02_names_only(self): """Run the command with the `--names-only` flag, display only the names of the databases.""" with self.patch(POSTGRES_PATH, "query", [("test1",), ("test2",)]): stdout, _ = self.dispatch_command("list", "--all", "--names-only") @@ -135,14 +127,14 @@ def test_02_names_only(self): self.assertRegex(stdout, r"^Listing databases") self.assertIn("test1\ntest2\n", stdout) - def test_03_no_result(self): + def test_list_03_no_result(self): """Run the command when there are no databases, print an error message.""" with self.patch(POSTGRES_PATH, "query", []): _, stderr = self.dispatch_command("list") self.assertIn("No database found", stderr) - def test_04_expression(self): + def test_list_04_expression(self): """Run the command with the `--expression` flag, filter the databases.""" with self.patch(POSTGRES_PATH, "query", [("test1",), ("test2",)]): stdout, _ = self.dispatch_command("list", "--expression", "test1", "--names-only", "--all") @@ -150,28 +142,24 @@ def test_04_expression(self): self.assertIn("test1\n", stdout) self.assertNotIn("test2\n", stdout) - def test_05_expression_no_result(self): + def test_list_05_expression_no_result(self): """Run the command with the `--expression` flag, filter the databases, display an error if no result.""" with self.patch(POSTGRES_PATH, "query", [("test1",), ("test2",)]): _, stderr = self.dispatch_command("list", "--expression", "test3") self.assertIn("No database found matching pattern 'test3'", stderr) - -class TestCommandUtilitiesSetup(OdevCommandTestCase): - def test_01_no_argument(self): + def test_setup_cmd_01_no_argument(self): """Run the command without arguments, run all scripts.""" stdout, _ = self.dispatch_command("setup") self.assertEqual("Hello, odev from script 1!\nHello, odev from script 2!\n", stdout) - def test_02_script(self): + def test_setup_cmd_02_script(self): """Run the command with an argument, run only the selected script.""" stdout, _ = self.dispatch_command("setup", "setup_script_1") self.assertEqual("Hello, odev from script 1!\n", stdout) - -class TestCommandUtilitiesUpdate(OdevCommandTestCase): - def test_01_no_argument(self): + def test_update_01_no_argument(self): """Run the command without arguments, update the application.""" self.odev.config.update.date = "1995-12-21 00:00:00" self.odev.config.update.interval = 1 @@ -188,39 +176,7 @@ def upgrade(): self.assertIn("Current version: 3.0.0", stdout) self.assertIn(f"Updated to {__version__}!", stdout) - -class TestCommandUtilitiesVenv(OdevCommandTestCase): - @classmethod - def setUpClass(cls): - super().setUpClass() - cls.venv = PythonEnv(cls.odev.venvs_path / "test") - cls.venv.create() - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.run_path.as_posix(), ignore_errors=True) - super().tearDownClass() - - def test_01_invalid_name(self): - """Run the command with an invalid virtual environment name.""" - _, stderr = self.dispatch_command("venv", "invalid", "--command", "print('test')") - self.assertRegex(stderr, r"Virtual environment 'invalid' does not exist") - - def test_02_python_command(self): - """Run the command with a name valid argument and a python command.""" - stdout, _ = self.dispatch_command("venv", self.venv.path.as_posix(), "--command", "print('test')") - self.assertRegex( - stdout, r"python[\d.?]*\s-c\s[\\'\"]+print[\\'\"\(]+test[\\'\"\)]+ in virtual environment \'test\'" - ) - - def test_03_pip_command(self): - """Run the command with a pip command.""" - stdout, _ = self.dispatch_command("venv", self.venv.path.as_posix(), "--command", "pip --version") - self.assertRegex(stdout, r"python[\d.?]*\s-m\spip\s--version\' in virtual environment \'test\'") - - -class TestCommandUtilitiesPlugin(OdevCommandTestCase): - def test_01_single(self): + def test_plugin_01_single(self): """Run the command to enable or disable a plugin.""" plugin = "test/test-plugin" plugin_link = Path(self.odev.plugins_path) / "test_plugin" @@ -255,14 +211,14 @@ def test_01_single(self): self.assertIn(f"Plugin '{plugin}' is not installed", stdout) self.assertFalse(self.odev._plugin_is_installed(plugin)) - def test_02_enable_invalid(self): + def test_plugin_02_enable_invalid(self): """Run the command with an invalid plugin to enable.""" plugin = "odoo-odev/invalid" _, stderr = self.dispatch_command("plugin", "--enable", plugin) self.assertIn(f"Failed to clone repository '{plugin}'", stderr) self.assertFalse(self.odev._plugin_is_installed(plugin)) - def test_03_enable_dependencies(self): + def test_plugin_03_enable_dependencies(self): """Run the command to enable a plugin with dependencies.""" plugin = "test/test-plugin" dependent = "test/test-plugin-dep" @@ -283,3 +239,33 @@ def test_03_enable_dependencies(self): self.assertIn(f"Uninstalling plugin {plugin!r} will also uninstall the following dependent plugins", stdout) self.assertFalse(self.odev._plugin_is_installed(plugin)) self.assertFalse(self.odev._plugin_is_installed(dependent)) + + +class TestCommandUtilitiesVenv(OdevCommandTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.venv = PythonEnv(cls.odev.venvs_path / "test") + cls.venv.create() + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.run_path.as_posix(), ignore_errors=True) + super().tearDownClass() + + def test_01_invalid_name(self): + """Run the command with an invalid virtual environment name.""" + _, stderr = self.dispatch_command("venv", "invalid", "--command", "print('test')") + self.assertRegex(stderr, r"Virtual environment 'invalid' does not exist") + + def test_02_python_command(self): + """Run the command with a name valid argument and a python command.""" + stdout, _ = self.dispatch_command("venv", self.venv.path.as_posix(), "--command", "print('test')") + self.assertRegex( + stdout, r"python[\d.?]*\s-c\s[\\'\"]+print[\\'\"\(]+test[\\'\"\)]+ in virtual environment \'test\'" + ) + + def test_03_pip_command(self): + """Run the command with a pip command.""" + stdout, _ = self.dispatch_command("venv", self.venv.path.as_posix(), "--command", "pip --version") + self.assertRegex(stdout, r"python[\d.?]*\s-m\spip\s--version\' in virtual environment \'test\'") diff --git a/tests/tests/common/test_arguments.py b/tests/tests/common/test_arguments.py new file mode 100644 index 00000000..68c8a8df --- /dev/null +++ b/tests/tests/common/test_arguments.py @@ -0,0 +1,104 @@ +import re +from argparse import ArgumentParser, Namespace +from pathlib import Path + +from odev.common import args as arg_defs +from odev.common.actions import ACTIONS_MAPPING, IntAction, ListAction + +from tests.fixtures import OdevTestCase + + +class TestArgumentToDict(OdevTestCase): + def test_string_argument_to_dict(self): + spec = arg_defs.String(name="label", aliases=["-l"], description="A label", default="x") + d = spec.to_dict("ignored") + self.assertEqual(d["name"], "label") + self.assertEqual(d["action"], "store") + self.assertEqual(d["default"], "x") + self.assertIn("-l", d["aliases"]) + + def test_integer_argument_to_dict(self): + spec = arg_defs.Integer(default=3) + d = spec.to_dict("count") + self.assertEqual(d["name"], "count") + self.assertEqual(d["action"], "store_int") + + def test_flag_default_false_uses_store_true(self): + spec = arg_defs.Flag(default=False) + d = spec.to_dict("verbose") + self.assertEqual(d["action"], "store_true") + + def test_flag_default_true_uses_store_false(self): + spec = arg_defs.Flag(default=True) + d = spec.to_dict("quiet") + self.assertEqual(d["action"], "store_false") + + def test_list_argument_to_dict(self): + spec = arg_defs.List(default=["a"]) + d = spec.to_dict("items") + self.assertEqual(d["action"], "store_list") + + def test_path_argument_to_dict(self): + spec = arg_defs.Path() + d = spec.to_dict("out") + self.assertEqual(d["action"], "store_path") + + def test_regex_argument_to_dict(self): + spec = arg_defs.Regex() + d = spec.to_dict("pattern") + self.assertEqual(d["action"], "store_regex") + + def test_eval_argument_to_dict(self): + spec = arg_defs.Eval() + d = spec.to_dict("literal") + self.assertEqual(d["action"], "store_eval") + + +class TestArgparseCustomActions(OdevTestCase): + def test_store_int_action(self): + parser = ArgumentParser() + parser.add_argument("--n", action=ACTIONS_MAPPING["store_int"]) + ns = parser.parse_args(["--n", "42"]) + self.assertEqual(ns.n, 42) + + def test_store_list_action(self): + parser = ArgumentParser() + parser.add_argument("--tags", action=ACTIONS_MAPPING["store_list"]) + ns = parser.parse_args(["--tags", "a,b,c"]) + self.assertEqual(ns.tags, ["a", "b", "c"]) + + def test_store_path_action(self): + parser = ArgumentParser() + parser.add_argument("--p", action=ACTIONS_MAPPING["store_path"]) + ns = parser.parse_args(["--p", str(self.run_path)]) + self.assertIsInstance(ns.p, Path) + self.assertTrue(ns.p.is_absolute()) + + def test_store_regex_action(self): + parser = ArgumentParser() + parser.add_argument("--re", action=ACTIONS_MAPPING["store_regex"]) + ns = parser.parse_args(["--re", "^foo$"]) + self.assertIsInstance(ns.re, re.Pattern) + self.assertTrue(ns.re.match("foo")) + + def test_store_eval_action(self): + parser = ArgumentParser() + parser.add_argument("--e", action=ACTIONS_MAPPING["store_eval"]) + ns = parser.parse_args(["--e", "{'k': 1}"]) + self.assertEqual(ns.e, {"k": 1}) + + def test_int_action_invalid_value_raises(self): + action = IntAction(option_strings=("--x",), dest="x") + with self.assertRaises(ValueError): + action(None, Namespace(), "not-an-int", "--x") + + def test_list_action_wraps_invalid_transform(self): + action = ListAction(option_strings=("--x",), dest="x") + + def bad_transform_one(value): + raise ValueError("nope") + + action._transform_one = bad_transform_one # type: ignore[method-assign] + with self.assertRaises(ValueError) as ctx: + action(None, Namespace(), "a,b", "--x") + self.assertIn("Invalid value(s) for x", str(ctx.exception)) diff --git a/tests/tests/common/test_connectors.py b/tests/tests/common/test_connectors.py new file mode 100644 index 00000000..c9a3d5d4 --- /dev/null +++ b/tests/tests/common/test_connectors.py @@ -0,0 +1,108 @@ +from typing import cast + +from requests import Session +from requests.exceptions import ConnectionError as RequestsConnectionError + +from odev.common.connectors.postgres import Cursor, PostgresConnector +from odev.common.connectors.rest import RestConnector + +from tests.fixtures import OdevTestCase + + +class DummyResponse: + def __init__(self, status_code: int = 200): + self.status_code = status_code + self.reason = "OK" + self.elapsed = type("Elapsed", (), {"total_seconds": lambda self: 0.01})() + + def raise_for_status(self): + return None + + +class DummySession: + def __init__(self): + self.headers = {"User-Agent": ""} + self.cookies = type( + "Cookies", + (), + { + "set": lambda *args, **kwargs: None, + "get": lambda *args, **kwargs: None, + "clear": lambda *args, **kwargs: None, + }, + )() + self.calls = [] + + def request(self, method, url, params=None, **kwargs): + self.calls.append((method, url, params, kwargs)) + return DummyResponse() + + def close(self): + return None + + +class DummyRestConnector(RestConnector): + @property + def exists(self) -> bool: + return True + + def request(self, method, path, authenticate=True, params=None, **kwargs): + return self._request(method, path, params=params, **kwargs) + + +class TestConnectors(OdevTestCase): + def test_01_rest_request_sets_default_timeout(self): + connector = DummyRestConnector("https://example.com") + session = DummySession() + connector._connection = cast(Session, session) + + connector._request("GET", "/health") + + _, _, _, kwargs = session.calls[-1] + self.assertEqual(kwargs["timeout"], 30.0) + + def test_02_rest_nocache_restores_state_on_error(self): + connector = DummyRestConnector("https://example.com") + + with self.assertRaises(RuntimeError), connector.nocache(): + raise RuntimeError("boom") + + self.assertFalse(RestConnector._bypass_cache) + + def test_03_rest_request_retries_connection_error_once(self): + connector = DummyRestConnector("https://example.com") + session = DummySession() + + def fail_then_succeed(*args, **kwargs): + if len(session.calls) == 0: + session.calls.append(("ERR", "", None, {})) + raise RequestsConnectionError("temporary") + return DummyResponse() + + session.request = fail_then_succeed + connector._connection = cast(Session, session) + response = connector._request("GET", "/ok") + self.assertEqual(response.status_code, 200) + + def test_04_postgres_nocache_restores_state_on_error(self): + connector = PostgresConnector("postgres") + + with self.assertRaises(RuntimeError), connector.nocache(): + raise RuntimeError("boom") + + self.assertFalse(PostgresConnector._nocache) + + def test_05_cursor_transaction_does_not_commit_on_error(self): + class DummyCursor: + def __init__(self): + self.calls = [] + + def execute(self, statement): + self.calls.append(statement) + + cursor = DummyCursor() + + with self.assertRaises(RuntimeError), Cursor.transaction(cursor): # type: ignore[arg-type] + raise RuntimeError("boom") + + self.assertEqual(cursor.calls, ["BEGIN", "ROLLBACK"]) diff --git a/tests/tests/common/test_git_connector.py b/tests/tests/common/test_git_connector.py new file mode 100644 index 00000000..f91e7b9f --- /dev/null +++ b/tests/tests/common/test_git_connector.py @@ -0,0 +1,27 @@ +from odev.common.connectors.git import GitConnector +from odev.common.errors import ConnectorError + +from tests.fixtures import OdevTestCase + + +class TestGitConnectorInit(OdevTestCase): + def test_https_github_url(self): + g = GitConnector("https://github.com/acme/myrepo") + self.assertEqual(g.name, "acme/myrepo") + + def test_git_ssh_url(self): + g = GitConnector("git@github.com:acme/myrepo.git") + self.assertEqual(g.name, "acme/myrepo") + + def test_org_slash_repo(self): + g = GitConnector("acme/myrepo") + self.assertEqual(g.name, "acme/myrepo") + + def test_ssh_prefix_stripped_before_parse(self): + g = GitConnector("git@github.com:org/repo") + self.assertEqual(g.name, "org/repo") + + def test_invalid_repo_format_raises(self): + with self.assertRaises(ConnectorError) as ctx: + GitConnector("onlyonepart") + self.assertIn("Invalid repository format", str(ctx.exception)) diff --git a/tests/tests/common/test_main.py b/tests/tests/common/test_main.py new file mode 100644 index 00000000..c093555f --- /dev/null +++ b/tests/tests/common/test_main.py @@ -0,0 +1,85 @@ +from bdb import BdbQuit +from signal import SIGINT, SIGTERM +from unittest.mock import MagicMock + +from odev import __main__ as main_module +from odev.common.errors.odev import OdevError + +from tests.fixtures import OdevTestCase + + +class TestMainEntrypoint(OdevTestCase): + def test_01_main_success(self): + with ( + self.patch("odev.__main__.os", "geteuid", return_value=1000), + self.patch("odev.__main__", "signal") as mock_signal, + self.patch("odev.common", "init_framework", return_value=self.odev) as mock_init_framework, + self.patch(self.odev, "dispatch", return_value=True), + self.assertRaises(SystemExit) as exited, + ): + main_module.main() + + self.assertEqual(exited.exception.code, 0) + self.assertEqual(mock_signal.call_count, 2) + registered_signals = {call.args[0] for call in mock_signal.call_args_list} + self.assertEqual(registered_signals, {SIGINT, SIGTERM}) + mock_init_framework.assert_called_once_with() + + def test_02_main_as_root_exits(self): + with ( + self.patch("odev.__main__.os", "geteuid", return_value=0), + self.patch("odev.__main__.sys", "exit", side_effect=SystemExit(1)), + self.assertRaises(SystemExit), + ): + main_module.main() + + def test_03_main_keyboard_interrupt(self): + with ( + self.patch("odev.__main__.os", "geteuid", return_value=1000), + self.patch("odev.common", "init_framework", side_effect=KeyboardInterrupt), + self.patch("odev.common.signal_handling", "signal_handler_exit") as mock_signal_handler_exit, + ): + main_module.main() + + mock_signal_handler_exit.assert_called_once_with(SIGINT, None) + + def test_04_main_bdb_quit_exits(self): + with ( + self.patch("odev.__main__.os", "geteuid", return_value=1000), + self.patch("odev.common", "init_framework", side_effect=BdbQuit), + self.patch("odev.__main__.sys", "exit", side_effect=SystemExit(1)), + self.assertRaises(SystemExit), + ): + main_module.main() + + def test_05_main_odev_error_exits(self): + mock_logger = MagicMock() + with ( + self.patch("odev.__main__.os", "geteuid", return_value=1000), + self.patch("odev.common", "init_framework", side_effect=OdevError("expected failure")), + self.patch("odev.common.logging.logging", "getLogger", return_value=mock_logger), + self.patch("odev.__main__.sys", "exit", side_effect=SystemExit(1)), + self.assertRaises(SystemExit), + ): + main_module.main() + + mock_logger.error.assert_called_once() + self.assertIsInstance(mock_logger.error.call_args[0][0], OdevError) + + def test_06_main_unhandled_exception_exits(self): + mock_logger = MagicMock() + with ( + self.patch("odev.__main__.os", "geteuid", return_value=1000), + self.patch("odev.common", "init_framework", side_effect=RuntimeError("unexpected")), + self.patch("odev.common.logging.logging", "getLogger", return_value=mock_logger), + self.patch("odev.__main__.sys", "exit", side_effect=SystemExit(1)), + self.assertRaises(SystemExit), + ): + main_module.main() + + mock_logger.error.assert_called_once() + err_arg = mock_logger.error.call_args[0][0] + self.assertIn("Execution failed", err_arg) + self.assertIn("RuntimeError", err_arg) + tb_logged = any(call.args and "Traceback" in str(call.args[0]) for call in mock_logger.debug.call_args_list) + self.assertTrue(tb_logged) diff --git a/tests/tests/common/test_odev.py b/tests/tests/common/test_odev.py index 3bbc2b5c..4c6ae4f6 100644 --- a/tests/tests/common/test_odev.py +++ b/tests/tests/common/test_odev.py @@ -1,3 +1,4 @@ +import shutil import sys from pathlib import Path @@ -38,7 +39,11 @@ def test_03_restart(self): def test_04_restart_on_update(self): """Odev should restart itself when updated.""" - with self.patch(self.odev, "_update", return_value=True), self.patch(self.odev, "restart") as mock_restart: + with ( + self.patch(self.odev, "_Odev__should_update_now", return_value=True), + self.patch(self.odev, "_update", return_value=True), + self.patch(self.odev, "restart") as mock_restart, + ): self.odev.commands.clear() self.odev._started = False self.odev.start() @@ -128,3 +133,51 @@ def test_12_dispatch_command_error(self): self.odev.dispatch() mock_error.assert_called_once_with("Cannot display help for inexistent command 'invalid-command'") + + def test_14_dispatch_version(self): + """Odev should display its version when called with 'version' command.""" + sys.argv = ["odev", "version"] + + with CaptureOutput() as output: + self.odev.dispatch() + + # VersionCommand output includes name, version, and release channel info + self.assertIn(self.odev.version, output.stdout) + self.assertIn(self.odev.name.capitalize(), output.stdout) + + def test_15_register_plugin_commands_retries_after_failure(self): + """Plugin command registration should retry once after plugin updates.""" + with ( + self.patch_property(type(self.odev), "plugins", []), + self.patch( + self.odev, "_register_plugin_commands", side_effect=[RuntimeError("boom"), None] + ) as register_mock, + self.patch(logger, "error") as logger_error, + ): + self.odev.register_plugin_commands() + + self.assertEqual(register_mock.call_count, 2) + logger_error.assert_called_once() + + def test_16_plugins_dependency_tree_cycle_raises(self): + """Circular plugin dependencies should raise an explicit framework error.""" + cycle_root = self.run_path / "cycle-plugins" + plugin_a = cycle_root / "test_plugin_cycle_a" + plugin_b = cycle_root / "test_plugin_cycle_b" + plugin_a.mkdir(parents=True, exist_ok=True) + plugin_b.mkdir(parents=True, exist_ok=True) + + plugin_a_manifest = "__version__ = '1.0.0'\ndepends = ['cycle-plugins/test_plugin_cycle_b']\n" + plugin_b_manifest = "__version__ = '1.0.0'\ndepends = ['cycle-plugins/test_plugin_cycle_a']\n" + (plugin_a / "__manifest__.py").write_text(plugin_a_manifest) + (plugin_b / "__manifest__.py").write_text(plugin_b_manifest) + + try: + self.odev._plugins_dependency_tree.cache_clear() + with ( + self.patch_property(type(self.odev), "plugins_path", cycle_root), + self.assertRaisesRegex(Exception, "Circular dependency detected in plugins"), + ): + self.odev._plugins_dependency_tree() + finally: + shutil.rmtree(cycle_root, ignore_errors=True) diff --git a/tests/tests/common/test_python_env.py b/tests/tests/common/test_python_env.py new file mode 100644 index 00000000..03062cdf --- /dev/null +++ b/tests/tests/common/test_python_env.py @@ -0,0 +1,57 @@ +from pathlib import Path +from subprocess import CalledProcessError, CompletedProcess +from unittest.mock import MagicMock, patch + +from odev.common import bash +from odev.common.python import PythonEnv + +from tests.fixtures import OdevTestCase + + +class TestPythonEnv(OdevTestCase): + """Test the PythonEnv class.""" + + @classmethod + def setUpClass(cls): + with patch("odev.common.odev.Odev.start", return_value=None): + super().setUpClass() + + def test_run_script_streaming_success(self): + """Test run_script with a streaming process that succeeds.""" + env = PythonEnv(path="/tmp/fake_venv", version="3.10") # noqa: S108 + + with ( + self.patch(Path, "exists", return_value=True), + self.patch(PythonEnv, "exists", return_value=True), + self.patch(PythonEnv, "python", Path("/tmp/fake_venv/bin/python")), # noqa: S108 + self.patch(bash, "stream", return_value=iter(["line 1", "line 2"])), + ): + progress_mock = MagicMock() + result = env.run_script("fake_script.py", stream=True, progress=progress_mock) + + self.assertIsInstance(result, CompletedProcess) + self.assertEqual(result.returncode, 0) + self.assertEqual(progress_mock.call_count, 2) + progress_mock.assert_any_call("line 1") + progress_mock.assert_any_call("line 2") + + def test_run_script_streaming_failure(self): + """Test run_script with a streaming process that fails.""" + env = PythonEnv(path="/tmp/fake_venv", version="3.10") # noqa: S108 + + def streaming_failure(command): + yield "line 1" + raise CalledProcessError(1, command) + + with ( + self.patch(Path, "exists", return_value=True), + self.patch(PythonEnv, "exists", return_value=True), + self.patch(PythonEnv, "python", Path("/tmp/fake_venv/bin/python")), # noqa: S108 + self.patch(bash, "stream", side_effect=streaming_failure), + ): + progress_mock = MagicMock() + result = env.run_script("fake_script.py", stream=True, progress=progress_mock) + + self.assertIsInstance(result, CompletedProcess) + self.assertEqual(result.returncode, 1) + progress_mock.assert_called_once_with("line 1") diff --git a/tests/tests/common/test_rpc_connector.py b/tests/tests/common/test_rpc_connector.py new file mode 100644 index 00000000..858170d0 --- /dev/null +++ b/tests/tests/common/test_rpc_connector.py @@ -0,0 +1,76 @@ +from unittest.mock import MagicMock + +from odev.common.connectors.rpc import Model, RpcConnector +from odev.common.errors import ConnectorError + +from tests.fixtures import OdevTestCase + + +def _mock_database(*, url: str = "https://example.com:8069", rpc_port: int = 8069, running: bool = True): + db = MagicMock() + db.name = "db1" + db.url = url + db.rpc_port = rpc_port + db.running = running + plat = MagicMock() + plat.display = "Remote" + plat.name = "remote" + db.platform = plat + return db + + +class TestRpcConnectorProperties(OdevTestCase): + def test_url_host_without_explicit_port(self): + rpc = RpcConnector(_mock_database(url="https://example.com:8069")) + self.assertEqual(rpc.url, "example.com") + + def test_url_missing_raises(self): + rpc = RpcConnector(_mock_database(url="")) + with self.assertRaises(ConnectorError) as ctx: + _ = rpc.url + self.assertIn("URL not set", str(ctx.exception)) + + def test_port_missing_raises(self): + db = _mock_database() + db.rpc_port = None + rpc = RpcConnector(db) + with self.assertRaises(ConnectorError) as ctx: + _ = rpc.port + self.assertIn("RPC port not set", str(ctx.exception)) + + def test_protocol_jsonrpcs_on_https_port(self): + rpc = RpcConnector(_mock_database(rpc_port=443)) + self.assertEqual(rpc.protocol, "jsonrpcs") + + def test_protocol_jsonrpc_non_https(self): + rpc = RpcConnector(_mock_database(rpc_port=8069)) + self.assertEqual(rpc.protocol, "jsonrpc") + + def test_user_id_without_connection_raises(self): + rpc = RpcConnector(_mock_database()) + rpc._connection = None + with self.assertRaises(ConnectorError) as ctx: + _ = rpc.user_id + self.assertIn("user_id", str(ctx.exception)) + + def test_disconnect_clears_connection(self): + rpc = RpcConnector(_mock_database()) + rpc._connection = object() + rpc.disconnect() + self.assertIsNone(rpc._connection) + + +class TestRpcModel(OdevTestCase): + def test_read_group_requires_groupby(self): + inner = MagicMock() + inner.model_name = "res.partner" + odoo_conn = MagicMock() + odoo_conn.get_model.return_value = inner + conn = MagicMock() + conn.connected = True + conn._connection = odoo_conn + + model = Model(conn, "res.partner") + with self.assertRaises(ValueError) as ctx: + model.read_group([], fields=None, groupby=None) + self.assertIn("groupby", str(ctx.exception)) diff --git a/tests/tests/common/test_store.py b/tests/tests/common/test_store.py new file mode 100644 index 00000000..36bfa575 --- /dev/null +++ b/tests/tests/common/test_store.py @@ -0,0 +1,8 @@ +from tests.fixtures import OdevTestCase + + +class TestDataStore(OdevTestCase): + def test_store_exposes_table_helpers(self): + self.assertEqual(self.odev.store.databases.name, "databases") + self.assertEqual(self.odev.store.history.name, "history") + self.assertEqual(self.odev.store.secrets.name, "secrets") diff --git a/tests/tests/setup/test_setup.py b/tests/tests/setup/test_setup.py index 01b291c8..148f7051 100644 --- a/tests/tests/setup/test_setup.py +++ b/tests/tests/setup/test_setup.py @@ -6,8 +6,8 @@ from tests.fixtures import OdevTestCase -class TestSetupCompletion(OdevTestCase): - def test_01_completion(self): +class TestSetup(OdevTestCase): + def test_completion_01_completion(self): """Test the setup script responsible of creating a symlink to the bash completion script of odev. A symlink should be created on the file system. """ @@ -16,9 +16,7 @@ def test_01_completion(self): self.assertTrue(Path("~/.local/share/bash-completion/completions/complete_odev.sh").expanduser().is_symlink()) - -class TestSetupSymlink(OdevTestCase): - def test_01_symlink(self): + def test_symlink_01_symlink(self): """Test the setup script responsible of creating a symlink to odev. A symlink should be created to map the "odev" command to the main file of this application. @@ -28,6 +26,23 @@ def test_01_symlink(self): self.assertTrue(Path("~/.local/bin/odev").expanduser().is_symlink()) + def test_update_01_update(self): + """Test the setup script responsible of setting the auto-update values for odev. + The configuration file should be updated with the new values. + """ + self.odev.config.reset("update") + self.assertEqual(self.odev.config.update.mode, "ask", "should have a default value") + self.assertEqual(self.odev.config.update.interval, 1, "should have a default value") + + with ( + self.patch(update.console, "select", return_value="never"), + self.patch(update.console, "integer", return_value=5), + ): + update.setup(self.odev) + + self.assertEqual(self.odev.config.update.mode, "never", "should update the configuration file") + self.assertEqual(self.odev.config.update.interval, 5, "should update the configuration file") + class TestSetupDirectories(OdevTestCase): def setUp(self): @@ -94,22 +109,3 @@ def test_03_directories_empty(self): logger_debug.assert_any_call(f"Directory {new_dir_path.as_posix()} exists but is empty, removing it") logger_debug.assert_any_call(f"Moving {self.dir_path} to {new_dir_path}") - - -class TestSetupUpdate(OdevTestCase): - def test_01_update(self): - """Test the setup script responsible of setting the auto-update values for odev. - The configuration file should be updated with the new values. - """ - self.odev.config.reset("update") - self.assertEqual(self.odev.config.update.mode, "ask", "should have a default value") - self.assertEqual(self.odev.config.update.interval, 1, "should have a default value") - - with ( - self.patch(update.console, "select", return_value="never"), - self.patch(update.console, "integer", return_value=5), - ): - update.setup(self.odev) - - self.assertEqual(self.odev.config.update.mode, "never", "should update the configuration file") - self.assertEqual(self.odev.config.update.interval, 5, "should update the configuration file")