Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ add-config-files() {
_START_DATE=$(date '+%Y-%m-%d')
display-and-exec "updating start date for opencve dag" sed -i.bak "s/start_date = .*/start_date = $_START_DATE/g" "$_AIRFLOW_CONFIG_FILE" && rm -f "$_AIRFLOW_CONFIG_FILE.bak"
display-and-exec "updating start date for summarize_reports dag" sed -i.bak "s/start_date_summarize_reports = .*/start_date_summarize_reports = $_START_DATE/g" "$_AIRFLOW_CONFIG_FILE" && rm -f "$_AIRFLOW_CONFIG_FILE.bak"
display-and-exec "updating start date for sync_weaknesses dag" sed -i.bak "s/start_date_sync_weaknesses = .*/start_date_sync_weaknesses = $_START_DATE/g" "$_AIRFLOW_CONFIG_FILE" && rm -f "$_AIRFLOW_CONFIG_FILE.bak"
local _CONFIGURED_START_DATE
_CONFIGURED_START_DATE=$(grep 'start_date' "$_AIRFLOW_CONFIG_FILE")
log "Default configuration: $_CONFIGURED_START_DATE"
Expand Down
4 changes: 4 additions & 0 deletions scheduler/airflow.cfg.example
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,10 @@ vulnrichment_repo_path = /home/airflow/repositories/vulnrichment
# It has to be set once and only once during OpenCVE initialization.
start_date = 2024-01-01

# The starting date of the sync_weaknesses workflow,
# used to sync the weakness data from MITRE.
start_date_sync_weaknesses = 2026-06-08

# The starting date of the summarize_reports workflow,
# used to generate summaries for each report using a LLM.
# It has to be set once and only once during OpenCVE initialization.
Expand Down
8 changes: 8 additions & 0 deletions scheduler/dags/includes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
VULNRICHMENT_LOCAL_REPO = pathlib.Path(conf.get("opencve", "vulnrichment_repo_path"))
KB_LOCAL_REPO = pathlib.Path(conf.get("opencve", "kb_repo_path"))

# Download location of the latest CWE catalog published by MITRE (zipped XML).
WEAKNESSES_XML_ZIP_URL = "https://cwe.mitre.org/data/xml/cwec_latest.xml.zip"

REPORTS_RETENTION_MONTHS = int(conf.get("opencve", "reports_retention", fallback="12"))

PRODUCT_SEPARATOR = "$PRODUCT$"
Expand All @@ -26,6 +28,12 @@

VARIABLE_UPSERT_PROCEDURE = "CALL variable_upsert(%(p_name)s, %(p_value)s);"

# Statement calling the `weakness_upsert` stored procedure. It takes named
# (pyformat) parameters: cwe, created, updated, name and description.
WEAKNESS_UPSERT_PROCEDURE = """
CALL weakness_upsert(
%(cwe)s, %(created)s, %(updated)s, %(name)s, %(description)s
);
"""

SQL_CHANGE_WITH_VENDORS = """
SELECT
changes.id AS change_id,
Expand Down
32 changes: 32 additions & 0 deletions scheduler/dags/includes/tasks/weaknesses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

from includes.constants import WEAKNESS_UPSERT_PROCEDURE
from includes.utils import fetch_weaknesses

logger = logging.getLogger(__name__)


@task(task_id="sync_weaknesses")
def sync_weaknesses(**context):
    """
    Fetch the weaknesses and upsert each of them into the OpenCVE database.

    The run's logical date (``context["logical_date"]``) is sent as both the
    ``created`` and ``updated`` parameters of the upsert procedure, and the
    weakness identifier is stored with a ``CWE-`` prefix.
    """
    weaknesses = fetch_weaknesses()
    pg_hook = PostgresHook(postgres_conn_id="opencve_postgres")
    run_date = context["logical_date"]
    total = len(weaknesses)

    logger.info("Found %s weaknesses", total)

    for entry in weaknesses:
        upsert_params = {
            "cwe": f"CWE-{entry['id']}",
            "created": run_date,
            "updated": run_date,
            "name": entry["name"],
            "description": entry["description"],
        }
        # One procedure call per weakness.
        pg_hook.run(sql=WEAKNESS_UPSERT_PROCEDURE, parameters=upsert_params)

    logger.info("Synced %s weaknesses", total)
73 changes: 72 additions & 1 deletion scheduler/dags/includes/utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import io
import json
import pathlib
import time
import zipfile
import xml.etree.ElementTree as ET
from logging import Logger
from typing import Dict, List, Tuple

import more_itertools

import openai
import requests
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowConfigException
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from git.objects.commit import Commit
from git.repo import Repo
from includes.constants import KB_LOCAL_REPO
from includes.constants import KB_LOCAL_REPO, WEAKNESSES_XML_ZIP_URL
from jinja2 import Environment, FileSystemLoader, select_autoescape
from pendulum.datetime import DateTime

Expand Down Expand Up @@ -509,3 +513,70 @@ def build_user_content_for_llm(
text_output.append("") # Empty line between CVEs

return "\n".join(text_output)


# Prefix-to-URI map for the CWE catalog XML namespace, passed to ElementTree
# lookups so that paths can use the `cwe:` prefix.
WEAKNESS_NAMESPACE = {"cwe": "http://cwe.mitre.org/cwe-7"}


def download_weaknesses_zip(url: str) -> bytes:
    """
    Download the weaknesses XML archive located at `url`.

    The request uses a 60-second timeout. An HTTP error status is turned into
    an exception by `raise_for_status()`. The raw body bytes are returned.
    """
    http_response = requests.get(url, timeout=60)
    http_response.raise_for_status()
    archive_bytes = http_response.content
    return archive_bytes


def extract_weaknesses_xml(zip_content: bytes) -> bytes:
    """
    Return the bytes of the first `.xml` member of the weaknesses ZIP archive.

    Members are considered in archive order. A RuntimeError is raised when
    the archive holds no member whose name ends with `.xml`.
    """
    buffer = io.BytesIO(zip_content)

    with zipfile.ZipFile(buffer) as archive:
        first_xml = next(
            (member for member in archive.namelist() if member.endswith(".xml")),
            None,
        )

        if first_xml is None:
            raise RuntimeError("No XML file found in weaknesses ZIP archive")

        return archive.read(first_xml)


def get_xml_text(element: ET.Element | None) -> str:
"""
This function extracts and joins the text content of an XML element.
"""
if element is None:
return ""

return " ".join(text.strip() for text in element.itertext() if text.strip())


def parse_weaknesses(xml_content: bytes) -> list[dict]:
    """
    This function parses weaknesses from the XML content.

    The XML namespace is taken from the root element of the document rather
    than being pinned to a single schema version: the catalog is downloaded
    from a "latest" URL, so a namespace bump would otherwise make every
    lookup miss and silently produce an empty list. `WEAKNESS_NAMESPACE` is
    used as a fallback when the root element is not namespace-qualified.

    Returns one dict per `Weakness` element with the keys `id`, `name`
    (both read from attributes, defaulting to "") and `description` (joined
    text of the `Description` child, "" when absent).

    Raises `xml.etree.ElementTree.ParseError` when the content is not
    well-formed XML.
    """
    root = ET.fromstring(xml_content)

    # ElementTree spells a qualified tag as "{namespace-uri}local-name".
    namespace = WEAKNESS_NAMESPACE
    if root.tag.startswith("{"):
        namespace = {"cwe": root.tag[1:].partition("}")[0]}

    return [
        {
            "id": weakness.attrib.get("ID", ""),
            "name": weakness.attrib.get("Name", ""),
            "description": get_xml_text(
                weakness.find("cwe:Description", namespace)
            ),
        }
        for weakness in root.findall(".//cwe:Weakness", namespace)
    ]


def fetch_weaknesses() -> list[dict]:
    """
    Retrieve the latest weakness data from MITRE.

    The archive at `WEAKNESSES_XML_ZIP_URL` is downloaded, its XML file is
    extracted, and the parsed weaknesses are returned.
    """
    return parse_weaknesses(
        extract_weaknesses_xml(download_weaknesses_zip(WEAKNESSES_XML_ZIP_URL))
    )
31 changes: 31 additions & 0 deletions scheduler/dags/sync_weaknesses_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import timedelta

import pendulum
from airflow.configuration import conf
from airflow import DAG

from includes.tasks.weaknesses import sync_weaknesses

doc_md_DAG = """
The goal of this DAG is to synchronize weakness data from MITRE.
"""

# Start date read from the [opencve] section of the Airflow configuration,
# with a fixed default when the option is not set.
_start_date_option = conf.get(
    "opencve", "start_date_sync_weaknesses", fallback="2026-06-08"
)
start_date = pendulum.from_format(_start_date_option, "YYYY-MM-DD")

dag = DAG(
    dag_id="sync_weaknesses",
    doc_md=doc_md_DAG,
    start_date=start_date,
    schedule="0 3 * * 1",  # 3am UTC every Monday
    catchup=False,
    max_active_runs=1,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(seconds=10),
    },
)

# The DAG holds a single task.
with dag:
    sync_weaknesses()
1 change: 1 addition & 0 deletions scheduler/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ aiosmtplib==3.0.1
openai==1.98.0
# see https://support.astronomer.io/hc/en-us/articles/44842200438931-No-module-named-flask-limiter-wrappers
flask_limiter==3.12
requests==2.32.3
1 change: 1 addition & 0 deletions scheduler/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
os.environ["AIRFLOW__OPENCVE__REDHAT_REPO_PATH"] = AIRFLOW_HOME
os.environ["AIRFLOW__OPENCVE__START_DATE"] = "2024-01-01"
os.environ["AIRFLOW__OPENCVE__START_DATE_SUMMARIZE_REPORTS"] = "2024-01-01"
os.environ["AIRFLOW__OPENCVE__START_DATE_SYNC_WEAKNESSES"] = "2026-06-08"
os.environ["AIRFLOW__OPENCVE__NOTIFICATION_REQUEST_TIMEOUT"] = "5"

os.environ["AIRFLOW__OPENCVE__DEVELOPMENT_MODE"] = "False"
Expand Down
13 changes: 13 additions & 0 deletions scheduler/tests/data/weaknesses/cwec_sample.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Weakness_Catalog xmlns="http://cwe.mitre.org/cwe-7">
<Weaknesses>
<Weakness ID="79" Name="Cross-site Scripting">
<Description>Test XSS description</Description>
</Weakness>
<Weakness ID="89" Name="SQL Injection">
<Description>Test SQLi description</Description>
</Weakness>
</Weaknesses>
<Categories>
<Category ID="1" Name="Category only"/>
</Categories>
</Weakness_Catalog>
75 changes: 75 additions & 0 deletions scheduler/tests/tasks/test_weaknesses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from unittest.mock import patch

import pendulum
import pytest

from includes.tasks.weaknesses import sync_weaknesses

# Weaknesses returned by the patched `fetch_weaknesses` in the tests of this module.
MOCK_WEAKNESSES = [
    {
        "id": "79",
        "name": "Cross-site Scripting",
        "description": "Test XSS description",
    },
    {
        "id": "89",
        "name": "SQL Injection",
        "description": "Test SQLi description",
    },
]

# Logical date passed to the task under test.
SYNC_DATE = pendulum.datetime(2026, 6, 8, 3, 0, tz="UTC")


@pytest.mark.web_db
def test_sync_weaknesses_inserts_records(web_pg_hook):
    """Check that each fetched weakness is stored with its CWE-prefixed id."""
    fetch_patch = patch(
        "includes.tasks.weaknesses.fetch_weaknesses", return_value=MOCK_WEAKNESSES
    )
    with fetch_patch:
        sync_weaknesses.function(logical_date=SYNC_DATE)

    stored_rows = web_pg_hook.get_records(
        "SELECT cwe_id, name, description FROM opencve_weaknesses ORDER BY cwe_id;"
    )
    expected_rows = [
        ("CWE-79", "Cross-site Scripting", "Test XSS description"),
        ("CWE-89", "SQL Injection", "Test SQLi description"),
    ]
    assert stored_rows == expected_rows


@pytest.mark.web_db
def test_sync_weaknesses_upsert_preserves_created_at(web_pg_hook):
    """Check that syncing an existing weakness refreshes it but keeps created_at."""
    original_date = pendulum.datetime(2020, 1, 1, 0, 0, tz="UTC")

    # Seed a pre-existing CWE-79 row carrying outdated name and description.
    seed_values = (
        original_date,
        original_date,
        "CWE-79",
        "Old name",
        "Old description",
    )
    web_pg_hook.run(
        """
INSERT INTO opencve_weaknesses (id, created_at, updated_at, cwe_id, name, description)
VALUES (uuid_generate_v4(), %s, %s, %s, %s, %s);
""",
        parameters=seed_values,
    )

    with patch(
        "includes.tasks.weaknesses.fetch_weaknesses",
        return_value=[MOCK_WEAKNESSES[0]],
    ):
        sync_weaknesses.function(logical_date=SYNC_DATE)

    rows = web_pg_hook.get_records(
        """
SELECT created_at, updated_at, name, description
FROM opencve_weaknesses
WHERE cwe_id = 'CWE-79';
"""
    )
    stored = rows[0]
    assert stored[0] == original_date
    assert stored[1] == SYNC_DATE
    assert stored[2] == "Cross-site Scripting"
    assert stored[3] == "Test XSS description"
9 changes: 9 additions & 0 deletions scheduler/tests/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def test_dag_check_smtp_loaded(dagbag):
assert tasks == ["run"]


def test_dag_sync_weaknesses_loaded(dagbag):
    """Check that the sync_weaknesses DAG loads with its schedule and single task."""
    weaknesses_dag = dagbag.get_dag(dag_id="sync_weaknesses")
    assert dagbag.import_errors == {}
    assert weaknesses_dag is not None
    assert weaknesses_dag.timetable.summary == "0 3 * * 1"
    assert weaknesses_dag.catchup is False
    task_ids = {dag_task.task_id for dag_task in weaknesses_dag.tasks}
    assert task_ids == {"sync_weaknesses"}


def test_shortcircuit_removed(dagbag):
"""Sanity check: removed guard tasks."""
dag = dagbag.get_dag(dag_id="opencve")
Expand Down
Loading
Loading