Skip to content

Modify Gitlab Importer to support package-first mode #1910

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
274 changes: 189 additions & 85 deletions vulnerabilities/pipelines/gitlab_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Iterable
from typing import List
from typing import Tuple
from urllib.parse import urljoin

import pytz
import saneyaml
Expand All @@ -31,6 +32,9 @@
from vulnerabilities.utils import build_description
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import get_cwe_id
from vulntotal.datasources.gitlab import get_casesensitive_slug
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keshav-space what do you think is it a good idea to import vulntotal functions in vcio ? Or shall we create separate functions here ?

Copy link
Contributor

@TG1999 TG1999 Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelehab , also spdx license is missing in Gitlab vulntotal datasource. We need to have that before using it in our VCIO importers. https://github.com/aboutcode-org/vulnerablecode/blob/main/vulntotal/datasources/gitlab.py#L30

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think is it a good idea to import vulntotal functions in vcio ?

We can import it from VulnTotal for now. Later on we can extract these out in common utility.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelehab , also spdx license is missing in Gitlab vulntotal datasource. We need to have that before using it in our VCIO importers. https://github.com/aboutcode-org/vulnerablecode/blob/main/vulntotal/datasources/gitlab.py#L30

I believe we discussed this in an earlier meeting where we said that the package-first API endpoint won't be enabled by default and users have to enable it locally which is like using vulntotal when it comes to license, that's even why I reused the vulntotal functions in VCIO's package-first mode.

from vulntotal.datasources.gitlab_api import fetch_gitlab_advisories_for_purl
from vulntotal.datasources.gitlab_api import get_estimated_advisories_count


class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline):
Expand All @@ -42,9 +46,16 @@ class GitLabImporterPipeline(VulnerableCodeBaseImporterPipeline):
license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
importer_name = "GitLab Importer"
repo_url = "git+https://gitlab.com/gitlab-org/advisories-community/"
is_batch_run = True

@classmethod
def steps(cls):
if not cls.is_batch_run:
return (
cls.collect_and_store_advisories,
cls.import_new_advisories,
)

return (
cls.clone,
cls.collect_and_store_advisories,
Expand All @@ -66,15 +77,57 @@ def steps(cls):

gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()}

def __init__(self, *args, purl=None, **kwargs):
super().__init__(*args, **kwargs)
self.purl = purl
# If a purl is provided, we are running in package-first mode
if self.purl:
GitLabImporterPipeline.is_batch_run = False

def clone(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def advisories_count(self):
root = Path(self.vcs_response.dest_dir)
return sum(1 for _ in root.rglob("*.yml"))
if GitLabImporterPipeline.is_batch_run:
root = Path(self.vcs_response.dest_dir)
return sum(1 for _ in root.rglob("*.yml"))
else:
return get_estimated_advisories_count(
self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug
)

def collect_advisories(self) -> Iterable[AdvisoryData]:
if not self.is_batch_run:
advisories = fetch_gitlab_advisories_for_purl(
self.purl, self.purl_type_by_gitlab_scheme, get_casesensitive_slug
)

input_version = self.purl.version
vrc = RANGE_CLASS_BY_SCHEMES[self.purl.type]
version_obj = vrc.version_class(input_version) if input_version else None

for advisory in advisories:
advisory_data = self._advisory_dict_to_advisory_data(advisory)
# If purl has version, we need to check if advisory affects the version
if input_version:
affected = False
for affected_package in advisory_data.affected_packages:
vrange = affected_package.affected_version_range
fixed_version = affected_package.fixed_version
if vrange and version_obj in vrange:
if fixed_version:
fixed_version_obj = vrc.version_class(str(fixed_version))
if version_obj >= fixed_version_obj:
continue
affected = True
break
if affected:
yield advisory_data
else:
yield advisory_data
return

base_path = Path(self.vcs_response.dest_dir)

for file_path in base_path.rglob("*.yml"):
Expand Down Expand Up @@ -109,6 +162,135 @@ def clean_downloads(self):
def on_failure(self):
self.clean_downloads()

def _advisory_dict_to_advisory_data(self, advisory):
return advisory_dict_to_advisory_data(
advisory=advisory,
purl_type_by_gitlab_scheme=self.purl_type_by_gitlab_scheme,
gitlab_scheme_by_purl_type=self.gitlab_scheme_by_purl_type,
logger=self.log,
purl=self.purl,
)


def advisory_dict_to_advisory_data(
advisory: dict,
purl_type_by_gitlab_scheme,
gitlab_scheme_by_purl_type,
logger,
purl=None,
advisory_url=None,
):
"""
Convert a GitLab advisory dict to AdvisoryData.
"""
aliases = advisory.get("identifiers", [])
identifier = advisory.get("identifier", "")
summary = build_description(advisory.get("title"), advisory.get("description"))
urls = advisory.get("urls", [])
references = [Reference.from_url(u) for u in urls]

cwe_ids = advisory.get("cwe_ids") or []
cwe_list = list(map(get_cwe_id, cwe_ids))

date_published = dateparser.parse(advisory.get("pubdate"))
date_published = date_published.replace(tzinfo=pytz.UTC)

package_slug = advisory.get("package_slug")

# Determine purl if not provided
if not purl:
purl = get_purl(
package_slug=package_slug,
purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
logger=logger,
)

if not purl:
logger(
f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
level=logging.ERROR,
)
return AdvisoryData(
aliases=aliases,
summary=summary,
references=references,
date_published=date_published,
url=advisory_url,
)

affected_version_range = None
fixed_versions = advisory.get("fixed_versions") or []
affected_range = advisory.get("affected_range")
gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"])
vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
try:
if affected_range:
if gitlab_scheme in gitlab_native_schemes:
affected_version_range = from_gitlab_native(
gitlab_scheme=gitlab_scheme, string=affected_range
)
else:
affected_version_range = vrc.from_native(affected_range)
except Exception as e:
logger(
f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

parsed_fixed_versions = []
for fixed_version in fixed_versions:
try:
fixed_version = vrc.version_class(fixed_version)
parsed_fixed_versions.append(fixed_version)
except Exception as e:
logger(
f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

purl_without_version = get_purl(
package_slug=package_slug,
purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
logger=logger,
)

if parsed_fixed_versions:
affected_packages = list(
extract_affected_packages(
affected_version_range=affected_version_range,
fixed_versions=parsed_fixed_versions,
purl=purl_without_version,
)
)
else:
if not affected_version_range:
affected_packages = []
else:
affected_packages = [
AffectedPackage(
package=purl_without_version,
affected_version_range=affected_version_range,
)
]

# Determine advisory_url if not provided
if not advisory_url and package_slug and identifier:
advisory_url = urljoin(
"https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
package_slug + "/" + identifier + ".yml",
)

return AdvisoryData(
aliases=aliases,
summary=summary,
references=references,
date_published=date_published,
affected_packages=affected_packages,
weaknesses=cwe_list,
url=advisory_url,
)


def parse_advisory_path(base_path: Path, file_path: Path) -> Tuple[str, str, str]:
"""
Expand Down Expand Up @@ -219,94 +401,16 @@ def parse_gitlab_advisory(
)
return

# refer to schema here https://gitlab.com/gitlab-org/advisories-community/-/blob/main/ci/schema/schema.json
aliases = gitlab_advisory.get("identifiers")
summary = build_description(gitlab_advisory.get("title"), gitlab_advisory.get("description"))
urls = gitlab_advisory.get("urls")
references = [Reference.from_url(u) for u in urls]

cwe_ids = gitlab_advisory.get("cwe_ids") or []
cwe_list = list(map(get_cwe_id, cwe_ids))

date_published = dateparser.parse(gitlab_advisory.get("pubdate"))
date_published = date_published.replace(tzinfo=pytz.UTC)
package_slug = gitlab_advisory.get("package_slug")
advisory_url = get_advisory_url(
file=file,
base_path=base_path,
url="https://gitlab.com/gitlab-org/advisories-community/-/blob/main/",
)
purl: PackageURL = get_purl(
package_slug=package_slug,

return advisory_dict_to_advisory_data(
advisory=gitlab_advisory,
purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
gitlab_scheme_by_purl_type=gitlab_scheme_by_purl_type,
logger=logger,
)
if not purl:
logger(
f"parse_yaml_file: purl is not valid: {file!r} {package_slug!r}", level=logging.ERROR
)
return AdvisoryData(
aliases=aliases,
summary=summary,
references=references,
date_published=date_published,
url=advisory_url,
)
affected_version_range = None
fixed_versions = gitlab_advisory.get("fixed_versions") or []
affected_range = gitlab_advisory.get("affected_range")
gitlab_native_schemes = set(["pypi", "gem", "npm", "go", "packagist", "conan"])
vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
try:
if affected_range:
if gitlab_scheme in gitlab_native_schemes:
affected_version_range = from_gitlab_native(
gitlab_scheme=gitlab_scheme, string=affected_range
)
else:
affected_version_range = vrc.from_native(affected_range)
except Exception as e:
logger(
f"parse_yaml_file: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

parsed_fixed_versions = []
for fixed_version in fixed_versions:
try:
fixed_version = vrc.version_class(fixed_version)
parsed_fixed_versions.append(fixed_version)
except Exception as e:
logger(
f"parse_yaml_file: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
level=logging.ERROR,
)

if parsed_fixed_versions:
affected_packages = list(
extract_affected_packages(
affected_version_range=affected_version_range,
fixed_versions=parsed_fixed_versions,
purl=purl,
)
)
else:
if not affected_version_range:
affected_packages = []
else:
affected_packages = [
AffectedPackage(
package=purl,
affected_version_range=affected_version_range,
)
]
return AdvisoryData(
aliases=aliases,
summary=summary,
references=references,
date_published=date_published,
affected_packages=affected_packages,
weaknesses=cwe_list,
url=advisory_url,
advisory_url=advisory_url,
)
Loading