diff --git a/Cargo.lock b/Cargo.lock index 03faff603..8e7b291ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6722,6 +6722,7 @@ version = "0.1.0-alpha.8" dependencies = [ "anyhow", "bytes", + "csaf", "cyclonedx-bom", "humantime", "log", diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index 15c79ba1e..6fbb2293c 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -12,6 +12,7 @@ trustify-module-storage = { workspace = true } anyhow = { workspace = true } bytes = { workspace = true } +csaf = { workspace = true } cyclonedx-bom = { workspace = true } humantime = { workspace = true } log = { workspace = true } diff --git a/integration-tests/src/csaf/mod.rs b/integration-tests/src/csaf/mod.rs new file mode 100644 index 000000000..6391cf028 --- /dev/null +++ b/integration-tests/src/csaf/mod.rs @@ -0,0 +1 @@ +mod perf; diff --git a/integration-tests/src/csaf/perf.rs b/integration-tests/src/csaf/perf.rs new file mode 100644 index 000000000..39073ad5a --- /dev/null +++ b/integration-tests/src/csaf/perf.rs @@ -0,0 +1,32 @@ +#![cfg(test)] + +use std::time::Instant; +use test_context::test_context; +use test_log::test; +use tracing::instrument; +use trustify_common::{db::test::TrustifyContext, hashing::Digests}; +use trustify_module_ingestor::{graph::Graph, service::advisory::csaf::loader::CsafLoader}; + +#[test_context(TrustifyContext, skip_teardown)] +#[test(tokio::test)] +#[instrument] +async fn ingest(ctx: TrustifyContext) -> anyhow::Result<()> { + let db = ctx.db; + let graph = Graph::new(db.clone()); + + let start = Instant::now(); + + // let data = include_bytes!("../../../etc/test-data/csaf/CVE-2023-20862.json"); + let data = include_bytes!("../../../etc/test-data/csaf/cve-2023-33201.json"); + + let digests = Digests::digest(data); + CsafLoader::new(&graph) + .load((), &data[..], &digests) + .await?; + + let ingest_time = start.elapsed(); + + log::info!("ingest: {}", humantime::Duration::from(ingest_time)); + + Ok(()) +} diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index 118d7388c..e0417c4e1 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -1 +1,3 @@ +mod csaf; mod sbom; +mod stream; diff --git a/integration-tests/src/sbom/mod.rs b/integration-tests/src/sbom/mod.rs index b8ab8e374..70923e3ee 100644 --- a/integration-tests/src/sbom/mod.rs +++ b/integration-tests/src/sbom/mod.rs @@ -1,4 +1,3 @@ mod graph; mod reingest; -mod stream; mod test; diff --git a/integration-tests/src/sbom/reingest/mod.rs b/integration-tests/src/sbom/reingest/mod.rs index 79c6ae809..0be6f2d0a 100644 --- a/integration-tests/src/sbom/reingest/mod.rs +++ b/integration-tests/src/sbom/reingest/mod.rs @@ -1,10 +1,9 @@ //! Testing to re-ingest a document, ensuring there is not stale data #![cfg(test)] -use crate::sbom::stream::xz_stream; +use crate::stream::{stream, xz_stream}; use bytes::Bytes; use serde_json::Value; -use std::convert::Infallible; use std::str::FromStr; use test_context::futures::stream; use test_context::test_context; @@ -39,11 +38,9 @@ async fn quarkus(ctx: TrustifyContext) -> Result<(), anyhow::Error> { ("source", "test"), None, Format::SPDX, - stream::once(async { - Ok::<_, Infallible>(Bytes::from_static(include_bytes!( - "data/quarkus/v1/quarkus-bom-2.13.8.Final-redhat-00004.json" - ))) - }), + stream(include_bytes!( + "data/quarkus/v1/quarkus-bom-2.13.8.Final-redhat-00004.json" + )), ) .await?; @@ -56,11 +53,9 @@ async fn quarkus(ctx: TrustifyContext) -> Result<(), anyhow::Error> { ("source", "test"), None, Format::SPDX, - stream::once(async { - Ok::<_, Infallible>(Bytes::from_static(include_bytes!( - "data/quarkus/v2/quarkus-bom-2.13.8.Final-redhat-00004.json" - ))) - }), + stream(include_bytes!( + "data/quarkus/v2/quarkus-bom-2.13.8.Final-redhat-00004.json" + )), ) .await?; diff --git a/integration-tests/src/sbom/test/mod.rs b/integration-tests/src/sbom/test/mod.rs index 272df9ecf..533452960 100644 --- a/integration-tests/src/sbom/test/mod.rs +++ b/integration-tests/src/sbom/test/mod.rs @@ -166,7 +166,6 @@ where let start = Instant::now(); i(&ctx, sbom, &tx).await?; - // sbom.ingest_spdx(sbom.clone(), &tx).await?; let ingest_time_2 = start.elapsed(); // commit diff --git a/integration-tests/src/sbom/stream.rs b/integration-tests/src/stream.rs similarity index 66% rename from integration-tests/src/sbom/stream.rs rename to integration-tests/src/stream.rs index 588f6bce0..8ab999146 100644 --- a/integration-tests/src/sbom/stream.rs +++ b/integration-tests/src/stream.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use lzma::LzmaError; +use std::convert::Infallible; use test_context::futures::{stream, Stream}; @@ -12,3 +13,8 @@ pub fn xz_stream(data: &[u8]) -> impl Stream> { let result = lzma::decompress(data).map(|data| data.into()); stream::once(async { result }) } + +/// Create a stream from a static BLOB. +pub fn stream(data: &'static [u8]) -> impl Stream> { + stream::once(async move { Ok(Bytes::from_static(data)) }) +} diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 21b041863..c75934e82 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -43,6 +43,7 @@ mod m0000350_remove_old_assertion_tables; mod m0000355_labels; mod m0000360_add_sbom_file; mod m0000370_add_cwe; +mod m0000380_create_package_status_index; pub struct Migrator; @@ -92,6 +93,7 @@ impl MigratorTrait for Migrator { Box::new(m0000355_labels::Migration), Box::new(m0000360_add_sbom_file::Migration), Box::new(m0000370_add_cwe::Migration), + Box::new(m0000380_create_package_status_index::Migration), ] } } diff --git a/migration/src/m0000380_create_package_status_index.rs b/migration/src/m0000380_create_package_status_index.rs new file mode 100644 index 000000000..ceeb5e335 --- /dev/null +++ b/migration/src/m0000380_create_package_status_index.rs @@ -0,0 +1,43 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_index( + Index::create() + .table(PackageStatus::Table) + .name("package_status_idx") + .col(PackageStatus::PackageId) + .col(PackageStatus::AdvisoryId) + .col(PackageStatus::StatusId) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_index( + Index::drop() + .table(PackageStatus::Table) + .name("package_status_idx") + .if_exists() + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum PackageStatus { + Table, + AdvisoryId, + StatusId, + PackageId, +} diff --git a/modules/ingestor/src/graph/advisory/advisory_vulnerability.rs b/modules/ingestor/src/graph/advisory/advisory_vulnerability.rs index 9987036f5..bdec281b0 100644 --- a/modules/ingestor/src/graph/advisory/advisory_vulnerability.rs +++ b/modules/ingestor/src/graph/advisory/advisory_vulnerability.rs @@ -31,7 +31,7 @@ pub enum Version { } impl VersionInfo { - fn into_active_model(self) -> version_range::ActiveModel { + pub fn into_active_model(self) -> version_range::ActiveModel { version_range::ActiveModel { id: Default::default(), version_scheme_id: Set(self.scheme), @@ -400,6 +400,7 @@ impl<'g> AdvisoryVulnerabilityContext<'g> { .map(|cvss| cvss.into())) } + #[instrument(skip(self, tx), err)] pub async fn ingest_cvss3_score>( &self, cvss3: Cvss3Base, diff --git a/modules/ingestor/src/graph/advisory/mod.rs b/modules/ingestor/src/graph/advisory/mod.rs index 866b8eebc..795274f21 100644 --- a/modules/ingestor/src/graph/advisory/mod.rs +++ b/modules/ingestor/src/graph/advisory/mod.rs @@ -10,6 +10,7 @@ use sea_orm::{ColumnTrait, QuerySelect, RelationTrait}; use sea_query::{Condition, JoinType}; use std::fmt::{Debug, Formatter}; use time::OffsetDateTime; +use tracing::instrument; use trustify_common::db::Transactional; use trustify_common::hashing::Digests; use trustify_entity as entity; @@ -65,6 +66,7 @@ impl Graph { .map(|advisory| AdvisoryContext::new(self, advisory))) } + #[instrument(skip(self, tx), err)] pub async fn get_advisory_by_digest>( &self, sha256: &str, @@ -90,9 +92,10 @@ impl Graph { .collect()) } + #[instrument(skip(self, labels, information, tx), err)] pub async fn ingest_advisory>( &self, - identifier: impl Into, + identifier: impl Into + Debug, labels: impl Into, digests: &Digests, information: impl Into, @@ -200,6 +203,7 @@ impl<'g> AdvisoryContext<'g> { self.advisory.withdrawn } + #[instrument(skip(self, tx), err)] pub async fn get_vulnerability>( &self, identifier: &str, @@ -217,6 +221,7 @@ impl<'g> AdvisoryContext<'g> { .map(|vuln| (self, vuln).into())) } + #[instrument(skip(self, information, tx), err)] pub async fn link_to_vulnerability>( &self, identifier: &str, diff --git a/modules/ingestor/src/graph/organization.rs b/modules/ingestor/src/graph/organization.rs index 839b3c5af..0d86e3828 100644 --- a/modules/ingestor/src/graph/organization.rs +++ b/modules/ingestor/src/graph/organization.rs @@ -1,5 +1,6 @@ use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; - +use std::fmt::Debug; +use tracing::instrument; use trustify_common::db::Transactional; use trustify_entity::organization; @@ -52,9 +53,10 @@ impl super::Graph { .map(|organization| OrganizationContext::new(self, organization))) } + #[instrument(skip(self, information, tx), err)] pub async fn ingest_organization>( &self, - name: impl Into, + name: impl Into + Debug, information: impl Into, tx: TX, ) -> Result { diff --git a/modules/ingestor/src/graph/vulnerability/mod.rs b/modules/ingestor/src/graph/vulnerability/mod.rs index edafd6381..9a987652a 100644 --- a/modules/ingestor/src/graph/vulnerability/mod.rs +++ b/modules/ingestor/src/graph/vulnerability/mod.rs @@ -10,6 +10,7 @@ use sea_orm::{ use sea_query::{Condition, JoinType}; use std::fmt::{Debug, Formatter}; use time::OffsetDateTime; +use tracing::instrument; use trustify_common::db::Transactional; use trustify_entity as entity; use trustify_entity::{advisory, advisory_vulnerability, vulnerability, vulnerability_description}; @@ -46,6 +47,7 @@ impl From<()> for VulnerabilityInformation { } impl Graph { + #[instrument(skip(self, information, tx), err)] pub async fn ingest_vulnerability< I: Into, TX: AsRef, @@ -55,6 +57,7 @@ impl Graph { information: I, tx: TX, ) -> Result { + // TODO: consider transforming into upsert let information = information.into(); if let Some(found) = self.get_vulnerability(identifier, &tx).await? { if information.has_data() { @@ -87,6 +90,7 @@ impl Graph { } } + #[instrument(skip(self, tx), err)] pub async fn get_vulnerability>( &self, identifier: &str, diff --git a/modules/ingestor/src/service/advisory/csaf/creator.rs b/modules/ingestor/src/service/advisory/csaf/creator.rs new file mode 100644 index 000000000..0316df887 --- /dev/null +++ b/modules/ingestor/src/service/advisory/csaf/creator.rs @@ -0,0 +1,147 @@ +use crate::{ + graph::{ + advisory::advisory_vulnerability::{VersionInfo, VersionSpec}, + purl::creator::PurlCreator, + }, + service::{advisory::csaf::util::resolve_purls, Error}, +}; +use csaf::{definitions::ProductIdT, Csaf}; +use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter}; +use sea_query::IntoCondition; +use std::{collections::hash_map::Entry, collections::HashMap}; +use trustify_common::{db::chunk::EntityChunkedIter, purl::Purl}; +use trustify_entity::{package_status, status, version_range}; +use uuid::Uuid; + +struct PackageStatus { + package: Purl, + status: &'static str, + info: VersionInfo, +} + +pub struct PackageStatusCreator { + advisory_id: Uuid, + vulnerability_id: i32, + entries: Vec, +} + +impl PackageStatusCreator { + pub fn new(advisory_id: Uuid, vulnerability_id: i32) -> Self { + Self { + advisory_id, + vulnerability_id, + entries: Vec::new(), + } + } + + pub fn add_all(&mut self, csaf: &Csaf, ps: &Option>, status: &'static str) { + for r in ps.iter().flatten() { + for purl in resolve_purls(csaf, r) { + let package = Purl::from(purl.clone()); + + if let Some(version) = package.version.clone() { + self.entries.push(PackageStatus { + package, + status, + info: VersionInfo { + scheme: "generic".to_string(), + spec: VersionSpec::Exact(version), + }, + }); + } + } + } + } + + async fn check_status( + status: &str, + connection: &impl ConnectionTrait, + ) -> Result { + Ok(status::Entity::find() + .filter(status::Column::Slug.eq(status)) + .one(connection) + .await? + .ok_or_else(|| crate::graph::error::Error::InvalidStatus(status.to_string()))?) + } + + pub async fn create(self, connection: &impl ConnectionTrait) -> Result<(), Error> { + let mut checked = HashMap::new(); + + let mut purls = PurlCreator::new(); + + for ps in &self.entries { + // ensure a correct status, and get id + if let Entry::Vacant(entry) = checked.entry(ps.status) { + entry.insert(Self::check_status(ps.status, connection).await?); + } + // add to PURL creator + purls.add(ps.package.clone()); + } + + purls.create(connection).await?; + + // round two, status is checked, purls exist + + let mut version_ranges = Vec::new(); + let mut package_statuses = Vec::new(); + + for ps in self.entries { + let status = checked.get(&ps.status).ok_or_else(|| { + Error::Graph(crate::graph::error::Error::InvalidStatus( + ps.status.to_string(), + )) + })?; + + // TODO: we could try to batch process this too + + let package_id = ps.package.package_uuid(); + + let package_status = package_status::Entity::find() + .filter(package_status::Column::PackageId.eq(package_id)) + .filter(package_status::Column::AdvisoryId.eq(self.advisory_id)) + .filter(package_status::Column::StatusId.eq(status.id)) + .left_join(version_range::Entity) + .filter(ps.info.clone().into_condition()) + .one(connection) + .await?; + + if package_status.is_some() { + continue; + } + + let mut version_range = ps.info.into_active_model(); + let version_range_id = Uuid::now_v7(); + version_range.id = Set(version_range_id); + version_ranges.push(version_range); + + let package_status = package_status::ActiveModel { + id: Default::default(), + advisory_id: Set(self.advisory_id), + vulnerability_id: Set(self.vulnerability_id), + status_id: Set(status.id), + package_id: Set(package_id), + version_range_id: Set(version_range_id), + }; + + package_statuses.push(package_status); + } + + // batch insert + + for batch in &version_ranges.chunked() { + version_range::Entity::insert_many(batch) + .exec(connection) + .await?; + } + + for batch in &package_statuses.chunked() { + package_status::Entity::insert_many(batch) + .exec(connection) + .await?; + } + + // done + + Ok(()) + } +} diff --git a/modules/ingestor/src/service/advisory/csaf/loader.rs b/modules/ingestor/src/service/advisory/csaf/loader.rs index 06101e02b..a98b59ec8 100644 --- a/modules/ingestor/src/service/advisory/csaf/loader.rs +++ b/modules/ingestor/src/service/advisory/csaf/loader.rs @@ -1,5 +1,3 @@ -use crate::graph::advisory::advisory_vulnerability::{VersionInfo, VersionSpec}; -use crate::model::IngestResult; use crate::{ graph::{ advisory::{ @@ -8,16 +6,17 @@ use crate::{ }, Graph, }, - service::{advisory::csaf::util::resolve_purls, Error}, + model::IngestResult, + service::{advisory::csaf::PackageStatusCreator, Error}, }; use csaf::{ vulnerability::{ProductStatus, Vulnerability}, Csaf, }; -use std::io::Read; -use std::str::FromStr; +use std::{io::Read, str::FromStr}; use time::OffsetDateTime; -use trustify_common::{db::Transactional, hashing::Digests, id::Id, purl::Purl}; +use tracing::{info_span, instrument}; +use trustify_common::{db::Transactional, hashing::Digests, id::Id}; use trustify_cvss::cvss3::Cvss3Base; use trustify_entity::labels::Labels; @@ -51,13 +50,15 @@ impl<'g> CsafLoader<'g> { Self { graph } } + #[instrument(skip(self, labels, document), err)] pub async fn load( &self, labels: impl Into, document: R, digests: &Digests, ) -> Result { - let csaf: Csaf = serde_json::from_reader(document)?; + let csaf: Csaf = + info_span!("parse document").in_scope(|| serde_json::from_reader(document))?; let tx = self.graph.transaction().await?; @@ -81,6 +82,12 @@ impl<'g> CsafLoader<'g> { }) } + #[instrument(skip_all, + fields( + csaf=csaf.document.tracking.id, + cve=vulnerability.cve + ) + )] async fn ingest_vulnerability>( &self, csaf: &Csaf, @@ -136,6 +143,7 @@ impl<'g> CsafLoader<'g> { Ok(()) } + #[instrument(skip_all, err)] async fn ingest_product_statuses>( &self, csaf: &Csaf, @@ -143,63 +151,18 @@ impl<'g> CsafLoader<'g> { product_status: &ProductStatus, tx: TX, ) -> Result<(), Error> { - for r in product_status.fixed.iter().flatten() { - for purl in resolve_purls(csaf, r) { - let package = Purl::from(purl.clone()); - - if let Some(version) = &package.version { - advisory_vulnerability - .ingest_package_status( - &package, - "fixed", - VersionInfo { - scheme: "generic".to_string(), - spec: VersionSpec::Exact(version.clone()), - }, - &tx, - ) - .await? - } - } - } - for r in product_status.known_not_affected.iter().flatten() { - for purl in resolve_purls(csaf, r) { - let package = Purl::from(purl.clone()); - - if let Some(version) = &package.version { - advisory_vulnerability - .ingest_package_status( - &package, - "not_affected", - VersionInfo { - scheme: "generic".to_string(), - spec: VersionSpec::Exact(version.clone()), - }, - &tx, - ) - .await? - } - } - } - for r in product_status.known_affected.iter().flatten() { - for purl in resolve_purls(csaf, r) { - let package = Purl::from(purl.clone()); - - if let Some(version) = &package.version { - advisory_vulnerability - .ingest_package_status( - &package, - "affected", - VersionInfo { - scheme: "generic".to_string(), - spec: VersionSpec::Exact(version.clone()), - }, - &tx, - ) - .await? - } - } - } + let mut creator = PackageStatusCreator::new( + advisory_vulnerability.advisory_vulnerability.advisory_id, + advisory_vulnerability + .advisory_vulnerability + .vulnerability_id, + ); + + creator.add_all(csaf, &product_status.fixed, "fixed"); + creator.add_all(csaf, &product_status.known_not_affected, "not_affected"); + creator.add_all(csaf, &product_status.known_affected, "affected"); + + creator.create(&self.graph.connection(&tx)).await?; Ok(()) } diff --git a/modules/ingestor/src/service/advisory/csaf/mod.rs b/modules/ingestor/src/service/advisory/csaf/mod.rs index 5876894c7..0191e0d9c 100644 --- a/modules/ingestor/src/service/advisory/csaf/mod.rs +++ b/modules/ingestor/src/service/advisory/csaf/mod.rs @@ -1,2 +1,5 @@ pub mod loader; mod util; + +mod creator; +pub use creator::*; diff --git a/server/src/lib.rs b/server/src/lib.rs index 26d73f706..b48ba40a6 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,4 +1,5 @@ #![allow(unused)] +#![recursion_limit = "256"] #[cfg(feature = "garage-door")] mod embedded_oidc;