Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of CSAF ingestion #458

Merged
merged 4 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ trustify-module-storage = { workspace = true }

anyhow = { workspace = true }
bytes = { workspace = true }
csaf = { workspace = true }
cyclonedx-bom = { workspace = true }
humantime = { workspace = true }
log = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/csaf/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod perf;
32 changes: 32 additions & 0 deletions integration-tests/src/csaf/perf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#![cfg(test)]

use std::time::Instant;
use test_context::test_context;
use test_log::test;
use tracing::instrument;
use trustify_common::{db::test::TrustifyContext, hashing::Digests};
use trustify_module_ingestor::{graph::Graph, service::advisory::csaf::loader::CsafLoader};

#[test_context(TrustifyContext, skip_teardown)]
#[test(tokio::test)]
#[instrument]
async fn ingest(ctx: TrustifyContext) -> anyhow::Result<()> {
let db = ctx.db;
let graph = Graph::new(db.clone());

let start = Instant::now();

// let data = include_bytes!("../../../etc/test-data/csaf/CVE-2023-20862.json");
let data = include_bytes!("../../../etc/test-data/csaf/cve-2023-33201.json");

let digests = Digests::digest(data);
CsafLoader::new(&graph)
.load((), &data[..], &digests)
.await?;

let ingest_time = start.elapsed();

log::info!("ingest: {}", humantime::Duration::from(ingest_time));

Ok(())
}
2 changes: 2 additions & 0 deletions integration-tests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
mod csaf;
mod sbom;
mod stream;
1 change: 0 additions & 1 deletion integration-tests/src/sbom/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
mod graph;
mod reingest;
mod stream;
mod test;
19 changes: 7 additions & 12 deletions integration-tests/src/sbom/reingest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Testing to re-ingest a document, ensuring there is not stale data
#![cfg(test)]

use crate::sbom::stream::xz_stream;
use crate::stream::{stream, xz_stream};
use bytes::Bytes;
use serde_json::Value;
use std::convert::Infallible;
use std::str::FromStr;
use test_context::futures::stream;
use test_context::test_context;
Expand Down Expand Up @@ -39,11 +38,9 @@ async fn quarkus(ctx: TrustifyContext) -> Result<(), anyhow::Error> {
("source", "test"),
None,
Format::SPDX,
stream::once(async {
Ok::<_, Infallible>(Bytes::from_static(include_bytes!(
"data/quarkus/v1/quarkus-bom-2.13.8.Final-redhat-00004.json"
)))
}),
stream(include_bytes!(
"data/quarkus/v1/quarkus-bom-2.13.8.Final-redhat-00004.json"
)),
)
.await?;

Expand All @@ -56,11 +53,9 @@ async fn quarkus(ctx: TrustifyContext) -> Result<(), anyhow::Error> {
("source", "test"),
None,
Format::SPDX,
stream::once(async {
Ok::<_, Infallible>(Bytes::from_static(include_bytes!(
"data/quarkus/v2/quarkus-bom-2.13.8.Final-redhat-00004.json"
)))
}),
stream(include_bytes!(
"data/quarkus/v2/quarkus-bom-2.13.8.Final-redhat-00004.json"
)),
)
.await?;

Expand Down
1 change: 0 additions & 1 deletion integration-tests/src/sbom/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ where

let start = Instant::now();
i(&ctx, sbom, &tx).await?;
// sbom.ingest_spdx(sbom.clone(), &tx).await?;
let ingest_time_2 = start.elapsed();

// commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use bytes::Bytes;
use lzma::LzmaError;
use std::convert::Infallible;

use test_context::futures::{stream, Stream};

Expand All @@ -12,3 +13,8 @@ pub fn xz_stream(data: &[u8]) -> impl Stream<Item = Result<Bytes, LzmaError>> {
let result = lzma::decompress(data).map(|data| data.into());
stream::once(async { result })
}

/// Create a stream from a static BLOB.
pub fn stream(data: &'static [u8]) -> impl Stream<Item = Result<Bytes, Infallible>> {
stream::once(async move { Ok(Bytes::from_static(data)) })
}
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod m0000350_remove_old_assertion_tables;
mod m0000355_labels;
mod m0000360_add_sbom_file;
mod m0000370_add_cwe;
mod m0000380_create_package_status_index;

pub struct Migrator;

Expand Down Expand Up @@ -92,6 +93,7 @@ impl MigratorTrait for Migrator {
Box::new(m0000355_labels::Migration),
Box::new(m0000360_add_sbom_file::Migration),
Box::new(m0000370_add_cwe::Migration),
Box::new(m0000380_create_package_status_index::Migration),
]
}
}
Expand Down
43 changes: 43 additions & 0 deletions migration/src/m0000380_create_package_status_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks!

Index::create()
.table(PackageStatus::Table)
.name("package_status_idx")
.col(PackageStatus::PackageId)
.col(PackageStatus::AdvisoryId)
.col(PackageStatus::StatusId)
.to_owned(),
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(
Index::drop()
.table(PackageStatus::Table)
.name("package_status_idx")
.if_exists()
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum PackageStatus {
Table,
AdvisoryId,
StatusId,
PackageId,
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Version {
}

impl VersionInfo {
fn into_active_model(self) -> version_range::ActiveModel {
pub fn into_active_model(self) -> version_range::ActiveModel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bonus points: if you can understand why implementing IntoActiveModel caused rustc to complain about duplicate impls, that'd be lovely. There was a trait I could use, but I couldn't make rustc happy, hence this one-off fn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That one was a bit weird, it seems to be coming from the blanket implementation of that trait in seaorm:

impl<A> IntoActiveModel<A> for A
where
    A: ActiveModelTrait,
{
    fn into_active_model(self) -> A {
        self
    }
}

Which basically means that you can't implement IntoActiveModel for anything that should become an ActiveModel. Which feels like an API bug to me.

It also doesn't seem possible to implement From or Into for that case due to similar conflicts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's not correct. Because in this case A would be VersionInfo, which doesn't implement ActiveModelTrait. And the compiler complains about impl sea_orm::IntoActiveModel<trustify_entity::version_range::ActiveModel> for <trustify_entity::version_range::Entity as sea_orm::EntityTrait>::Model being the conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I give up on the bonus points and leave it a mystery.

version_range::ActiveModel {
id: Default::default(),
version_scheme_id: Set(self.scheme),
Expand Down Expand Up @@ -400,6 +400,7 @@ impl<'g> AdvisoryVulnerabilityContext<'g> {
.map(|cvss| cvss.into()))
}

#[instrument(skip(self, tx), err)]
pub async fn ingest_cvss3_score<TX: AsRef<Transactional>>(
&self,
cvss3: Cvss3Base,
Expand Down
7 changes: 6 additions & 1 deletion modules/ingestor/src/graph/advisory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sea_orm::{ColumnTrait, QuerySelect, RelationTrait};
use sea_query::{Condition, JoinType};
use std::fmt::{Debug, Formatter};
use time::OffsetDateTime;
use tracing::instrument;
use trustify_common::db::Transactional;
use trustify_common::hashing::Digests;
use trustify_entity as entity;
Expand Down Expand Up @@ -65,6 +66,7 @@ impl Graph {
.map(|advisory| AdvisoryContext::new(self, advisory)))
}

#[instrument(skip(self, tx), err)]
pub async fn get_advisory_by_digest<TX: AsRef<Transactional>>(
&self,
sha256: &str,
Expand All @@ -90,9 +92,10 @@ impl Graph {
.collect())
}

#[instrument(skip(self, labels, information, tx), err)]
pub async fn ingest_advisory<TX: AsRef<Transactional>>(
&self,
identifier: impl Into<String>,
identifier: impl Into<String> + Debug,
labels: impl Into<Labels>,
digests: &Digests,
information: impl Into<AdvisoryInformation>,
Expand Down Expand Up @@ -200,6 +203,7 @@ impl<'g> AdvisoryContext<'g> {
self.advisory.withdrawn
}

#[instrument(skip(self, tx), err)]
pub async fn get_vulnerability<TX: AsRef<Transactional>>(
&self,
identifier: &str,
Expand All @@ -217,6 +221,7 @@ impl<'g> AdvisoryContext<'g> {
.map(|vuln| (self, vuln).into()))
}

#[instrument(skip(self, information, tx), err)]
pub async fn link_to_vulnerability<TX: AsRef<Transactional>>(
&self,
identifier: &str,
Expand Down
6 changes: 4 additions & 2 deletions modules/ingestor/src/graph/organization.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set};

use std::fmt::Debug;
use tracing::instrument;
use trustify_common::db::Transactional;
use trustify_entity::organization;

Expand Down Expand Up @@ -52,9 +53,10 @@ impl super::Graph {
.map(|organization| OrganizationContext::new(self, organization)))
}

#[instrument(skip(self, information, tx), err)]
pub async fn ingest_organization<TX: AsRef<Transactional>>(
&self,
name: impl Into<String>,
name: impl Into<String> + Debug,
information: impl Into<OrganizationInformation>,
tx: TX,
) -> Result<OrganizationContext, Error> {
Expand Down
4 changes: 4 additions & 0 deletions modules/ingestor/src/graph/vulnerability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sea_orm::{
use sea_query::{Condition, JoinType};
use std::fmt::{Debug, Formatter};
use time::OffsetDateTime;
use tracing::instrument;
use trustify_common::db::Transactional;
use trustify_entity as entity;
use trustify_entity::{advisory, advisory_vulnerability, vulnerability, vulnerability_description};
Expand Down Expand Up @@ -46,6 +47,7 @@ impl From<()> for VulnerabilityInformation {
}

impl Graph {
#[instrument(skip(self, information, tx), err)]
pub async fn ingest_vulnerability<
I: Into<VulnerabilityInformation>,
TX: AsRef<Transactional>,
Expand All @@ -55,6 +57,7 @@ impl Graph {
information: I,
tx: TX,
) -> Result<VulnerabilityContext, Error> {
// TODO: consider transforming into upsert
let information = information.into();
if let Some(found) = self.get_vulnerability(identifier, &tx).await? {
if information.has_data() {
Expand Down Expand Up @@ -87,6 +90,7 @@ impl Graph {
}
}

#[instrument(skip(self, tx), err)]
pub async fn get_vulnerability<TX: AsRef<Transactional>>(
&self,
identifier: &str,
Expand Down
Loading