diff --git a/CHANGELOG.md b/CHANGELOG.md index 40c997f..d8a24a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Upcoming changes... +## [1.24.0] - 2025-05-06 +### Added +- Add `crypto` subcommand to retrieve cryptographic algorithms for the given components +- Add `crypto hints` subcommand to retrieve cryptographic hints for the given components +- Add `crypto versions-in-range` subcommand to retrieve cryptographic versions in range for the given components + ## [1.23.0] - 2025-04-24 ### Added - Add `--origin` flag to `component provenance` subcommand to retrieve provenance using contributors origin @@ -515,4 +521,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [1.20.6]: https://github.com/scanoss/scanoss.py/compare/v1.20.5...v1.20.6 [1.21.0]: https://github.com/scanoss/scanoss.py/compare/v1.20.6...v1.21.0 [1.22.0]: https://github.com/scanoss/scanoss.py/compare/v1.21.0...v1.22.0 -[1.23.0]: https://github.com/scanoss/scanoss.py/compare/v1.22.0...v1.23.0 \ No newline at end of file +[1.23.0]: https://github.com/scanoss/scanoss.py/compare/v1.22.0...v1.23.0 +[1.24.0]: https://github.com/scanoss/scanoss.py/compare/v1.23.0...v1.24.0 \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index ba3e8d8..f70f93d 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -336,6 +336,91 @@ Scans Docker container images for dependencies, extracting and analyzing compone * - --ca-cert - Alternative certificate PEM file +----------------- +Crypto: crypto, cr +----------------- + +Provides subcommands to retrieve cryptographic information for components. + +.. code-block:: bash + + scanoss-py crypto + +Subcommands: +~~~~~~~~~~~~ + +**algorithms (alg)** + Retrieve cryptographic algorithms for the given components. + + .. code-block:: bash + + scanoss-py crypto algorithms --purl + + .. list-table:: + :widths: 20 30 + :header-rows: 1 + + * - Argument + - Description + * - --with-range + - Returns the list of versions in the specified range that contains cryptographic algorithms. (Replaces the previous --range option) + +**hints** + Retrieve encryption hints for the given components. + + .. code-block:: bash + + scanoss-py crypto hints --purl + + .. list-table:: + :widths: 20 30 + :header-rows: 1 + + * - Argument + - Description + * - --with-range + - Returns the list of versions in the specified range that contains encryption hints. + +**versions-in-range (vr)** + Given a list of PURLs and version ranges, get a list of versions that do/don't contain crypto algorithms. + + .. code-block:: bash + + scanoss-py crypto versions-in-range --purl + +Common Crypto Arguments: +~~~~~~~~~~~~~~~~~~~~~~~~ +The following arguments are common to the ``algorithms``, ``hints``, and ``versions-in-range`` subcommands: + +.. list-table:: + :widths: 20 30 + :header-rows: 1 + + * - Argument + - Description + * - --purl , -p + - Package URL (PURL) to process. Can be specified multiple times. + * - --input , -i + - Input file name containing PURLs. + * - --output , -o + - Output result file name (optional - default STDOUT). + * - --timeout , -M + - Timeout (in seconds) for API communication (optional - default 600). + * - --key , -k + - SCANOSS API Key token (optional - not required for default OSSKB URL). + * - --api2url + - SCANOSS gRPC API 2.0 URL (optional - default: https://api.osskb.org). + * - --grpc-proxy + - GRPC Proxy URL to use for connections. + * - --ca-cert + - Alternative certificate PEM file. + * - --debug, -d + - Enable debug messages. + * - --trace, -t + - Enable trace messages, including API posts. + * - --quiet, -q + - Enable quiet mode. + ----------------- Component: ----------------- diff --git a/src/scanoss/__init__.py b/src/scanoss/__init__.py index 810ceca..228a61a 100644 --- a/src/scanoss/__init__.py +++ b/src/scanoss/__init__.py @@ -22,4 +22,4 @@ THE SOFTWARE. """ -__version__ = '1.23.0' +__version__ = '1.24.0' diff --git a/src/scanoss/cli.py b/src/scanoss/cli.py index 00a97c6..ec88177 100644 --- a/src/scanoss/cli.py +++ b/src/scanoss/cli.py @@ -31,6 +31,7 @@ import pypac +from scanoss.cryptography import Cryptography, create_cryptography_config_from_args from scanoss.scanners.container_scanner import ( DEFAULT_SYFT_COMMAND, DEFAULT_SYFT_TIMEOUT, @@ -50,6 +51,7 @@ from . import __version__ from .components import Components from .constants import ( + DEFAULT_API_TIMEOUT, DEFAULT_POST_SIZE, DEFAULT_RETRY, DEFAULT_TIMEOUT, @@ -292,15 +294,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 help='component sub-commands', ) - # Component Sub-command: component crypto - c_crypto = comp_sub.add_parser( - 'crypto', - aliases=['cr'], - description=f'Show Cryptographic algorithms: {__version__}', - help='Retrieve cryptographic algorithms for the given components', - ) - c_crypto.set_defaults(func=comp_crypto) - # Component Sub-command: component vulns c_vulns = comp_sub.add_parser( 'vulns', @@ -361,18 +354,76 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 c_versions.add_argument('--limit', '-l', type=int, help='Generic component search') c_versions.set_defaults(func=comp_versions) + # Sub-command: crypto + p_crypto = subparsers.add_parser( + 'crypto', + aliases=['cr'], + description=f'SCANOSS Crypto commands: {__version__}', + help='Crypto support commands', + ) + crypto_sub = p_crypto.add_subparsers( + title='Crypto Commands', + dest='subparsercmd', + description='crypto sub-commands', + help='crypto sub-commands', + ) + + # GetAlgorithms and GetAlgorithmsInRange gRPC APIs + p_crypto_algorithms = crypto_sub.add_parser( + 'algorithms', + aliases=['alg'], + description=f'Show Cryptographic algorithms: {__version__}', + help='Retrieve cryptographic algorithms for the given components', + ) + p_crypto_algorithms.add_argument( + '--with-range', + action='store_true', + help='Returns the list of versions in the specified range that contains cryptographic algorithms', + ) + p_crypto_algorithms.set_defaults(func=crypto_algorithms) + + # GetEncryptionHints and GetHintsInRange gRPC APIs + p_crypto_hints = crypto_sub.add_parser( + 'hints', + description=f'Show Encryption hints: {__version__}', + help='Retrieve encryption hints for the given components', + ) + p_crypto_hints.add_argument( + '--with-range', + action='store_true', + help='Returns the list of versions in the specified range that contains encryption hints', + ) + p_crypto_hints.set_defaults(func=crypto_hints) + + p_crypto_versions_in_range = crypto_sub.add_parser( + 'versions-in-range', + aliases=['vr'], + description=f'Show versions in range: {__version__}', + help="Given a list of PURLS and version ranges, get a list of versions that do/don't contain crypto algorithms", + ) + p_crypto_versions_in_range.set_defaults(func=crypto_versions_in_range) + # Common purl Component sub-command options - for p in [c_crypto, c_vulns, c_semgrep, c_provenance]: + for p in [c_vulns, c_semgrep, c_provenance, p_crypto_algorithms, p_crypto_hints, p_crypto_versions_in_range]: p.add_argument('--purl', '-p', type=str, nargs='*', help='Package URL - PURL to process.') p.add_argument('--input', '-i', type=str, help='Input file name') # Common Component sub-command options - for p in [c_crypto, c_vulns, c_search, c_versions, c_semgrep, c_provenance]: + for p in [ + c_vulns, + c_search, + c_versions, + c_semgrep, + c_provenance, + p_crypto_algorithms, + p_crypto_hints, + p_crypto_versions_in_range, + ]: p.add_argument( '--timeout', '-M', type=int, - default=600, + default=DEFAULT_API_TIMEOUT, help='Timeout (in seconds) for API communication (optional - default 600)', ) @@ -588,7 +639,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 p_dep, p_fc, p_cnv, - c_crypto, c_vulns, c_search, c_versions, @@ -597,6 +647,9 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 p_c_dwnld, p_folder_scan, p_folder_hash, + p_crypto_algorithms, + p_crypto_hints, + p_crypto_versions_in_range, ]: p.add_argument('--output', '-o', type=str, help='Output result file name (optional - default stdout).') @@ -674,7 +727,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 # Global Scan/GRPC options for p in [ p_scan, - c_crypto, c_vulns, c_search, c_versions, @@ -682,6 +734,9 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 c_provenance, p_folder_scan, p_cs, + p_crypto_algorithms, + p_crypto_hints, + p_crypto_versions_in_range, ]: p.add_argument( '--key', '-k', type=str, help='SCANOSS API Key token (optional - not required for default OSSKB URL)' @@ -708,7 +763,19 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 ) # Global GRPC options - for p in [p_scan, c_crypto, c_vulns, c_search, c_versions, c_semgrep, c_provenance, p_folder_scan, p_cs]: + for p in [ + p_scan, + c_vulns, + c_search, + c_versions, + c_semgrep, + c_provenance, + p_folder_scan, + p_cs, + p_crypto_algorithms, + p_crypto_hints, + p_crypto_versions_in_range, + ]: p.add_argument( '--api2url', type=str, help='SCANOSS gRPC API 2.0 URL (optional - default: https://api.osskb.org)' ) @@ -751,7 +818,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 p_c_loc, p_c_dwnld, p_p_proxy, - c_crypto, c_vulns, c_search, c_versions, @@ -763,6 +829,9 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 p_folder_scan, p_folder_hash, p_cs, + p_crypto_algorithms, + p_crypto_hints, + p_crypto_versions_in_range, ]: p.add_argument('--debug', '-d', action='store_true', help='Enable debug messages') p.add_argument('--trace', '-t', action='store_true', help='Enable trace messages, including API posts') @@ -775,7 +844,9 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915 if not args.subparser: parser.print_help() # No sub command subcommand, print general help sys.exit(1) - elif (args.subparser in ('utils', 'ut', 'component', 'comp', 'inspect', 'insp', 'ins')) and not args.subparsercmd: + elif ( + args.subparser in ('utils', 'ut', 'component', 'comp', 'inspect', 'insp', 'ins', 'crypto', 'cr') + ) and not args.subparsercmd: parser.parse_args([args.subparser, '--help']) # Force utils helps to be displayed sys.exit(1) args.func(parser, args) # Execute the function associated with the sub-command @@ -1393,9 +1464,9 @@ def get_pac_file(pac: str): return pac_file -def comp_crypto(parser, args): +def crypto_algorithms(parser, args): """ - Run the "component crypto" sub-command + Run the "crypto algorithms" sub-command Parameters ---------- parser: ArgumentParser @@ -1410,22 +1481,112 @@ def comp_crypto(parser, args): if args.ca_cert and not os.path.exists(args.ca_cert): print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.') sys.exit(1) - pac_file = get_pac_file(args.pac) - comps = Components( - debug=args.debug, - trace=args.trace, - quiet=args.quiet, - grpc_url=args.api2url, - api_key=args.key, - ca_cert=args.ca_cert, - proxy=args.proxy, - grpc_proxy=args.grpc_proxy, - pac=pac_file, - timeout=args.timeout, - req_headers=process_req_headers(args.header), - ) - if not comps.get_crypto_details(args.input, args.purl, args.output): + try: + config = create_cryptography_config_from_args(args) + grpc_config = create_grpc_config_from_args(args) + if args.pac: + grpc_config.pac = get_pac_file(args.pac) + if args.header: + grpc_config.req_headers = process_req_headers(args.header) + client = ScanossGrpc(**asdict(grpc_config)) + + cryptography = Cryptography(config=config, client=client) + cryptography.get_algorithms() + cryptography.present(output_file=args.output) + except ScanossGrpcError as e: + print_stderr(f'API ERROR: {e}') + sys.exit(1) + except Exception as e: + if args.debug: + import traceback + + traceback.print_exc() + print_stderr(f'ERROR: {e}') + sys.exit(1) + + +def crypto_hints(parser, args): + """ + Run the "crypto hints" sub-command + Parameters + ---------- + parser: ArgumentParser + command line parser object + args: Namespace + Parsed arguments + """ + if (not args.purl and not args.input) or (args.purl and args.input): + print_stderr('Please specify an input file or purl to decorate (--purl or --input)') + parser.parse_args([args.subparser, args.subparsercmd, '-h']) + sys.exit(1) + if args.ca_cert and not os.path.exists(args.ca_cert): + print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.') + sys.exit(1) + + try: + config = create_cryptography_config_from_args(args) + grpc_config = create_grpc_config_from_args(args) + if args.pac: + grpc_config.pac = get_pac_file(args.pac) + if args.header: + grpc_config.req_headers = process_req_headers(args.header) + client = ScanossGrpc(**asdict(grpc_config)) + + cryptography = Cryptography(config=config, client=client) + cryptography.get_encryption_hints() + cryptography.present(output_file=args.output) + except ScanossGrpcError as e: + print_stderr(f'API ERROR: {e}') + sys.exit(1) + except Exception as e: + if args.debug: + import traceback + + traceback.print_exc() + print_stderr(f'ERROR: {e}') + sys.exit(1) + + +def crypto_versions_in_range(parser, args): + """ + Run the "crypto versions-in-range" sub-command + Parameters + ---------- + parser: ArgumentParser + command line parser object + args: Namespace + Parsed arguments + """ + if (not args.purl and not args.input) or (args.purl and args.input): + print_stderr('Please specify an input file or purl to decorate (--purl or --input)') + parser.parse_args([args.subparser, args.subparsercmd, '-h']) + sys.exit(1) + if args.ca_cert and not os.path.exists(args.ca_cert): + print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.') + sys.exit(1) + + try: + config = create_cryptography_config_from_args(args) + grpc_config = create_grpc_config_from_args(args) + if args.pac: + grpc_config.pac = get_pac_file(args.pac) + if args.header: + grpc_config.req_headers = process_req_headers(args.header) + client = ScanossGrpc(**asdict(grpc_config)) + + cryptography = Cryptography(config=config, client=client) + cryptography.get_versions_in_range() + cryptography.present(output_file=args.output) + except ScanossGrpcError as e: + print_stderr(f'API ERROR: {e}') + sys.exit(1) + except Exception as e: + if args.debug: + import traceback + + traceback.print_exc() + print_stderr(f'ERROR: {e}') sys.exit(1) diff --git a/src/scanoss/constants.py b/src/scanoss/constants.py index 3bce0ee..1dd9bd6 100644 --- a/src/scanoss/constants.py +++ b/src/scanoss/constants.py @@ -10,3 +10,5 @@ DEFAULT_URL = 'https://api.osskb.org' # default free service URL DEFAULT_URL2 = 'https://api.scanoss.com' # default premium service URL + +DEFAULT_API_TIMEOUT = 600 diff --git a/src/scanoss/cryptography.py b/src/scanoss/cryptography.py new file mode 100644 index 0000000..7fcc598 --- /dev/null +++ b/src/scanoss/cryptography.py @@ -0,0 +1,274 @@ +import json +from dataclasses import dataclass +from typing import Dict, List, Optional + +from scanoss.scanossbase import ScanossBase +from scanoss.scanossgrpc import ScanossGrpc +from scanoss.utils.abstract_presenter import AbstractPresenter +from scanoss.utils.file import validate_json_file + + +class ScanossCryptographyError(Exception): + pass + + +MIN_SPLIT_PARTS = 2 + + +@dataclass +class CryptographyConfig: + purl: List[str] + input_file: Optional[str] = None + output_file: Optional[str] = None + header: Optional[str] = None + debug: bool = False + trace: bool = False + quiet: bool = False + with_range: bool = False + + def __post_init__(self): + """ + Validate that the configuration is valid. + """ + if self.purl: + if self.with_range: + for purl in self.purl: + parts = purl.split('@') + if not (len(parts) >= MIN_SPLIT_PARTS and parts[1]): + raise ScanossCryptographyError( + f'Invalid PURL format: "{purl}".' f'It must include a version (e.g., pkg:type/name@version)' + ) + if self.input_file: + input_file_validation = validate_json_file(self.input_file) + if not input_file_validation.is_valid: + raise ScanossCryptographyError( + f'There was a problem with the purl input file. {input_file_validation.error}' + ) + if ( + not isinstance(input_file_validation.data, dict) + or 'purls' not in input_file_validation.data + or not isinstance(input_file_validation.data['purls'], list) + or not all(isinstance(p, dict) and 'purl' in p for p in input_file_validation.data['purls']) + ): + raise ScanossCryptographyError('The supplied input file is not in the correct PurlRequest format.') + purls = input_file_validation.data['purls'] + purls_with_requirement = [] + if self.with_range: + if any('requirement' not in p for p in purls): + raise ScanossCryptographyError( + f'One or more PURLs in "{self.input_file}" are missing the "requirement" field.' + ) + else: + for purl in purls: + purls_with_requirement.append(f'{purl["purl"]}@{purl["requirement"]}') + else: + purls_with_requirement = purls + self.purl = purls_with_requirement + + +def create_cryptography_config_from_args(args) -> CryptographyConfig: + return CryptographyConfig( + debug=getattr(args, 'debug', False), + trace=getattr(args, 'trace', False), + quiet=getattr(args, 'quiet', False), + with_range=getattr(args, 'with_range', False), + purl=getattr(args, 'purl', []), + input_file=getattr(args, 'input', None), + output_file=getattr(args, 'output', None), + header=getattr(args, 'header', None), + ) + + +class Cryptography: + """ + Cryptography Class + + This class is used to decorate purls with cryptography information. + """ + + def __init__( + self, + config: CryptographyConfig, + client: ScanossGrpc, + ): + """ + Initialize the Cryptography. + + Args: + config (CryptographyConfig): Configuration parameters for the cryptography. + client (ScanossGrpc): gRPC client for communicating with the scanning service. + """ + self.base = ScanossBase( + debug=config.debug, + trace=config.trace, + quiet=config.quiet, + ) + self.presenter = CryptographyPresenter( + self, + debug=config.debug, + trace=config.trace, + quiet=config.quiet, + ) + + self.client = client + self.config = config + self.purls_request = self._build_purls_request() + self.results = None + + def get_algorithms(self) -> Optional[Dict]: + """ + Get the cryptographic algorithms for the provided purl or input file. + + Returns: + Optional[Dict]: The folder hash response from the gRPC client, or None if an error occurs. + """ + + if not self.purls_request: + raise ScanossCryptographyError('No PURLs supplied. Provide --purl or --input.') + self.base.print_stderr( + f'Getting cryptographic algorithms for {", ".join([p["purl"] for p in self.purls_request["purls"]])}' + ) + if self.config.with_range: + response = self.client.get_crypto_algorithms_in_range_for_purl(self.purls_request) + else: + response = self.client.get_crypto_algorithms_for_purl(self.purls_request) + if response: + self.results = response + + return self.results + + def get_encryption_hints(self) -> Optional[Dict]: + """ + Get the encryption hints for the provided purl or input file. + + Returns: + Optional[Dict]: The encryption hints response from the gRPC client, or None if an error occurs. + """ + + if not self.purls_request: + raise ScanossCryptographyError('No PURLs supplied. Provide --purl or --input.') + self.base.print_stderr( + f'Getting encryption hints ' + f'{"in range" if self.config.with_range else ""} ' + f'for {", ".join([p["purl"] for p in self.purls_request["purls"]])}' + ) + if self.config.with_range: + response = self.client.get_encryption_hints_in_range_for_purl(self.purls_request) + else: + response = self.client.get_encryption_hints_for_purl(self.purls_request) + if response: + self.results = response + + return self.results + + def get_versions_in_range(self) -> Optional[Dict]: + """ + Given a list of PURLS and version ranges, get a list of versions that do/do not contain cryptographic algorithms + + Returns: + Optional[Dict]: The versions in range response from the gRPC client, or None if an error occurs. + """ + + if not self.purls_request: + raise ScanossCryptographyError('No PURLs supplied. Provide --purl or --input.') + + self.base.print_stderr( + f'Getting versions in range for {", ".join([p["purl"] for p in self.purls_request["purls"]])}' + ) + + response = self.client.get_versions_in_range_for_purl(self.purls_request) + if response: + self.results = response + + return self.results + + def _build_purls_request( + self, + ) -> Optional[dict]: + """ + Load the specified purls from a JSON file or a list of PURLs and return a dictionary + + Args: + json_file (Optional[str], optional): The JSON file containing the PURLs. Defaults to None. + purls (Optional[List[str]], optional): The list of PURLs. Defaults to None. + + Returns: + Optional[dict]: The dictionary containing the PURLs + """ + return { + 'purls': [ + { + 'purl': p, + 'requirement': self._extract_version_from_purl(p), + } + for p in self.config.purl + ] + } + + def _extract_version_from_purl(self, purl: str) -> str: + """ + Extract version from purl + + Args: + purl (str): The purl string to extract the version from + + Returns: + str: The extracted version + + Raises: + ScanossCryptographyError: If the purl is not in the correct format + """ + try: + return purl.split('@')[-1] + except IndexError: + raise ScanossCryptographyError(f'Invalid purl format: {purl}') + + def present( + self, + output_format: Optional[str] = None, + output_file: Optional[str] = None, + ): + """Present the results in the selected format""" + self.presenter.present(output_format=output_format, output_file=output_file) + + +class CryptographyPresenter(AbstractPresenter): + """ + Cryptography presenter class + Handles the presentation of the cryptography results + """ + + def __init__(self, cryptography: Cryptography, **kwargs): + super().__init__(**kwargs) + self.cryptography = cryptography + + def _format_json_output(self) -> str: + """ + Format the scan output data into a JSON object + + Returns: + str: The formatted JSON string + """ + return json.dumps(self.cryptography.results, indent=2) + + def _format_plain_output(self) -> str: + """ + Format the scan output data into a plain text string + """ + return ( + json.dumps(self.cryptography.results, indent=2) + if isinstance(self.cryptography.results, dict) + else str(self.cryptography.results) + ) + + def _format_cyclonedx_output(self) -> str: + raise NotImplementedError('CycloneDX output is not implemented') + + def _format_spdxlite_output(self) -> str: + raise NotImplementedError('SPDXlite output is not implemented') + + def _format_csv_output(self) -> str: + raise NotImplementedError('CSV output is not implemented') + + def _format_raw_output(self) -> str: + raise NotImplementedError('Raw output is not implemented') diff --git a/src/scanoss/scanossgrpc.py b/src/scanoss/scanossgrpc.py index 17752f4..189f4c1 100644 --- a/src/scanoss/scanossgrpc.py +++ b/src/scanoss/scanossgrpc.py @@ -54,7 +54,6 @@ CompVersionResponse, ) from .api.components.v2.scanoss_components_pb2_grpc import ComponentsStub -from .api.cryptography.v2.scanoss_cryptography_pb2 import AlgorithmResponse from .api.cryptography.v2.scanoss_cryptography_pb2_grpc import CryptographyStub from .api.dependencies.v2.scanoss_dependencies_pb2 import DependencyRequest from .api.dependencies.v2.scanoss_dependencies_pb2_grpc import DependenciesStub @@ -312,36 +311,6 @@ def process_file(file): merged_response['status'] = response['status'] return merged_response - def get_crypto_json(self, purls: dict) -> dict: - """ - Client function to call the rpc for Cryptography GetAlgorithms - :param purls: Message to send to the service - :return: Server response or None - """ - if not purls: - self.print_stderr('ERROR: No message supplied to send to gRPC service.') - return None - request_id = str(uuid.uuid4()) - resp: AlgorithmResponse - try: - request = ParseDict(purls, PurlRequest()) # Parse the JSON/Dict into the purl request object - metadata = self.metadata[:] - metadata.append(('x-request-id', request_id)) # Set a Request ID - self.print_debug(f'Sending crypto data for decoration (rqId: {request_id})...') - resp = self.crypto_stub.GetAlgorithms(request, metadata=metadata, timeout=self.timeout) - except Exception as e: - self.print_stderr( - f'ERROR: {e.__class__.__name__} Problem encountered sending gRPC message (rqId: {request_id}): {e}' - ) - else: - if resp: - if not self._check_status_response(resp.status, request_id): - return None - resp_dict = MessageToDict(resp, preserving_proto_field_name=True) # Convert gRPC response to a dict - del resp_dict['status'] - return resp_dict - return None - def get_vulnerabilities_json(self, purls: dict) -> dict: """ Client function to call the rpc for Vulnerability GetVulnerabilities @@ -607,6 +576,91 @@ def get_provenance_origin(self, request: Dict) -> Optional[Dict]: 'Sending data for provenance origin decoration (rqId: {rqId})...', ) + def get_crypto_algorithms_for_purl(self, request: Dict) -> Optional[Dict]: + """ + Client function to call the rpc for GetAlgorithms for a list of purls + + Args: + request (Dict): PurlRequest + + Returns: + Optional[Dict]: AlgorithmResponse, or None if the request was not successfull + """ + return self._call_rpc( + self.crypto_stub.GetAlgorithms, + request, + PurlRequest, + 'Sending data for cryptographic algorithms decoration (rqId: {rqId})...', + ) + + def get_crypto_algorithms_in_range_for_purl(self, request: Dict) -> Optional[Dict]: + """ + Client function to call the rpc for GetAlgorithmsInRange for a list of purls + + Args: + request (Dict): PurlRequest + + Returns: + Optional[Dict]: AlgorithmsInRangeResponse, or None if the request was not successfull + """ + return self._call_rpc( + self.crypto_stub.GetAlgorithmsInRange, + request, + PurlRequest, + 'Sending data for cryptographic algorithms in range decoration (rqId: {rqId})...', + ) + + def get_encryption_hints_for_purl(self, request: Dict) -> Optional[Dict]: + """ + Client function to call the rpc for GetEncryptionHints for a list of purls + + Args: + request (Dict): PurlRequest + + Returns: + Optional[Dict]: HintsResponse, or None if the request was not successfull + """ + return self._call_rpc( + self.crypto_stub.GetEncryptionHints, + request, + PurlRequest, + 'Sending data for encryption hints decoration (rqId: {rqId})...', + ) + + def get_encryption_hints_in_range_for_purl(self, request: Dict) -> Optional[Dict]: + """ + Client function to call the rpc for GetHintsInRange for a list of purls + + Args: + request (Dict): PurlRequest + + Returns: + Optional[Dict]: HintsInRangeResponse, or None if the request was not successfull + """ + return self._call_rpc( + self.crypto_stub.GetHintsInRange, + request, + PurlRequest, + 'Sending data for encryption hints in range decoration (rqId: {rqId})...', + ) + + def get_versions_in_range_for_purl(self, request: Dict) -> Optional[Dict]: + """ + Client function to call the rpc for GetVersionsInRange for a list of purls + + Args: + request (Dict): PurlRequest + + Returns: + Optional[Dict]: VersionsInRangeResponse, or None if the request was not successfull + """ + return self._call_rpc( + self.crypto_stub.GetVersionsInRange, + request, + PurlRequest, + 'Sending data for cryptographic versions in range decoration (rqId: {rqId})...', + ) + def load_generic_headers(self): """ Adds custom headers from req_headers to metadata. @@ -637,10 +691,11 @@ class GrpcConfig: quiet: Optional[bool] = False ver_details: Optional[str] = None ca_cert: Optional[str] = None - pac: Optional[PACFile] = None timeout: Optional[int] = DEFAULT_TIMEOUT proxy: Optional[str] = None grpc_proxy: Optional[str] = None + pac: Optional[PACFile] = None + req_headers: Optional[dict] = None def create_grpc_config_from_args(args) -> GrpcConfig: @@ -652,7 +707,6 @@ def create_grpc_config_from_args(args) -> GrpcConfig: quiet=getattr(args, 'quiet', False), ver_details=getattr(args, 'ver_details', None), ca_cert=getattr(args, 'ca_cert', None), - pac=getattr(args, 'pac', None), timeout=getattr(args, 'timeout', DEFAULT_TIMEOUT), proxy=getattr(args, 'proxy', None), grpc_proxy=getattr(args, 'grpc_proxy', None), diff --git a/src/scanoss/utils/file.py b/src/scanoss/utils/file.py index ab28327..84c6dc4 100644 --- a/src/scanoss/utils/file.py +++ b/src/scanoss/utils/file.py @@ -49,8 +49,8 @@ def validate_json_file(json_file_path: str) -> JsonValidation: json_file_path (str): The JSON file to validate Returns: - Tuple[bool, str]: A tuple containing a boolean indicating if the file is valid and a message - """ + JsonValidation: A JsonValidation object containing a boolean indicating if the file is valid, the data, error, and error code + """ # noqa: E501 if not json_file_path: return JsonValidation(is_valid=False, error='No JSON file specified') if not os.path.isfile(json_file_path):