44to different formats (CSV and markdown).
55"""
66
7+ import enum
78import logging
89import urllib .parse
910from dataclasses import dataclass
@@ -22,6 +23,10 @@ class CvssSourceProvider:
2223 AMAZON_INSPECTOR = "AMAZON_INSPECTOR"
2324 DEFAULT_PROVIDER = NVD
2425
26+
27+ empty_rating = {"score" : 0.0 , "source" : {"name" : "in triage" }, "severity" : "other" , "method" : "other" }
28+
29+
2530def get_rating_providers ():
2631 """
2732 get_rating_providers returns a list of vulnerability
@@ -42,8 +47,8 @@ def get_rating_providers():
4247 ]
4348 return providers
4449
50+
4551class CvssSeverity :
46- UNTRIAGED = "untriaged"
4752 UNKNOWN = "unknown"
4853
4954
@@ -74,7 +79,7 @@ class Vulnerability:
7479
7580@dataclass
7681class CvssRating :
77- severity : str = CvssSeverity .UNTRIAGED
82+ severity : str = CvssSeverity .UNKNOWN
7883 provider : str = CvssSourceProvider .DEFAULT_PROVIDER
7984 cvss_score : str = NULL_STR
8085
@@ -109,7 +114,12 @@ def parse_inspector_scan_result(inspector_scan_json) -> List[Vulnerability]:
109114 pkg_vulns = get_pkg_vulns (vulns )
110115
111116 for v in pkg_vulns :
112- vuln_obj = convert_package_vuln_to_vuln_obj (v , components )
117+ vuln_obj = None
118+ try :
119+ vuln_obj = convert_package_vuln_to_vuln_obj (v , components )
120+ except Exception as e :
121+ logging .error (f"error encountered while parsing a vulnerability: { e } " )
122+ continue
113123 vuln_list .append (vuln_obj )
114124
115125 return vuln_list
@@ -143,7 +153,7 @@ def convert_package_vuln_to_vuln_obj(v, components) -> Vulnerability:
143153 vuln_obj .published = v .get ("created" , NULL_STR )
144154 vuln_obj .modified = v .get ("updated" , NULL_STR )
145155
146- ratings = v .get ("ratings" )
156+ ratings = v .get ("ratings" , [ empty_rating ] )
147157 add_ratings (ratings , vuln_obj )
148158
149159 description = v .get ("description" )
@@ -168,10 +178,7 @@ def convert_package_vuln_to_vuln_obj(v, components) -> Vulnerability:
168178
169179
170180def add_ratings (ratings , vulnerability ):
171- if ratings is None :
172- return
173-
174- rating = get_cvss_rating (ratings , vulnerability )
181+ rating = get_highest_severity_rating (ratings )
175182 vulnerability .severity = rating .severity
176183 vulnerability .severity_provider = rating .provider
177184 vulnerability .cvss_score = rating .cvss_score
@@ -276,22 +283,6 @@ def get_cwes(v) -> str:
276283 return cwe_str
277284
278285
279- def get_cvss_rating (ratings , vulnerability ) -> CvssRating :
280- rating_provider_priority = get_rating_providers ()
281- for provider in rating_provider_priority :
282- for rating in ratings :
283- if rating ["source" ]["name" ] != provider :
284- continue
285-
286- severity = CvssSeverity .UNTRIAGED if rating ["severity" ] == CvssSeverity .UNKNOWN else rating ["severity" ]
287- cvss_score = str (rating ["score" ]) if rating ["method" ] == "CVSSv31" else "null"
288- if severity and cvss_score :
289- return CvssRating (severity = severity , provider = provider , cvss_score = cvss_score )
290-
291- logging .info (f"No CVSS rating is provided for { vulnerability .vuln_id } " )
292- return CvssRating ()
293-
294-
295286def get_epss_score (ratings ):
296287 for rating in ratings :
297288 source = rating .get ("source" )
@@ -320,3 +311,126 @@ def combine_str_list_into_one_str(str_list: list[str]) -> str:
320311 if str_element == "" :
321312 str_element = NULL_STR
322313 return str_element
314+
315+
316+ def get_highest_severity_rating (ratings ) -> CvssRating :
317+ method = get_preferred_vuln_rating_method (ratings )
318+ most_severe_rating = get_highest_rating_by_method (method , ratings )
319+ cvss = CvssRating ()
320+ cvss .provider = most_severe_rating ["source" ]["name" ]
321+ cvss .severity = most_severe_rating ["severity" ]
322+ if "unknown" in cvss .severity :
323+ cvss .severity = "other"
324+ cvss .cvss_score = str (most_severe_rating .get ("score" , 0.0 ))
325+ return cvss
326+
327+
328+ class VulnRatingMethod (enum .Enum ):
329+ CVSSv2 = "CVSSv2"
330+ CVSSv3 = "CVSSv3"
331+ CVSSv31 = "CVSSv31"
332+ CVSSv4 = "CVSSv4"
333+ OWASP = "OWASP"
334+ SSVC = "SSVC"
335+ OTHER = "other"
336+
337+
338+ def get_preferred_vuln_rating_method (ratings ):
339+ if ratings is None :
340+ return VulnRatingMethod .OTHER
341+
342+ found_methods = []
343+
344+ for rating in ratings :
345+ if not rating :
346+ continue
347+
348+ method = rating .get ("method" , "" )
349+ if not method :
350+ continue
351+
352+ # we keep a list of each rating method in the
353+ # vulnerability so we can present the highest rating
354+ # to the end user
355+ if method == VulnRatingMethod .CVSSv4 .value :
356+ found_methods .append (VulnRatingMethod .CVSSv4 )
357+ elif method == VulnRatingMethod .CVSSv31 .value :
358+ found_methods .append (VulnRatingMethod .CVSSv31 )
359+ elif method == VulnRatingMethod .CVSSv3 .value :
360+ found_methods .append (VulnRatingMethod .CVSSv3 )
361+ elif method == VulnRatingMethod .CVSSv2 .value :
362+ found_methods .append (VulnRatingMethod .CVSSv2 )
363+ elif method == VulnRatingMethod .OWASP .value :
364+ found_methods .append (VulnRatingMethod .OWASP )
365+ elif method == VulnRatingMethod .SSVC .value :
366+ found_methods .append (VulnRatingMethod .SSVC )
367+ elif method == VulnRatingMethod .OTHER .value :
368+ found_methods .append (VulnRatingMethod .OTHER )
369+ else :
370+ logging .error (f"Expected a spec-conforming CycloneDX vulnerability rating method, but received '{ method } '" )
371+ continue
372+
373+ # select method to display to user in priority order
374+ rating_method_priority = [VulnRatingMethod .CVSSv4 , VulnRatingMethod .CVSSv31 , VulnRatingMethod .CVSSv3 ,
375+ VulnRatingMethod .CVSSv2 , VulnRatingMethod .OWASP , VulnRatingMethod .SSVC ]
376+ for rating_method in rating_method_priority :
377+ if rating_method in found_methods :
378+ return rating_method
379+ return VulnRatingMethod .OTHER
380+
381+
382+ def get_highest_rating_by_method (method : VulnRatingMethod , ratings ):
383+ if not ratings :
384+ return empty_rating
385+
386+ ratings_with_same_method = []
387+ for rating in ratings :
388+ if not rating :
389+ continue
390+
391+ a_method = rating .get ("method" , "" )
392+ if not method :
393+ continue
394+
395+ if is_epss (a_method , rating ):
396+ continue
397+
398+ if a_method == method .value :
399+ ratings_with_same_method .append (rating )
400+
401+ highest_rating = empty_rating
402+ for rating in ratings_with_same_method :
403+
404+ score = rating .get ("score" , 0.0 )
405+ try :
406+ score = float (score )
407+ except Exception as e :
408+ logging .error (f"threw exception while trying to convert severity score, '{ score } ' to type float: { e } " )
409+ continue
410+
411+ if score >= highest_rating ["score" ]:
412+ highest_rating = rating
413+
414+
415+ return highest_rating
416+
417+
418+ def is_epss (method , rating ):
419+ if not rating :
420+ return False
421+
422+ if method != VulnRatingMethod .OTHER .value :
423+ return False
424+
425+ source = rating .get ("source" , "" )
426+ if not source :
427+ return False
428+
429+ name = source .get ("name" , "" )
430+ if not name :
431+ return False
432+
433+ if name .lower () == "EPSS" .lower ():
434+ return True
435+
436+ return False
0 commit comments