Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions tools/vulnogram/generate-cve-json/tests/test_generate_cve_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

from __future__ import annotations

import json
from collections.abc import Iterator
from pathlib import Path
from typing import Any

import pytest
Expand All @@ -35,14 +37,17 @@
_is_cna_ready_for_review,
_product_for_package,
build_affected,
build_cna_container,
build_credits,
build_descriptions,
build_metrics,
build_problem_types,
build_references,
classify_reference,
combine_remediation_developers,
compute_cna_private_state,
compute_package_url,
emit_json,
extract_field,
format_version_range,
parse_affected_versions,
Expand All @@ -53,6 +58,7 @@
resolve_title,
wrap_cve_record,
)
from generate_cve_json.cve_json import normalise_severity, to_html

DEFAULT_AFFECTED_ARGS: dict[str, Any] = {
"vendor": "Apache Software Foundation",
Expand Down Expand Up @@ -1031,3 +1037,261 @@ def test_full_name_affiliation_pattern_preserved(self):
assert combine_remediation_developers("Jed Cunningham, Astronomer", []) == [
"Jed Cunningham, Astronomer",
]


# ---------------------------------------------------------------------------
# normalise_severity
# ---------------------------------------------------------------------------


class TestNormaliseSeverity:
    """Case-folding and whitespace expectations for ``normalise_severity``."""

    def test_known_values_are_lowercased(self):
        """Capitalised known severities come back as their lowercase form."""
        capitalised = ("None", "Low", "Medium", "High", "Critical")
        for label in capitalised:
            expected = label.lower()
            assert normalise_severity(label) == expected

    def test_already_lowercase_known_value_passes_through(self):
        """A known severity that is already lowercase is returned as-is."""
        severity = normalise_severity("high")
        assert severity == "high"

    def test_unknown_value_is_stripped_but_not_changed(self):
        """An unrecognised value loses surrounding whitespace but keeps its case."""
        padded = " Informational "
        assert normalise_severity(padded) == "Informational"

    def test_mixed_case_known_value_normalised(self):
        """Upper-case spellings of known severities are folded to lowercase."""
        high = normalise_severity("HIGH")
        assert high == "high"
        critical = normalise_severity("CRITICAL")
        assert critical == "critical"


# ---------------------------------------------------------------------------
# to_html
# ---------------------------------------------------------------------------


class TestToHtml:
    """Escaping and newline-conversion expectations for ``to_html``."""

    def test_plain_text_is_returned_unchanged(self):
        """Text without special characters or newlines is left alone."""
        plain = "Hello world"
        assert to_html(plain) == "Hello world"

    def test_html_angle_brackets_are_escaped(self):
        """``<`` and ``>`` are replaced by their HTML entities."""
        rendered = to_html("<script>alert(1)</script>")
        assert rendered == "&lt;script&gt;alert(1)&lt;/script&gt;"

    def test_ampersand_is_escaped(self):
        """``&`` is replaced by ``&amp;``."""
        rendered = to_html("A & B")
        assert rendered == "A &amp; B"

    def test_double_newlines_become_br_br(self):
        """A blank line between paragraphs becomes two ``<br>`` tags."""
        rendered = to_html("Para one.\n\nPara two.")
        assert rendered == "Para one.<br><br>Para two."

    def test_single_newlines_become_br(self):
        """A single line break becomes one ``<br>`` tag."""
        rendered = to_html("Line one.\nLine two.")
        assert rendered == "Line one.<br>Line two."

    def test_windows_line_endings_normalised_before_conversion(self):
        """A CRLF pair yields a single ``<br>``, same as a bare LF."""
        rendered = to_html("Line one.\r\nLine two.")
        assert rendered == "Line one.<br>Line two."

    def test_mixed_newlines_in_multiline_text(self):
        """Paragraph breaks and line breaks can appear in the same input."""
        source = "Intro.\n\nBullet one.\nBullet two."
        expected = "Intro.<br><br>Bullet one.<br>Bullet two."
        assert to_html(source) == expected


# ---------------------------------------------------------------------------
# build_descriptions
# ---------------------------------------------------------------------------


class TestBuildDescriptions:
    """Shape and escaping expectations for the list returned by ``build_descriptions``."""

    def test_empty_text_returns_empty_list(self):
        """Empty input yields no description entries."""
        entries = build_descriptions("")
        assert entries == []

    def test_non_empty_returns_single_entry(self):
        """Non-empty input yields exactly one entry."""
        entries = build_descriptions("A vulnerability.")
        assert len(entries) == 1

    def test_entry_lang_is_en(self):
        """The entry is tagged with language ``en``."""
        first = build_descriptions("A vulnerability.")[0]
        assert first["lang"] == "en"

    def test_entry_value_is_plain_text(self):
        """The ``value`` field carries the input text verbatim."""
        first = build_descriptions("A vulnerability.")[0]
        assert first["value"] == "A vulnerability."

    def test_supporting_media_is_html(self):
        """One supporting-media item is attached: ``text/html``, not base64."""
        first = build_descriptions("A vulnerability.")[0]
        attachments = first["supportingMedia"]
        assert len(attachments) == 1
        html_item = attachments[0]
        assert html_item["type"] == "text/html"
        assert html_item["base64"] is False

    def test_special_chars_escaped_in_html_media(self):
        """The HTML rendition contains entity-escaped ``<b>`` and ``&``."""
        first = build_descriptions("Use <b> wisely & carefully.")[0]
        rendered = first["supportingMedia"][0]["value"]
        assert "&lt;b&gt;" in rendered
        assert "&amp;" in rendered

    def test_plain_value_not_html_escaped(self):
        """The plain ``value`` field keeps raw angle brackets."""
        first = build_descriptions("Use <b> wisely.")[0]
        assert first["value"] == "Use <b> wisely."


# ---------------------------------------------------------------------------
# build_cna_container
# ---------------------------------------------------------------------------

_CNA_DEFAULTS: dict[str, Any] = {
"title": "DAG auth bypass",
"description": "An attacker can read arbitrary DAGs.",
"affected_versions_value": ">=3.0.0, <3.2.0",
"cwe_value": "CWE-352: CSRF",
"severity_value": "high",
"credits_value": "Alice Smith",
"mailing_list_value": "https://lists.example.org/thread/abc",
"pr_value": "https://github.com/apache/airflow/pull/123",
"vendor": "Apache Software Foundation",
"product": "Apache Example",
"package_name": "apache-example",
"collection_url": "https://pypi.python.org",
"org_id": "org-123",
"version_start": None,
"discovery": "UNKNOWN",
"remediation_developers": [],
}


class TestBuildCnaContainer:
    """Checks on the dict returned by ``build_cna_container``.

    Every test starts from ``_CNA_DEFAULTS`` and overrides only the keyword
    arguments relevant to the behaviour under test.
    """

    @staticmethod
    def _container(**overrides):
        """Call ``build_cna_container`` with the defaults, *overrides* winning."""
        return build_cna_container(**{**_CNA_DEFAULTS, **overrides})

    def test_required_top_level_keys_present(self):
        """All expected top-level keys exist in the container."""
        container = self._container()
        expected_keys = (
            "affected",
            "credits",
            "descriptions",
            "metrics",
            "problemTypes",
            "providerMetadata",
            "references",
            "source",
            "title",
            "x_generator",
        )
        for key in expected_keys:
            assert key in container, f"missing key: {key}"

    def test_title_is_set(self):
        """The ``title`` argument is carried through unchanged."""
        container = self._container()
        assert container["title"] == "DAG auth bypass"

    def test_provider_metadata_carries_org_id(self):
        """``org_id`` ends up under ``providerMetadata.orgId``."""
        provider = self._container()["providerMetadata"]
        assert provider["orgId"] == "org-123"

    def test_source_discovery_is_set(self):
        """``discovery`` ends up under ``source.discovery``."""
        source = self._container()["source"]
        assert source["discovery"] == "UNKNOWN"

    def test_affected_entry_uses_correct_package(self):
        """At least one ``affected`` entry names the configured package."""
        affected = self._container()["affected"]
        assert any(entry["packageName"] == "apache-example" for entry in affected)

    def test_description_value_appears_in_descriptions(self):
        """The description text shows up in some ``descriptions`` entry."""
        descriptions = self._container()["descriptions"]
        assert any("An attacker" in entry["value"] for entry in descriptions)

    def test_cwe_appears_in_problem_types(self):
        """The CWE identifier is exposed as a ``cweId`` in ``problemTypes``."""
        problem_types = self._container()["problemTypes"]
        found_ids = [
            description.get("cweId")
            for problem_type in problem_types
            for description in problem_type.get("descriptions", [])
        ]
        assert "CWE-352" in found_ids

    def test_severity_appears_in_metrics(self):
        """The severity text is present in an ``other`` metric entry."""
        metrics = self._container()["metrics"]
        severity_texts = [
            metric["other"]["content"]["text"] for metric in metrics if "other" in metric
        ]
        assert "high" in severity_texts

    def test_reporter_credit_in_credits(self):
        """The reporter from ``credits_value`` is listed in ``credits``."""
        credits = self._container()["credits"]
        names = [credit["value"] for credit in credits]
        assert "Alice Smith" in names

    def test_remediation_developer_added_to_credits(self):
        """A remediation developer is credited with the matching type."""
        container = self._container(remediation_developers=["Bob Builder"])
        name_type_pairs = [(credit["value"], credit["type"]) for credit in container["credits"]]
        assert ("Bob Builder", "remediation developer") in name_type_pairs

    def test_pr_url_appears_in_references(self):
        """The pull-request URL is included in ``references``."""
        references = self._container()["references"]
        reference_urls = [reference["url"] for reference in references]
        assert "https://github.com/apache/airflow/pull/123" in reference_urls

    def test_advisory_urls_forwarded_to_references(self):
        """URLs passed as ``advisory_urls`` are included in ``references``."""
        container = self._container(
            advisory_urls=["https://lists.apache.org/thread/real-advisory"],
        )
        reference_urls = [reference["url"] for reference in container["references"]]
        assert "https://lists.apache.org/thread/real-advisory" in reference_urls

    def test_product_overrides_applied_to_affected(self):
        """A ``product_overrides`` entry replaces the product display name."""
        container = self._container(
            affected_versions_value="apache-example-project-foo <=6.5.0",
            product_overrides={"apache-example-project-foo": "Custom Foo Display"},
        )
        affected = container["affected"]
        assert any(entry["product"] == "Custom Foo Display" for entry in affected)

    def test_empty_severity_produces_empty_metrics(self):
        """An empty severity yields an empty ``metrics`` list."""
        container = self._container(severity_value="")
        assert container["metrics"] == []

    def test_empty_cwe_produces_empty_problem_types(self):
        """An empty CWE yields an empty ``problemTypes`` list."""
        container = self._container(cwe_value="")
        assert container["problemTypes"] == []

    def test_mailing_list_url_excluded_from_references(self):
        """The ``mailing_list_value`` URL must not appear in ``references``."""
        # The mailing_list_value field is intentionally ignored in references.
        references = self._container()["references"]
        reference_urls = [reference["url"] for reference in references]
        assert "https://lists.example.org/thread/abc" not in reference_urls


# ---------------------------------------------------------------------------
# emit_json
# ---------------------------------------------------------------------------


class TestEmitJson:
    """Serialisation and file-output expectations for ``emit_json``.

    ``emit_json(obj, output_path)`` is called with ``None`` as the path when
    only the returned text is inspected, and with a ``tmp_path``-based path
    when the written file is inspected.
    """

    def test_returns_valid_json_string(self):
        """The returned text parses back to the original object."""
        obj = {"b": 2, "a": 1}
        text = emit_json(obj, None)
        assert json.loads(text) == obj

    def test_keys_are_sorted(self):
        """Keys appear in sorted order in the emitted document."""
        obj = {"z": 1, "a": 2, "m": 3}
        text = emit_json(obj, None)
        # json.loads keeps document order, so this reflects the emitted order.
        keys = list(json.loads(text).keys())
        assert keys == sorted(keys)

    def test_indented_with_four_spaces(self):
        """Nested keys are indented by exactly four spaces."""
        text = emit_json({"a": 1}, None)
        # Anchor on the preceding newline: a bare ' "a"' substring would also
        # match one-, two- or three-space indentation and so prove nothing
        # about the indent width.
        assert '\n    "a"' in text

    def test_writes_to_file_when_output_path_given(self, tmp_path: Path):
        """Passing an output path writes the JSON document to that file."""
        out = tmp_path / "out.json"
        emit_json({"x": 1}, out)
        assert out.exists()
        # Explicit encoding keeps the read independent of the locale default.
        assert json.loads(out.read_text(encoding="utf-8")) == {"x": 1}

    def test_creates_missing_parent_directories(self, tmp_path: Path):
        """Missing parent directories of the output path are created."""
        out = tmp_path / "nested" / "dirs" / "out.json"
        emit_json({"x": 1}, out)
        assert out.exists()

    def test_file_ends_with_newline(self, tmp_path: Path):
        """The written file ends with a newline."""
        out = tmp_path / "out.json"
        emit_json({"x": 1}, out)
        assert out.read_text(encoding="utf-8").endswith("\n")

    def test_return_value_matches_file_content_without_trailing_newline(self, tmp_path: Path):
        """The file content equals the returned text plus one newline."""
        out = tmp_path / "out.json"
        returned = emit_json({"x": 1}, out)
        assert out.read_text(encoding="utf-8") == returned + "\n"

    def test_non_ascii_preserved(self):
        """Non-ASCII characters are emitted literally, not as escapes."""
        text = emit_json({"name": "José"}, None)
        assert "José" in text
Loading