Skip to content

feat: [SP-2400] - Create crypto sub command #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 54 additions & 33 deletions src/scanoss/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import pypac

from scanoss.cryptography import Cryptography, create_cryptography_config_from_args
from scanoss.scanners.container_scanner import (
DEFAULT_SYFT_COMMAND,
DEFAULT_SYFT_TIMEOUT,
Expand All @@ -50,6 +51,7 @@
from . import __version__
from .components import Components
from .constants import (
DEFAULT_API_TIMEOUT,
DEFAULT_POST_SIZE,
DEFAULT_RETRY,
DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -292,15 +294,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
help='component sub-commands',
)

# Component Sub-command: component crypto
c_crypto = comp_sub.add_parser(
'crypto',
aliases=['cr'],
description=f'Show Cryptographic algorithms: {__version__}',
help='Retrieve cryptographic algorithms for the given components',
)
c_crypto.set_defaults(func=comp_crypto)

# Component Sub-command: component vulns
c_vulns = comp_sub.add_parser(
'vulns',
Expand Down Expand Up @@ -361,18 +354,48 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
c_versions.add_argument('--limit', '-l', type=int, help='Generic component search')
c_versions.set_defaults(func=comp_versions)

# Sub-command: crypto
p_crypto = subparsers.add_parser(
'crypto',
aliases=['cr'],
description=f'SCANOSS Crypto commands: {__version__}',
help='Crypto support commands',
)
crypto_sub = p_crypto.add_subparsers(
title='Crypto Commands',
dest='subparsercmd',
description='crypto sub-commands',
help='crypto sub-commands',
required=True,
)

# GetAlgorithms and GetAlgorithmsInRange gRPC APIs
p_crypto_algorithms = crypto_sub.add_parser(
'algorithms',
aliases=['alg'],
description=f'Show Cryptographic algorithms: {__version__}',
help='Retrieve cryptographic algorithms for the given components',
)
p_crypto_algorithms.add_argument(
'--range',
'-r',
type=str,
help='Returns the list of versions in the specified range that contains cryptographic algorithms',
)
p_crypto_algorithms.set_defaults(func=crypto_algorithms)

# Common purl Component sub-command options
for p in [c_crypto, c_vulns, c_semgrep, c_provenance]:
for p in [c_vulns, c_semgrep, c_provenance, p_crypto_algorithms]:
p.add_argument('--purl', '-p', type=str, nargs='*', help='Package URL - PURL to process.')
p.add_argument('--input', '-i', type=str, help='Input file name')

# Common Component sub-command options
for p in [c_crypto, c_vulns, c_search, c_versions, c_semgrep, c_provenance]:
for p in [c_vulns, c_search, c_versions, c_semgrep, c_provenance, p_crypto_algorithms]:
p.add_argument(
'--timeout',
'-M',
type=int,
default=600,
default=DEFAULT_API_TIMEOUT,
help='Timeout (in seconds) for API communication (optional - default 600)',
)

Expand Down Expand Up @@ -588,7 +611,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
p_dep,
p_fc,
p_cnv,
c_crypto,
c_vulns,
c_search,
c_versions,
Expand All @@ -597,6 +619,7 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
p_c_dwnld,
p_folder_scan,
p_folder_hash,
p_crypto_algorithms,
]:
p.add_argument('--output', '-o', type=str, help='Output result file name (optional - default stdout).')

Expand Down Expand Up @@ -674,14 +697,14 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
# Global Scan/GRPC options
for p in [
p_scan,
c_crypto,
c_vulns,
c_search,
c_versions,
c_semgrep,
c_provenance,
p_folder_scan,
p_cs,
p_crypto_algorithms,
]:
p.add_argument(
'--key', '-k', type=str, help='SCANOSS API Key token (optional - not required for default OSSKB URL)'
Expand All @@ -708,7 +731,7 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
)

# Global GRPC options
for p in [p_scan, c_crypto, c_vulns, c_search, c_versions, c_semgrep, c_provenance, p_folder_scan, p_cs]:
for p in [p_scan, c_vulns, c_search, c_versions, c_semgrep, c_provenance, p_folder_scan, p_cs, p_crypto_algorithms]:
p.add_argument(
'--api2url', type=str, help='SCANOSS gRPC API 2.0 URL (optional - default: https://api.osskb.org)'
)
Expand Down Expand Up @@ -751,7 +774,6 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
p_c_loc,
p_c_dwnld,
p_p_proxy,
c_crypto,
c_vulns,
c_search,
c_versions,
Expand All @@ -763,6 +785,7 @@ def setup_args() -> None: # noqa: PLR0912, PLR0915
p_folder_scan,
p_folder_hash,
p_cs,
p_crypto_algorithms,
]:
p.add_argument('--debug', '-d', action='store_true', help='Enable debug messages')
p.add_argument('--trace', '-t', action='store_true', help='Enable trace messages, including API posts')
Expand Down Expand Up @@ -1393,9 +1416,9 @@ def get_pac_file(pac: str):
return pac_file


def comp_crypto(parser, args):
def crypto_algorithms(parser, args):
"""
Run the "component crypto" sub-command
Run the "crypto algorithms" sub-command
Parameters
----------
parser: ArgumentParser
Expand All @@ -1410,22 +1433,20 @@ def comp_crypto(parser, args):
if args.ca_cert and not os.path.exists(args.ca_cert):
print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.')
sys.exit(1)
pac_file = get_pac_file(args.pac)

comps = Components(
debug=args.debug,
trace=args.trace,
quiet=args.quiet,
grpc_url=args.api2url,
api_key=args.key,
ca_cert=args.ca_cert,
proxy=args.proxy,
grpc_proxy=args.grpc_proxy,
pac=pac_file,
timeout=args.timeout,
req_headers=process_req_headers(args.header),
)
if not comps.get_crypto_details(args.input, args.purl, args.output):
try:
config = create_cryptography_config_from_args(args)
grpc_config = create_grpc_config_from_args(args)
client = ScanossGrpc(**asdict(grpc_config))

# TODO: Add PAC file support
# pac_file = get_pac_file(config.pac)

cryptography = Cryptography(config=config, client=client)
cryptography.get_algorithms()
cryptography.present(output_file=args.output)
except Exception as e:
print_stderr(f'ERROR: {e}')
sys.exit(1)


Expand Down
2 changes: 2 additions & 0 deletions src/scanoss/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@

DEFAULT_URL = 'https://api.osskb.org' # default free service URL
DEFAULT_URL2 = 'https://api.scanoss.com' # default premium service URL

DEFAULT_API_TIMEOUT = 600
176 changes: 176 additions & 0 deletions src/scanoss/cryptography.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import json
from dataclasses import dataclass
from typing import Dict, Optional

from scanoss.scanossbase import ScanossBase
from scanoss.scanossgrpc import ScanossGrpc
from scanoss.utils.abstract_presenter import AbstractPresenter
from scanoss.utils.file import validate_json_file


class ScanossCryptographyError(Exception):
pass


@dataclass
class CryptographyConfig:
debug: bool = False
trace: bool = False
quiet: bool = False
get_range: bool = False
purl: str = None
input_file: str = None
output_file: str = None
header: str = None


def create_cryptography_config_from_args(args) -> CryptographyConfig:
return CryptographyConfig(
debug=getattr(args, 'debug', None),
trace=getattr(args, 'trace', None),
quiet=getattr(args, 'quiet', None),
get_range=getattr(args, 'range', None),
purl=getattr(args, 'purl', None),
input_file=getattr(args, 'input', None),
output_file=getattr(args, 'output', None),
header=getattr(args, 'header', None),
)


class Cryptography:
"""
Cryptography Class

This class is used to decorate purls with cryptography information.
"""

def __init__(
self,
config: CryptographyConfig,
client: ScanossGrpc,
):
"""
Initialize the Cryptography.

Args:
config (CryptographyConfig): Configuration parameters for the cryptography.
client (ScanossGrpc): gRPC client for communicating with the scanning service.
"""
self.base = ScanossBase(
debug=config.debug,
trace=config.trace,
quiet=config.quiet,
)
self.presenter = CryptographyPresenter(
self,
debug=config.debug,
trace=config.trace,
quiet=config.quiet,
)

self.client = client
self.config = config
self.purls_request = self._build_purls_request()
self.results = None

def get_algorithms(self) -> Optional[Dict]:
"""
Get the cryptographic algorithms for the provided purl or input file.

Returns:
Optional[Dict]: The folder hash response from the gRPC client, or None if an error occurs.
"""

try:
self.base.print_stderr(
f'Getting cryptographic algorithms for {", ".join([p["purl"] for p in self.purls_request["purls"]])}'
)
if self.config.get_range:
response = self.client.get_crypto_algorithms_in_range_for_purl(self.purls_request)
else:
response = self.client.get_crypto_algorithms_for_purl(self.purls_request)
if response:
self.results = response
except Exception as e:
raise ScanossCryptographyError(f'Problem with purl input file. {e}')

return self.results

def _build_purls_request(
self,
) -> Optional[dict]:
"""
Load the specified purls from a JSON file or a list of PURLs and return a dictionary

Args:
json_file (Optional[str], optional): The JSON file containing the PURLs. Defaults to None.
purls (Optional[List[str]], optional): The list of PURLs. Defaults to None.

Returns:
Optional[dict]: The dictionary containing the PURLs
"""
if self.config.input_file:
input_file_validation = validate_json_file(self.config.input_file)
if not input_file_validation.is_valid:
self.base.print_stderr(
f'ERROR: The supplied input file "{self.config.input_file}" was not found or is empty.'
)
raise Exception(f'Problem with purl input file. {input_file_validation.error}')
# Validate the input file is in PurlRequest format
if (
not isinstance(input_file_validation.data, dict)
or 'purls' not in input_file_validation.data
or not isinstance(input_file_validation.data['purls'], list)
or not all(isinstance(p, dict) and 'purl' in p for p in input_file_validation.data['purls'])
):
raise Exception('The supplied input file is not in the correct PurlRequest format.')
return input_file_validation.data
if self.config.purl:
return {'purls': [{'purl': p} for p in self.config.purl]}
return None

def present(self, output_format: str = None, output_file: str = None):
"""Present the results in the selected format"""
self.presenter.present(output_format=output_format, output_file=output_file)


class CryptographyPresenter(AbstractPresenter):
"""
Cryptography presenter class
Handles the presentation of the cryptography results
"""

def __init__(self, cryptography: Cryptography, **kwargs):
super().__init__(**kwargs)
self.cryptography = cryptography

def _format_json_output(self) -> str:
"""
Format the scan output data into a JSON object

Returns:
str: The formatted JSON string
"""
return json.dumps(self.cryptography.results, indent=2)

def _format_plain_output(self) -> str:
"""
Format the scan output data into a plain text string
"""
return (
json.dumps(self.cryptography.results, indent=2)
if isinstance(self.cryptography.results, dict)
else str(self.cryptography.results)
)

def _format_cyclonedx_output(self) -> str:
raise NotImplementedError('CycloneDX output is not implemented')

def _format_spdxlite_output(self) -> str:
raise NotImplementedError('SPDXlite output is not implemented')

def _format_csv_output(self) -> str:
raise NotImplementedError('CSV output is not implemented')

def _format_raw_output(self) -> str:
raise NotImplementedError('Raw output is not implemented')
Loading