Skip to content

Commit 678284e

Browse files
committedDec 18, 2024
fix: optimize import of product related entities during csaf ingestion
1 parent a98d468 commit 678284e

File tree

2 files changed

+81
-21
lines changed

2 files changed

+81
-21
lines changed
 

‎modules/ingestor/src/graph/product/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ impl Graph {
166166
cpe_key: organization_cpe_key,
167167
website: None,
168168
};
169-
let org = self.ingest_organization(vendor, org, connection).await?;
169+
let org: OrganizationContext<'_> =
170+
self.ingest_organization(vendor, org, connection).await?;
170171

171172
product::ActiveModel {
172173
id: Default::default(),

‎modules/ingestor/src/service/advisory/csaf/creator.rs

+79-20
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ use crate::{
22
graph::{
33
advisory::advisory_vulnerability::{Version, VersionInfo, VersionSpec},
44
cpe::CpeCreator,
5-
product::ProductInformation,
5+
organization::{OrganizationContext, OrganizationInformation},
66
purl::creator::PurlCreator,
77
Graph,
88
},
99
service::{
10-
advisory::csaf::product_status::ProductStatus, advisory::csaf::util::ResolveProductIdCache,
10+
advisory::csaf::{product_status::ProductStatus, util::ResolveProductIdCache},
1111
Error,
1212
},
1313
};
@@ -18,7 +18,8 @@ use std::collections::{hash_map::Entry, HashMap, HashSet};
1818
use tracing::instrument;
1919
use trustify_common::{cpe::Cpe, db::chunk::EntityChunkedIter, purl::Purl};
2020
use trustify_entity::{
21-
product_status, purl_status, status, version_range, version_scheme::VersionScheme,
21+
organization, product, product_status, product_version_range, purl_status, status,
22+
version_range, version_scheme::VersionScheme,
2223
};
2324
use uuid::Uuid;
2425

@@ -108,6 +109,11 @@ impl<'a> StatusCreator<'a> {
108109
let mut purls = PurlCreator::new();
109110
let mut cpes = CpeCreator::new();
110111

112+
let mut org_cache: HashMap<&String, organization::Model> = HashMap::new();
113+
let mut products = Vec::new();
114+
let mut version_ranges = Vec::new();
115+
let mut product_version_ranges = Vec::new();
116+
111117
for product in &self.products {
112118
// ensure a correct status, and get id
113119
if let Entry::Vacant(entry) = checked.entry(product.status) {
@@ -120,24 +126,61 @@ impl<'a> StatusCreator<'a> {
120126
))
121127
})?;
122128

123-
// Ingest product
124-
let pr = graph
125-
.ingest_product(
126-
&product.product,
127-
ProductInformation {
128-
vendor: product.vendor.clone(),
129-
cpe: product.cpe.clone(),
130-
},
131-
connection,
132-
)
133-
.await?;
129+
// There should be only a few organizations per document,
130+
// so simple caching should work here.
131+
// If we find examples where this is not a case, we can switch to
132+
// batch ingesting of organizations as well.
133+
let org_id = match &product.vendor {
134+
Some(vendor) => match org_cache.get(vendor) {
135+
Some(entry) => Some(entry.id),
136+
None => {
137+
let organization_cpe_key = product
138+
.cpe
139+
.clone()
140+
.map(|cpe| cpe.vendor().as_ref().to_string());
141+
let org = OrganizationInformation {
142+
cpe_key: organization_cpe_key,
143+
website: None,
144+
};
145+
let org: OrganizationContext<'_> =
146+
graph.ingest_organization(vendor, org, connection).await?;
147+
org_cache.entry(vendor).or_insert(org.organization.clone());
148+
Some(org.organization.id)
149+
}
150+
},
151+
None => None,
152+
};
153+
154+
// Create all product entities for batch ingesting
155+
let product_cpe_key = product
156+
.cpe
157+
.clone()
158+
.map(|cpe| cpe.product().as_ref().to_string());
159+
160+
let product_entity = product::ActiveModel {
161+
id: Set(Uuid::now_v7()),
162+
name: Set(product.product.clone()),
163+
vendor_id: Set(org_id),
164+
cpe_key: Set(product_cpe_key),
165+
};
166+
products.push(product_entity.clone());
134167

135-
// Ingest product range
168+
// Create all product ranges for batch ingesting
136169
let product_version_range = match product.version {
137-
Some(ref ver) => Some(
138-
pr.ingest_product_version_range(ver.clone(), None, connection)
139-
.await?,
140-
),
170+
Some(ref ver) => {
171+
let mut version_range_entity = ver.clone().into_active_model();
172+
version_range_entity.id = Set(Uuid::now_v7());
173+
version_ranges.push(version_range_entity.clone());
174+
175+
let product_version_range_entity = product_version_range::ActiveModel {
176+
id: Set(Uuid::now_v7()),
177+
product_id: product_entity.id,
178+
version_range_id: version_range_entity.id,
179+
cpe_key: Set(None),
180+
};
181+
product_version_ranges.push(product_version_range_entity.clone());
182+
Some(product_version_range_entity)
183+
}
141184
None => None,
142185
};
143186

@@ -156,7 +199,7 @@ impl<'a> StatusCreator<'a> {
156199
for package in packages {
157200
let base_product = product_status::ActiveModel {
158201
id: Default::default(),
159-
product_version_range_id: Set(range.id),
202+
product_version_range_id: range.clone().id,
160203
advisory_id: Set(self.advisory_id),
161204
vulnerability_id: Set(self.vulnerability_id.clone()),
162205
package: Set(package),
@@ -213,6 +256,22 @@ impl<'a> StatusCreator<'a> {
213256

214257
self.create_status(connection, checked).await?;
215258

259+
for batch in &products.chunked() {
260+
product::Entity::insert_many(batch).exec(connection).await?;
261+
}
262+
263+
for batch in &version_ranges.chunked() {
264+
version_range::Entity::insert_many(batch)
265+
.exec(connection)
266+
.await?;
267+
}
268+
269+
for batch in &product_version_ranges.chunked() {
270+
product_version_range::Entity::insert_many(batch)
271+
.exec(connection)
272+
.await?;
273+
}
274+
216275
for batch in &product_statuses.chunked() {
217276
product_status::Entity::insert_many(batch)
218277
.exec(connection)

0 commit comments

Comments
 (0)