diff --git a/README.md b/README.md index 0e8fe5e..b89d370 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,10 @@ Afterwards, create a virtual environment and install the package there: _Hint: If you would like to get the debugger running, try to install the code as follows: `pip install -e .`_ +You also need an instance of [secvisogram/csaf-validator-service](https://github.com/secvisogram/csaf-validator-service) for CSAF document validation. +If you don't have access to one, first install it. +This step can be skipped for testing purposes. + ## How to use CVRF-CSAF-converter ### Usage as CLI tool @@ -74,6 +78,21 @@ For the installation using `venv`, the config file is located in `$PATH_TO_THE_V When installing the PyPI package with pip (--user), the config file is located in `$HOME/.local/lib/python3.X/site-packages/cvrf2csaf/config/config.yaml` Converter options can be changed there, or overridden by command line arguments/options. +### Validate using a validation service + +The tool by default validates the document using a validation service such as the [secvisogram/csaf-validator-service](https://github.com/secvisogram/csaf-validator-service). + +The output of the tool includes the validation result: +```bash +2025-05-29 09:25:11,415 - cvrf2csaf - INFO - CSAF validation successful. +# or +2025-05-29 09:24:10,594 - cvrf2csaf - WARNING - Some errors were found at validation: [...] +``` + +Use `--validator-endpoint` and `--validator-mode` to specify the URL of the validator service and the type of service (currently implemented: secvisogram). + +To disable the validation by an external validator, use `--no-validate`. + ## Specifications We follow the official OASIS specifications in order to provide as much acceptance on the user base as possible. diff --git a/cvrf2csaf/cvrf2csaf.py b/cvrf2csaf/cvrf2csaf.py index e12450a..fe3afdd 100644 --- a/cvrf2csaf/cvrf2csaf.py +++ b/cvrf2csaf/cvrf2csaf.py @@ -26,6 +26,8 @@ from .section_handlers.vulnerability import Vulnerability from .common.common import SectionHandler +from .validate import Validator, DEFAULT_ENDPOINT, DEFAULT_MODE, SUPPORTED_MODES + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(module)s - %(levelname)s - %(message)s') @@ -249,7 +251,7 @@ def validate_mandatory_tests(final_csaf): # pylint: disable=missing-function-docstring -def main(): +def parse_arguments() -> dict: # General args parser = argparse.ArgumentParser( description='Converts CVRF 1.2 XML input into CSAF 2.0 JSON output.') @@ -297,8 +299,26 @@ def main(): help="Default version used for CVSS version 3, when the version cannot be" " derived from other sources. Default value is '3.0'.") + # Validation + parser.add_argument('--no-validate', action='store_true', + help="Deactivate validation by a validator service") + parser.add_argument('--validator-endpoint', + default=DEFAULT_ENDPOINT, + help="The URL where the validator service is reachable. " + f"Default: {DEFAULT_ENDPOINT!r}.") + parser.add_argument('--validator-mode', + default=DEFAULT_MODE, + help=f"The Validator mode, currently supported: " + f"{','.join(SUPPORTED_MODES)}. Default: {DEFAULT_MODE!r}.") + + args = {k: v for k, v in vars(parser.parse_args()).items() if v is not None} + return args + +# pylint: disable=missing-function-docstring +def main(): + args = parse_arguments() config = get_config_from_file() # Update & rewrite config file values with the ones from command line arguments @@ -338,6 +358,15 @@ def main(): logging.warning('Some errors occurred during conversion,' ' but producing output as --force option is used.') + if not args['no_validate']: + validator = Validator(endpoint=args['validator_endpoint'], mode=args['validator_mode']) + validation_result = validator.validate(final_csaf) + if not validation_result[0]: + valid_output = False + logging.warning("Some errors were found at validation: %s", validation_result[1]) + else: + logging.info("CSAF validation successful.") + # Output / Store results file_name = create_file_name(final_csaf['document'].get('tracking', {}).get('id', None), valid_output) diff --git a/cvrf2csaf/validate.py b/cvrf2csaf/validate.py new file mode 100644 index 0000000..5de8fe1 --- /dev/null +++ b/cvrf2csaf/validate.py @@ -0,0 +1,80 @@ +""" +The module provides validation functionality +""" +from ssl import SSLContext +from typing import Any, Optional, Union, Tuple +from logging import getLogger + +from attrs import define, field +from httpx import Client, Timeout, RequestError + +SECVISOGRAM_TEMPLATE = { + "tests": [ + { + "name": "csaf_2_0", + "type": "test" + }, + { + "name": "schema", + "type": "preset" + } + ], + "document": { + } +} + +DEFAULT_MODE = 'secvisogram' +DEFAULT_ENDPOINT = 'http://localhost:8082/api/v1/validate' +SUPPORTED_MODES = [DEFAULT_MODE] + +# Don't show debug and info logs from httpx +getLogger('httpx').setLevel('WARNING') + + +@define # creates a constructor +class Validator: + """ + Calling the validation services. + Also accepts parameters for authentication (headers, cookies). + """ + endpoint: str = field(default=DEFAULT_ENDPOINT) + mode: str = field(default=DEFAULT_MODE) + _headers: dict[str, str] = field(factory=dict, kw_only=True, alias="headers") + _timeout: Optional[Timeout] = field(default=None, kw_only=True, alias="timeout") + _verify_ssl: Union[str, bool, SSLContext] = field(default=True, kw_only=True, + alias="verify_ssl") + _httpx_args: dict[str, Any] = field(factory=dict, kw_only=True, alias="httpx_args") + _cookies: Optional[dict] = field(default=None, init=False) + + @property + def client(self): + """ + Create a httpx Client object + """ + return Client( + cookies=self._cookies, + headers=self._headers, + timeout=self._timeout, + verify=self._verify_ssl, + **self._httpx_args, + ) + + def validate(self, document: dict) -> Tuple[bool, dict]: + """ + Call the validation enpoint and return the result + + Return values: + validity: True, if the document is valid, False if it's not + errors: List of errors + """ + if self.mode == 'secvisogram': + try: + result = self.client.post(self.endpoint, + json=SECVISOGRAM_TEMPLATE | {'document': document}).json() + except RequestError as e: + return False, str(e) + + errors = [test for test in result['tests'] if test['errors']] + return result['isValid'], errors + + raise NotImplementedError(f"Mode {self.mode} is not supported.") diff --git a/pyproject.toml b/pyproject.toml index d20125f..6747fef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,9 @@ requires = [ "setuptools", "wheel", - "setuptools_scm" + "setuptools_scm", + "attrs", + "httpx", ] build-backend = "setuptools.build_meta" diff --git a/requirements.txt b/requirements.txt index 3b72de8..79f84e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ lxml>=5.0,<6 PyYAML>=6.0,<7 jsonschema>=4.4.0,<5 -turvallisuusneuvonta \ No newline at end of file +turvallisuusneuvonta +attrs +httpx diff --git a/tests/test_cvrf_full/test_validator.py b/tests/test_cvrf_full/test_validator.py new file mode 100644 index 0000000..57cc4bd --- /dev/null +++ b/tests/test_cvrf_full/test_validator.py @@ -0,0 +1,57 @@ +""" +Minimal test for the Validator class +""" +from json import load +from pathlib import Path + +from cvrf2csaf.validate import Validator + + +MINIMAL_DOCUMENT = { + "document": { + "csaf_version": "2.0", + "category": "category", + "title": "title", + "publisher": { + "category": "vendor", + "name": "name", + "namespace": "https://example.com", + }, + "tracking": { + "current_release_date": "2025-01-01T00:00:00.000+00:00", + "initial_release_date": "2025-01-01T00:00:00.000+00:00", + "id": "0", + "revision_history": [ + { + "date": "2025-01-01T00:00:00.000+00:00", + "number": "0", + "summary": "text", + } + ], + "status": "final", + "version": "0", + }, + } +} + + +def test_validator_empty(): + """ + a minimal, basically empty document that validates + """ + validator = Validator() + result = validator.validate(MINIMAL_DOCUMENT) + assert result[0] is True + assert result[1] == [] + + +def test_validator_cvrf_full(): + """ + invalid document + """ + document = load((Path(__file__).parent / + "../../tests/test_cvrf_full/test_cvrf_full.json").open()) + validator = Validator() + result = validator.validate(document) + assert result[0] is False + assert len(result[1]) > 0