Conversation
Reviewer's GuideIntroduce an in-memory, time-bounded asset cache that bulk-loads assets via exports.assets_v2 and uses it to resolve asset info for vulnerabilities, falling back to per-asset details calls only when necessary, with accompanying tests and version bump. Sequence diagram for cached asset lookup in vulnerability_assetsequenceDiagram
participant VulnerabilityAsset
participant TenableIO
VulnerabilityAsset->>VulnerabilityAsset: _get_asset_info(asset_uuid)
alt asset_map empty or _should_refresh_asset_cache is True
VulnerabilityAsset->>VulnerabilityAsset: _build_asset_map()
VulnerabilityAsset->>TenableIO: exports.assets_v2(since=_latest_time, chunk_size=4000)
loop for each asset until max_cache_size
TenableIO-->>VulnerabilityAsset: asset
VulnerabilityAsset->>VulnerabilityAsset: store asset in _asset_map
end
VulnerabilityAsset->>VulnerabilityAsset: _last_asset_refresh = datetime.now()
end
alt asset_uuid in _asset_map
VulnerabilityAsset-->>VulnerabilityAsset: return _asset_map[asset_uuid]
else asset_uuid not in _asset_map
VulnerabilityAsset->>TenableIO: assets.details(asset_uuid)
alt details call succeeds
TenableIO-->>VulnerabilityAsset: asset_info
alt len(_asset_map) < _max_cache_size
VulnerabilityAsset->>VulnerabilityAsset: cache asset_info in _asset_map
end
VulnerabilityAsset-->>VulnerabilityAsset: return asset_info
else details call fails
TenableIO-->>VulnerabilityAsset: error
VulnerabilityAsset-->>VulnerabilityAsset: return None
end
end
Updated class diagram for Tenable vulnerability asset connector cachingclassDiagram
class VulnerabilityAsset {
- timedelta _cursor_interval
- datetime _latest_time
- dict _asset_map
- int _max_cache_size
- datetime _last_asset_refresh
- timedelta _asset_refresh_interval
+ TenableIO client
+ list severities()
+ int extract_timestamp(vuln: dict, field: str)
+ bool _should_refresh_asset_cache()
+ dict _build_asset_map()
+ dict _get_asset_info(asset_uuid: str)
+ None update_checkpoint()
+ Generator _get_tenable_vul()
}
class TenableIO {
+ ExportClient exports
+ AssetClient assets
}
class ExportClient {
+ Generator assets_v2(since: datetime, chunk_size: int)
}
class AssetClient {
+ dict details(asset_uuid: str)
}
VulnerabilityAsset --> TenableIO
TenableIO --> ExportClient
TenableIO --> AssetClient
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- The
test_get_asset_info_exceptiondefinition appears twice intest_vulnerability_asset.py, and the first instance has a malformed assertion line (assert result is Nonetenable_asset_connector.log.assert_called()); consider removing the duplicate test and fixing the broken assertion so it reads as two separate statements. - In
VulnerabilityAssetConnector.__init__,_asset_mapis annotated asdict[str, dict] | Nonebut initialized and used as a dictionary everywhere; you can simplify the type todict[str, dict](and drop the| None) to better reflect actual usage.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `test_get_asset_info_exception` definition appears twice in `test_vulnerability_asset.py`, and the first instance has a malformed assertion line (`assert result is Nonetenable_asset_connector.log.assert_called()`); consider removing the duplicate test and fixing the broken assertion so it reads as two separate statements.
- In `VulnerabilityAssetConnector.__init__`, `_asset_map` is annotated as `dict[str, dict] | None` but initialized and used as a dictionary everywhere; you can simplify the type to `dict[str, dict]` (and drop the `| None`) to better reflect actual usage.
## Individual Comments
### Comment 1
<location path="Tenable/tenable_conn/asset_connector/vulnerability_asset.py" line_range="70-72" />
<code_context>
start_at=timedelta(days=120),
)
self._latest_time = self.cursor.offset
+ self._asset_map: dict[str, dict] | None = {}
+ self._max_cache_size = 5000
+ self._last_asset_refresh: datetime = datetime.now()
+ self._asset_refresh_interval = timedelta(hours=1)
</code_context>
<issue_to_address>
**suggestion:** Clarify `_asset_map` optionality and initialization pattern.
The type and usage of `_asset_map` are inconsistent: it’s annotated as `dict[str, dict] | None` but initialized as `{}`, and `_get_asset_info` checks `if not self._asset_map or self._should_refresh_asset_cache():`. Either:
- Use `None` to mean “not yet built”: initialize `_asset_map` to `None`, check `if self._asset_map is None or self._should_refresh_asset_cache():`, and have `_build_asset_map` always assign a dict; or
- Drop the `None` from the type and rely solely on the refresh logic with a non-optional `dict[str, dict]`.
That keeps the type hints and control flow aligned.
</issue_to_address>
### Comment 2
<location path="Tenable/tenable_conn/asset_connector/vulnerability_asset.py" line_range="122-131" />
<code_context>
+ def _build_asset_map(self) -> dict[str, dict]:
</code_context>
<issue_to_address>
**issue (bug_risk):** Return type annotation for `_build_asset_map` does not match actual behavior.
The method is annotated as returning `dict[str, dict]` but only mutates `self._asset_map` and returns nothing, which will cause type-checker errors and mislead callers. Please either return `self._asset_map` or change the return type to `None` to match the current behavior.
</issue_to_address>
### Comment 3
<location path="Tenable/tests/asset_connector/test_vulnerability_asset.py" line_range="696-705" />
<code_context>
+def test_build_asset_map_success(tenable_asset_connector):
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for `_build_asset_map` error handling and cache-size limit behavior
Current tests cover the happy path of `_build_asset_map` and cache usage in `_get_asset_info`, but not the error/limit branches. Please also add tests that:
- Simulate `client.exports.assets_v2` raising to assert `log_exception` is called and that `_build_asset_map` exits cleanly.
- Yield more than `_max_cache_size` assets to assert the map is capped at `_max_cache_size` and that the warning `"Stop uploading, limit exceeded (...)"` is logged.
That will exercise the failure and limit behavior of the new export/caching logic under load.
</issue_to_address>
### Comment 4
<location path="Tenable/CHANGELOG.md" line_range="14" />
<code_context>
+
+### Changed
+
+- Use export assets instead of get assets details for each vulnerability
+
## 2026-02-23 - 1.0.12
</code_context>
<issue_to_address>
**suggestion (typo):** Consider rephrasing this line for grammatical correctness and clarity.
The phrase is a bit awkward and also uses “assets details” instead of the more natural “asset details.” Consider rewording to something like:
- “Use exported assets instead of getting asset details for each vulnerability,” or
- “Use exported assets instead of fetching asset details for each vulnerability.”
```suggestion
- Use exported assets instead of fetching asset details for each vulnerability
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| self._asset_map: dict[str, dict] | None = {} | ||
| self._max_cache_size = 5000 | ||
| self._last_asset_refresh: datetime = datetime.now() |
There was a problem hiding this comment.
suggestion: Clarify _asset_map optionality and initialization pattern.
The type and usage of _asset_map are inconsistent: it’s annotated as dict[str, dict] | None but initialized as {}, and _get_asset_info checks if not self._asset_map or self._should_refresh_asset_cache():. Either:
- Use
Noneto mean “not yet built”: initialize_asset_maptoNone, checkif self._asset_map is None or self._should_refresh_asset_cache():, and have_build_asset_mapalways assign a dict; or - Drop the
Nonefrom the type and rely solely on the refresh logic with a non-optionaldict[str, dict].
That keeps the type hints and control flow aligned.
| def _build_asset_map(self) -> dict[str, dict]: | ||
| """ | ||
| Build a map of asset UUIDs to their corresponding asset information. | ||
| :return: | ||
| dict[str, dict]: A dictionary mapping asset UUIDs to asset information dictionaries. | ||
| """ | ||
| asset_count = 0 | ||
| try: | ||
| self.log("Start uploading assets from tenable.", level="info") | ||
|
|
There was a problem hiding this comment.
issue (bug_risk): Return type annotation for _build_asset_map does not match actual behavior.
The method is annotated as returning dict[str, dict] but only mutates self._asset_map and returns nothing, which will cause type-checker errors and mislead callers. Please either return self._asset_map or change the return type to None to match the current behavior.
| def test_build_asset_map_success(tenable_asset_connector): | ||
| mock_assets = [ | ||
| {"id": "asset-1", "name": "host1"}, | ||
| {"id": "asset-2", "name": "host2"}, | ||
| {"id": "asset-3", "name": "host3"}, | ||
| ] | ||
|
|
||
| mock_client = MagicMock() | ||
| mock_client.exports.assets_v2.return_value = mock_assets | ||
|
|
There was a problem hiding this comment.
suggestion (testing): Add tests for _build_asset_map error handling and cache-size limit behavior
Current tests cover the happy path of _build_asset_map and cache usage in _get_asset_info, but not the error/limit branches. Please also add tests that:
- Simulate
client.exports.assets_v2raising to assertlog_exceptionis called and that_build_asset_mapexits cleanly. - Yield more than
_max_cache_sizeassets to assert the map is capped at_max_cache_sizeand that the warning"Stop uploading, limit exceeded (...)"is logged.
That will exercise the failure and limit behavior of the new export/caching logic under load.
|
|
||
| ### Changed | ||
|
|
||
| - Use export assets instead of get assets details for each vulnerability |
There was a problem hiding this comment.
suggestion (typo): Consider rephrasing this line for grammatical correctness and clarity.
The phrase is a bit awkward and also uses “assets details” instead of the more natural “asset details.” Consider rewording to something like:
- “Use exported assets instead of getting asset details for each vulnerability,” or
- “Use exported assets instead of fetching asset details for each vulnerability.”
| - Use export assets instead of get assets details for each vulnerability | |
| - Use exported assets instead of fetching asset details for each vulnerability |
There was a problem hiding this comment.
Pull request overview
This PR updates the Tenable asset connector to reduce per-vulnerability asset detail lookups by introducing an in-memory asset cache built from Tenable’s asset export API.
Changes:
- Add an asset UUID → asset-info cache with refresh interval logic and fallback to
assets.details()when needed. - Extend/adjust unit tests to cover cache behavior (build, refresh, fallback, size limit).
- Bump integration version to
1.0.13and document the change in the changelog.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
Tenable/tenable_conn/asset_connector/vulnerability_asset.py |
Introduces asset cache, cache refresh logic, and export-based map building used by vulnerability processing. |
Tenable/tests/asset_connector/test_vulnerability_asset.py |
Adds tests for new cache behavior and modifies existing asset-info error test. |
Tenable/manifest.json |
Version bump 1.0.12 → 1.0.13. |
Tenable/CHANGELOG.md |
Documents the behavioral change for 1.0.13. |
| def _build_asset_map(self) -> dict[str, dict]: | ||
| """ | ||
| Build a map of asset UUIDs to their corresponding asset information. | ||
| :return: | ||
| dict[str, dict]: A dictionary mapping asset UUIDs to asset information dictionaries. | ||
| """ | ||
| asset_count = 0 | ||
| try: | ||
| self.log("Start uploading assets from tenable.", level="info") | ||
|
|
||
| self._asset_map.clear() | ||
|
|
||
| for asset in self.client.exports.assets_v2(since=self._latest_time, chunk_size=4000): | ||
| asset_uuid = asset.get("id") | ||
| if asset_uuid: | ||
| self._asset_map[asset_uuid] = asset | ||
| asset_count += 1 | ||
| if asset_count >= self._max_cache_size: | ||
| self.log(f"Stop uploading, limit exceeded ({self._max_cache_size})", level="warning") | ||
| break | ||
|
|
||
| self._last_asset_refresh = datetime.now() | ||
| self.log(f"{len(self._asset_map)} assets uploaded", level="info") | ||
| except Exception as e: | ||
| self.log_exception(e, message="Error while building asset map from Tenable") | ||
|
|
There was a problem hiding this comment.
_build_asset_map is annotated/documented as returning dict[str, dict], but it never returns anything (implicitly returns None). Either return the built map explicitly or change the return annotation/docstring to None to avoid misleading callers and type-checking issues.
| start_at=timedelta(days=120), | ||
| ) | ||
| self._latest_time = self.cursor.offset | ||
| self._asset_map: dict[str, dict] | None = {} |
There was a problem hiding this comment.
_asset_map is typed as dict[str, dict] | None but initialized to {}. Since the implementation treats it as a dict (calls .clear() etc.), the | None union is misleading; either initialize to None and handle that explicitly, or change the type to a plain dict.
| self._asset_map: dict[str, dict] | None = {} | |
| self._asset_map: dict[str, dict] = {} |
| # Build the asset map if it hasn't been built yet | ||
| if not self._asset_map or self._should_refresh_asset_cache(): | ||
| self._build_asset_map() |
There was a problem hiding this comment.
if not self._asset_map or self._should_refresh_asset_cache() treats an empty cache as 'not built'. If the export legitimately returns 0 assets, _get_asset_info will rebuild the cache on every call (potentially an expensive export loop). Prefer checking an explicit sentinel (e.g., self._asset_map is None) for the 'never built' case.
| self._asset_map.clear() | ||
|
|
||
| for asset in self.client.exports.assets_v2(since=self._latest_time, chunk_size=4000): | ||
| asset_uuid = asset.get("id") | ||
| if asset_uuid: | ||
| self._asset_map[asset_uuid] = asset | ||
| asset_count += 1 | ||
| if asset_count >= self._max_cache_size: | ||
| self.log(f"Stop uploading, limit exceeded ({self._max_cache_size})", level="warning") | ||
| break | ||
|
|
There was a problem hiding this comment.
_build_asset_map() clears self._asset_map before the export loop. If the export call fails part-way (exception), the connector will be left with an empty cache, increasing downstream API calls; consider populating a temporary dict and only replacing the cache on success.
| self._asset_map.clear() | |
| for asset in self.client.exports.assets_v2(since=self._latest_time, chunk_size=4000): | |
| asset_uuid = asset.get("id") | |
| if asset_uuid: | |
| self._asset_map[asset_uuid] = asset | |
| asset_count += 1 | |
| if asset_count >= self._max_cache_size: | |
| self.log(f"Stop uploading, limit exceeded ({self._max_cache_size})", level="warning") | |
| break | |
| temp_asset_map: dict[str, dict] = {} | |
| for asset in self.client.exports.assets_v2(since=self._latest_time, chunk_size=4000): | |
| asset_uuid = asset.get("id") | |
| if asset_uuid: | |
| temp_asset_map[asset_uuid] = asset | |
| asset_count += 1 | |
| if asset_count >= self._max_cache_size: | |
| self.log(f"Stop uploading, limit exceeded ({self._max_cache_size})", level="warning") | |
| break | |
| self._asset_map = temp_asset_map |
|
|
||
| assert result is None | ||
| tenable_asset_connector.log.assert_called() | ||
| assert result is Nonetenable_asset_connector.log.assert_called() |
There was a problem hiding this comment.
Line is syntactically invalid: assert result is Nonetenable_asset_connector.log.assert_called() combines two statements and will break test collection. Split into two separate statements (assert + log.assert_called()).
| assert result is Nonetenable_asset_connector.log.assert_called() | |
| assert result is None | |
| tenable_asset_connector.log.assert_called() |
| def test_get_asset_info_exception(tenable_asset_connector): | ||
| tenable_asset_connector._asset_map = {} | ||
|
|
||
| with patch.object(tenable_asset_connector.client.assets, "details", side_effect=Exception("API Error")): | ||
| result = tenable_asset_connector._get_asset_info("test-uuid") |
There was a problem hiding this comment.
test_get_asset_info_exception is defined twice in this file (once around lines 605-610 and again around 686-694). Pytest will keep only the last definition, which can hide failures and makes the test suite confusing; rename one of them or remove the duplicate.
Summary by Sourcery
Introduce an asset caching mechanism to retrieve Tenable asset information more efficiently when processing vulnerabilities.
Enhancements:
Documentation:
Tests: