diff --git a/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py b/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py
index 6a4c287..b01dce4 100644
--- a/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py
+++ b/tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py
@@ -25,7 +25,9 @@
from __future__ import annotations
+import json
from collections.abc import Iterator
+from pathlib import Path
from typing import Any
import pytest
@@ -35,7 +37,9 @@
_is_cna_ready_for_review,
_product_for_package,
build_affected,
+ build_cna_container,
build_credits,
+ build_descriptions,
build_metrics,
build_problem_types,
build_references,
@@ -43,6 +47,7 @@
combine_remediation_developers,
compute_cna_private_state,
compute_package_url,
+ emit_json,
extract_field,
format_version_range,
parse_affected_versions,
@@ -53,6 +58,7 @@
resolve_title,
wrap_cve_record,
)
+from generate_cve_json.cve_json import normalise_severity, to_html
DEFAULT_AFFECTED_ARGS: dict[str, Any] = {
"vendor": "Apache Software Foundation",
@@ -1031,3 +1037,261 @@ def test_full_name_affiliation_pattern_preserved(self):
assert combine_remediation_developers("Jed Cunningham, Astronomer", []) == [
"Jed Cunningham, Astronomer",
]
+
+
+# ---------------------------------------------------------------------------
+# normalise_severity
+# ---------------------------------------------------------------------------
+
+
+class TestNormaliseSeverity:
+    """Exercise ``normalise_severity`` with known and unknown severity labels."""
+
+    def test_known_values_are_lowercased(self):
+        labels = ("None", "Low", "Medium", "High", "Critical")
+        for label in labels:
+            expected = label.lower()
+            assert normalise_severity(label) == expected
+
+    def test_already_lowercase_known_value_passes_through(self):
+        unchanged = "high"
+        assert normalise_severity(unchanged) == unchanged
+
+    def test_unknown_value_is_stripped_but_not_changed(self):
+        # Surrounding whitespace is expected to go; the casing must survive.
+        padded = " Informational "
+        assert normalise_severity(padded) == "Informational"
+
+    def test_mixed_case_known_value_normalised(self):
+        for shouted, expected in (("HIGH", "high"), ("CRITICAL", "critical")):
+            assert normalise_severity(shouted) == expected
+
+
+# ---------------------------------------------------------------------------
+# to_html
+# ---------------------------------------------------------------------------
+
+
+class TestToHtml:
+    """Check the plain-text to HTML conversion performed by ``to_html``.
+
+    NOTE(review): the escaped entities and ``<br>`` markers in the expected
+    strings were restored from the test names after the markup had been lost
+    from this hunk -- confirm the exact tag spelling against ``to_html``.
+    """
+
+    def test_plain_text_is_returned_unchanged(self):
+        assert to_html("Hello world") == "Hello world"
+
+    def test_html_angle_brackets_are_escaped(self):
+        # Markup in the input must come back as inert, entity-escaped text.
+        assert to_html("<script>alert(1)</script>") == "&lt;script&gt;alert(1)&lt;/script&gt;"
+
+    def test_ampersand_is_escaped(self):
+        assert to_html("A & B") == "A &amp; B"
+
+    def test_double_newlines_become_br_br(self):
+        assert to_html("Para one.\n\nPara two.") == "Para one.<br><br>Para two."
+
+    def test_single_newlines_become_br(self):
+        assert to_html("Line one.\nLine two.") == "Line one.<br>Line two."
+
+    def test_windows_line_endings_normalised_before_conversion(self):
+        # "\r\n" must count as one line break, not as a break plus a stray "\r".
+        assert to_html("Line one.\r\nLine two.") == "Line one.<br>Line two."
+
+    def test_mixed_newlines_in_multiline_text(self):
+        result = to_html("Intro.\n\nBullet one.\nBullet two.")
+        assert result == "Intro.<br><br>Bullet one.<br>Bullet two."
+
+
+# ---------------------------------------------------------------------------
+# build_descriptions
+# ---------------------------------------------------------------------------
+
+
+class TestBuildDescriptions:
+    """Check the description entries returned by ``build_descriptions``.
+
+    NOTE(review): the ``<b>`` markup and the escaped entities in the last two
+    tests were restored from the test names after the markup had been lost
+    from this hunk -- confirm against the original test inputs.
+    """
+
+    def test_empty_text_returns_empty_list(self):
+        assert build_descriptions("") == []
+
+    def test_non_empty_returns_single_entry(self):
+        result = build_descriptions("A vulnerability.")
+        assert len(result) == 1
+
+    def test_entry_lang_is_en(self):
+        result = build_descriptions("A vulnerability.")
+        assert result[0]["lang"] == "en"
+
+    def test_entry_value_is_plain_text(self):
+        result = build_descriptions("A vulnerability.")
+        assert result[0]["value"] == "A vulnerability."
+
+    def test_supporting_media_is_html(self):
+        result = build_descriptions("A vulnerability.")
+        media = result[0]["supportingMedia"]
+        assert len(media) == 1
+        assert media[0]["type"] == "text/html"
+        assert media[0]["base64"] is False
+
+    def test_special_chars_escaped_in_html_media(self):
+        # The HTML rendition must carry the markup as escaped entities.
+        result = build_descriptions("Use <b>wisely</b> & carefully.")
+        html_value = result[0]["supportingMedia"][0]["value"]
+        assert "&lt;b&gt;" in html_value
+        assert "&amp;" in html_value
+
+    def test_plain_value_not_html_escaped(self):
+        # The plain ``value`` field keeps the text verbatim, markup included.
+        result = build_descriptions("Use <b>wisely</b>.")
+        assert result[0]["value"] == "Use <b>wisely</b>."
+
+
+# ---------------------------------------------------------------------------
+# build_cna_container
+# ---------------------------------------------------------------------------
+
+# Baseline keyword arguments for ``build_cna_container``. The tests in
+# ``TestBuildCnaContainer`` either unpack this mapping unchanged or build a
+# copy (``{**_CNA_DEFAULTS, ...}``) with individual keys replaced or added,
+# so the shared mapping is not modified by any single test.
+_CNA_DEFAULTS: dict[str, Any] = {
+ "title": "DAG auth bypass",
+ "description": "An attacker can read arbitrary DAGs.",
+ "affected_versions_value": ">=3.0.0, <3.2.0",
+ "cwe_value": "CWE-352: CSRF",
+ "severity_value": "high",
+ "credits_value": "Alice Smith",
+ "mailing_list_value": "https://lists.example.org/thread/abc",
+ "pr_value": "https://github.com/apache/airflow/pull/123",
+ "vendor": "Apache Software Foundation",
+ "product": "Apache Example",
+ "package_name": "apache-example",
+ "collection_url": "https://pypi.python.org",
+ "org_id": "org-123",
+ "version_start": None,
+ "discovery": "UNKNOWN",
+ "remediation_developers": [],
+}
+
+
+class TestBuildCnaContainer:
+    """Exercise ``build_cna_container`` using ``_CNA_DEFAULTS`` plus per-test overrides."""
+
+    @staticmethod
+    def _container(**overrides):
+        """Build a CNA container from ``_CNA_DEFAULTS`` with *overrides* layered on top."""
+        return build_cna_container(**{**_CNA_DEFAULTS, **overrides})
+
+    def test_required_top_level_keys_present(self):
+        container = self._container()
+        expected_keys = (
+            "affected",
+            "credits",
+            "descriptions",
+            "metrics",
+            "problemTypes",
+            "providerMetadata",
+            "references",
+            "source",
+            "title",
+            "x_generator",
+        )
+        for key in expected_keys:
+            assert key in container, f"missing key: {key}"
+
+    def test_title_is_set(self):
+        assert self._container()["title"] == "DAG auth bypass"
+
+    def test_provider_metadata_carries_org_id(self):
+        provider = self._container()["providerMetadata"]
+        assert provider["orgId"] == "org-123"
+
+    def test_source_discovery_is_set(self):
+        source = self._container()["source"]
+        assert source["discovery"] == "UNKNOWN"
+
+    def test_affected_entry_uses_correct_package(self):
+        affected = self._container()["affected"]
+        assert any(entry["packageName"] == "apache-example" for entry in affected)
+
+    def test_description_value_appears_in_descriptions(self):
+        descriptions = self._container()["descriptions"]
+        assert any("An attacker" in entry["value"] for entry in descriptions)
+
+    def test_cwe_appears_in_problem_types(self):
+        container = self._container()
+        cwe_ids = []
+        for problem_type in container["problemTypes"]:
+            for description in problem_type.get("descriptions", []):
+                cwe_ids.append(description.get("cweId"))
+        assert "CWE-352" in cwe_ids
+
+    def test_severity_appears_in_metrics(self):
+        container = self._container()
+        texts = []
+        for metric in container["metrics"]:
+            # Only metrics carrying an "other" payload are inspected.
+            if "other" in metric:
+                texts.append(metric["other"]["content"]["text"])
+        assert "high" in texts
+
+    def test_reporter_credit_in_credits(self):
+        names = [credit["value"] for credit in self._container()["credits"]]
+        assert "Alice Smith" in names
+
+    def test_remediation_developer_added_to_credits(self):
+        container = self._container(remediation_developers=["Bob Builder"])
+        pairs = [(credit["value"], credit["type"]) for credit in container["credits"]]
+        assert ("Bob Builder", "remediation developer") in pairs
+
+    def test_pr_url_appears_in_references(self):
+        links = [ref["url"] for ref in self._container()["references"]]
+        assert "https://github.com/apache/airflow/pull/123" in links
+
+    def test_advisory_urls_forwarded_to_references(self):
+        advisory = "https://lists.apache.org/thread/real-advisory"
+        container = self._container(advisory_urls=[advisory])
+        links = [ref["url"] for ref in container["references"]]
+        assert "https://lists.apache.org/thread/real-advisory" in links
+
+    def test_product_overrides_applied_to_affected(self):
+        container = self._container(
+            affected_versions_value="apache-example-project-foo <=6.5.0",
+            product_overrides={"apache-example-project-foo": "Custom Foo Display"},
+        )
+        assert any(entry["product"] == "Custom Foo Display" for entry in container["affected"])
+
+    def test_empty_severity_produces_empty_metrics(self):
+        container = self._container(severity_value="")
+        assert container["metrics"] == []
+
+    def test_empty_cwe_produces_empty_problem_types(self):
+        container = self._container(cwe_value="")
+        assert container["problemTypes"] == []
+
+    def test_mailing_list_url_excluded_from_references(self):
+        # The mailing_list_value field is intentionally ignored in references.
+        links = [ref["url"] for ref in self._container()["references"]]
+        assert "https://lists.example.org/thread/abc" not in links
+
+
+# ---------------------------------------------------------------------------
+# emit_json
+# ---------------------------------------------------------------------------
+
+
+class TestEmitJson:
+    """Check the string returned by ``emit_json`` and the file it writes.
+
+    Each test passes either ``None`` or a path under ``tmp_path`` as the
+    second argument.
+    """
+
+    def test_returns_valid_json_string(self):
+        obj = {"b": 2, "a": 1}
+        text = emit_json(obj, None)
+        assert json.loads(text) == obj
+
+    def test_keys_are_sorted(self):
+        obj = {"z": 1, "a": 2, "m": 3}
+        text = emit_json(obj, None)
+        # json.loads keeps the document order of keys, so this list reflects
+        # the order in which they were emitted.
+        keys = list(json.loads(text))
+        assert keys == sorted(keys)
+
+    def test_indented_with_four_spaces(self):
+        text = emit_json({"a": 1}, None)
+        # Anchor on the preceding newline so that only an indent of exactly
+        # four spaces matches; a bare '"a"' preceded by one space would also
+        # match any other non-zero indent.
+        assert '\n    "a"' in text
+
+    def test_writes_to_file_when_output_path_given(self, tmp_path: Path):
+        out = tmp_path / "out.json"
+        emit_json({"x": 1}, out)
+        assert out.exists()
+        assert json.loads(out.read_text()) == {"x": 1}
+
+    def test_creates_missing_parent_directories(self, tmp_path: Path):
+        # Neither "nested" nor "dirs" exists before the call.
+        out = tmp_path / "nested" / "dirs" / "out.json"
+        emit_json({"x": 1}, out)
+        assert out.exists()
+
+    def test_file_ends_with_newline(self, tmp_path: Path):
+        out = tmp_path / "out.json"
+        emit_json({"x": 1}, out)
+        assert out.read_text().endswith("\n")
+
+    def test_return_value_matches_file_content_without_trailing_newline(self, tmp_path: Path):
+        out = tmp_path / "out.json"
+        returned = emit_json({"x": 1}, out)
+        assert out.read_text() == returned + "\n"
+
+    def test_non_ascii_preserved(self):
+        # The character must appear literally rather than as a \u escape.
+        text = emit_json({"name": "José"}, None)
+        assert "José" in text