Conversation
Reviewer's GuideIntroduce a new Google Threat Intelligence (VirusTotal) automation module for Sekoia, built around a reusable VTAPIConnector client wrapper and multiple Action classes (scan URL/file, IoC report, comments, passive DNS, file behaviour, vulnerability report/associations), wired into a module entrypoint, containerized with a Dockerfile, and covered by pytest-based unit tests. Sequence diagram for GTIScanURL action executionsequenceDiagram
actor SekoiaPlatform
participant Module
participant GTIScanURL
participant VTAPIConnector
participant VTClient as vt.Client
participant VirusTotalAPI
SekoiaPlatform->>Module: invoke scan_url action with arguments
Module->>GTIScanURL: instantiate and call run(arguments)
GTIScanURL->>GTIScanURL: read api_key from module.configuration
GTIScanURL->>GTIScanURL: extract url from arguments
GTIScanURL->>VTAPIConnector: __init__(api_key, url, domain, ip, file_hash, cve)
GTIScanURL->>VTClient: create context with api_key
activate VTClient
GTIScanURL->>VTAPIConnector: scan_url(VTClient)
activate VTAPIConnector
VTAPIConnector->>VTClient: scan_url(self.url, wait_for_completion=True)
VTClient->>VirusTotalAPI: POST /api/v3/urls
VirusTotalAPI-->>VTClient: analysis object
VTAPIConnector->>VTAPIConnector: _add_result("SCAN_URL", "POST", "/api/v3/urls", "SUCCESS", analysis_data)
VTAPIConnector-->>GTIScanURL: updated results list
deactivate VTAPIConnector
GTIScanURL->>GTIScanURL: analysis = results[-1].response
VTClient-->>GTIScanURL: context exit
deactivate VTClient
GTIScanURL-->>Module: return {success, data, error}
Module-->>SekoiaPlatform: action result as JSON
Class diagram for VTAPIConnector and GTI action classesclassDiagram
class Result {
+str name
+str method
+str endpoint
+str status
+Any response
+str error
}
class VTAPIConnector {
-str api_key
-str domain
-str ip
-str url
-str file_hash
-str cve
-Dict headers
-List~Result~ results
+__init__(api_key, domain, ip, url, file_hash, cve)
+_add_result(name, method, endpoint, status, response, error)
+_make_serializable(obj)
+test_connectivity(client)
+get_ioc_report(client, entity_type, entity)
+get_ip_report(client)
+get_domain_report(client)
+get_url_report(client)
+get_file_report(client)
+scan_url(client)
+scan_file(client, file_path)
+get_analysis(client, analysis_id)
+get_file_behaviour(client)
+get_comments(client, entity_type, entity)
+get_passive_dns(client)
+get_vulnerability_report(client)
+get_vulnerability_associations(client)
}
class Action {
<<abstract>>
+Module module
+run(arguments)
}
class Module {
+register(action_class, slug)
+run()
}
class GTIScanURL {
+run(arguments)
}
class GTIScanFile {
+run(arguments)
}
class GTIIoCReport {
+run(arguments)
}
class GTIGetComments {
+run(arguments)
}
class GTIGetFileBehaviour {
+run(arguments)
}
class GTIGetPassiveDNS {
+run(arguments)
}
class GTIGetVulnerabilityReport {
+run(arguments)
}
class GTIGetVulnerabilityAssociations {
+run(arguments)
}
Action <|-- GTIScanURL
Action <|-- GTIScanFile
Action <|-- GTIIoCReport
Action <|-- GTIGetComments
Action <|-- GTIGetFileBehaviour
Action <|-- GTIGetPassiveDNS
Action <|-- GTIGetVulnerabilityReport
Action <|-- GTIGetVulnerabilityAssociations
GTIScanURL --> VTAPIConnector : uses
GTIScanFile --> VTAPIConnector : uses
GTIIoCReport --> VTAPIConnector : uses
GTIGetComments --> VTAPIConnector : uses
GTIGetFileBehaviour --> VTAPIConnector : uses
GTIGetPassiveDNS --> VTAPIConnector : uses
GTIGetVulnerabilityReport --> VTAPIConnector : uses
GTIGetVulnerabilityAssociations --> VTAPIConnector : uses
Module o--> Action
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- There appear to be two largely overlapping implementations of
VTAPIConnector(script_v2.pyandclient.py); consider consolidating on a single maintained version and removing or clearly scoping the other to avoid divergence and confusion. - Several modules still contain debugging artifacts (
printstatements inscan_url.py,scan_file.py,get_comments.py, andscript_v2.py, plusvt_test_results.jsonchecked into source); it would be safer to replace prints with logging and exclude generated artifacts from version control. - In
main.pythe action is registered under the slugget_vulnerability_assocations(typo) andget_vulnerability_associations.pystill has aTODOabout supported parameters; aligning the slug name and resolving the TODO now will prevent hard-to-track runtime issues later.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- There appear to be two largely overlapping implementations of `VTAPIConnector` (`script_v2.py` and `client.py`); consider consolidating on a single maintained version and removing or clearly scoping the other to avoid divergence and confusion.
- Several modules still contain debugging artifacts (`print` statements in `scan_url.py`, `scan_file.py`, `get_comments.py`, and `script_v2.py`, plus `vt_test_results.json` checked into source); it would be safer to replace prints with logging and exclude generated artifacts from version control.
- In `main.py` the action is registered under the slug `get_vulnerability_assocations` (typo) and `get_vulnerability_associations.py` still has a `TODO` about supported parameters; aligning the slug name and resolving the TODO now will prevent hard-to-track runtime issues later.
## Individual Comments
### Comment 1
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:330-336` </location>
<code_context>
+ "type": ["integer", "string"],
+ "description": "Comment timestamp (Unix timestamp or ISO 8601 string)"
+ },
+ "votes": {
+ "type": "object",
+ "description": "Vote statistics for the comment",
</code_context>
<issue_to_address>
**issue (bug_risk):** Votes extraction in `get_comments` is incorrect when `comment.votes` is a dict.
`get_comments` assumes `comment.votes` has attributes `positive`/`negative`, but when `votes` is a dict, `getattr(..., "positive", 0)` always returns 0 instead of the actual value. As a result, reported vote counts are always 0 when the API returns a dict. You can fix this by branching on the type:
```python
votes = getattr(comment, "votes", {}) or {}
if isinstance(votes, dict):
positive = votes.get("positive", 0)
negative = votes.get("negative", 0)
else:
positive = getattr(votes, "positive", 0)
negative = getattr(votes, "negative", 0)
```
and then using `positive`/`negative` in the returned data.
</issue_to_address>
### Comment 2
<location> `GoogleThreatIntelligence/googlethreatintelligence/get_comments.py:45-35` </location>
<code_context>
+
+ if error:
+ logger.error(f"[{status}] {name}: {error}")
+ else:
+ logger.info(f"[{status}] {name}: Success")
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Fallback to default domain produces an empty entity and malformed API path.
The `else` branch comment says "Use default domain", but `entity_name` is set to `domain`, which is empty here. That will generate a path like `/domains//comments` and likely cause a 404/API error. You should either use the connector’s actual default (e.g. `entity_name = connector.domain`) or assign a specific non-empty default value.
</issue_to_address>
### Comment 3
<location> `GoogleThreatIntelligence/googlethreatintelligence/scan_file.py:28-37` </location>
<code_context>
+ connector = VTAPIConnector(api_key, url="", domain="", ip="", file_hash="", cve="")
+ with vt.Client(api_key) as client:
+
+ connector.scan_file(client, file_path)
+ analysis = connector.results[-1].response
+ print(f"API call response: {analysis}") # Debugging line
+
+ return {
+ "success": True,
+ "data": {
+ "analysis_stats": analysis.get("analysis_stats"),
+ "analysis_results": analysis.get("analysis_results"),
+ "file_path": analysis.get("file_path", file_path),
+ },
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** `GTIScanFile` assumes a successful scan and non-None response, which can crash or misreport on errors.
`connector.scan_file` logs an error and returns `None` on failure, so `connector.results[-1].response` may be `None`, causing `analysis.get(...)` to raise `AttributeError`. It also means VT failures are still reported as `"success": True` because the status is never checked.
Use the last `Result` to gate the response, e.g.:
```python
connector.scan_file(client, file_path)
result = connector.results[-1]
if result.status != "SUCCESS" or not isinstance(result.response, dict):
return {"success": False, "error": result.error or "Scan failed"}
analysis = result.response
```
This prevents crashes and ensures errors are surfaced correctly.
</issue_to_address>
### Comment 4
<location> `GoogleThreatIntelligence/googlethreatintelligence/scan_url.py:27-36` </location>
<code_context>
+ connector = VTAPIConnector(api_key, url=url, domain="", ip="", file_hash="", cve="")
+ with vt.Client(api_key) as client:
+
+ connector.scan_url(client)
+ analysis = connector.results[-1].response
+ print(f"API call response: {analysis}") # Debugging line
+
+ return {
+ "success": True,
+ "data": {
+ "analysis_stats": analysis.get("analysis_stats"),
+ "analysis_results": analysis.get("analysis_results"),
+ "url": analysis.get("url", url),
+ },
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** `GTIScanURL` mirrors the same unchecked-success / possible `None` response problem as `GTIScanFile`.
If `connector.scan_url` fails it records an ERROR and returns `None`, so `connector.results[-1].response` will be `None`. Calling `analysis.get(...)` will then raise, and the function still reports `"success": True` on VT errors.
Consider using the last `Result` object and checking its `status` before accessing `response`, and propagate failures instead of returning success when the scan fails.
</issue_to_address>
### Comment 5
<location> `GoogleThreatIntelligence/main.py:18` </location>
<code_context>
+ module.register(GTIIoCReport, "get_ioc_report")
+ module.register(GTIScanFile, "scan_file")
+ module.register(GTIGetComments, "get_comments")
+ module.register(GTIGetVulnerabilityAssociations, "get_vulnerability_assocations")
+ module.register(GTIGetFileBehaviour, "get_file_behaviour")
+ module.register(GTIScanURL, "scan_url")
</code_context>
<issue_to_address>
**issue (typo):** Action key `get_vulnerability_assocations` has a typo that may affect discoverability and consistency.
The registered name for `GTIGetVulnerabilityAssociations` is currently `"get_vulnerability_assocations"` (missing the second "i"). This may break callers or manifests that use the correctly spelled `"get_vulnerability_associations"`. Please align the registered slug with the intended spelling and any existing configuration or action JSON references.
```suggestion
module.register(GTIGetVulnerabilityAssociations, "get_vulnerability_associations")
```
</issue_to_address>
### Comment 6
<location> `GoogleThreatIntelligence/tests/test_scan_file.py:11-20` </location>
<code_context>
+@patch("googlethreatintelligence.scan_file.vt.Client")
</code_context>
<issue_to_address>
**issue (testing):** The `test_scan_file_success` mock response shape does not match what `VTAPIConnector.scan_file` actually produces, making the test misleading.
`VTAPIConnector.scan_file` returns a `Result` whose `response` is a dict:
```python
{
"analysis_stats": analysis.stats,
"analysis_results": analysis.results,
"file": file_path,
}
```
In `test_scan_file_success`, `mock_result.response` is set to `mock_analysis` (a `MagicMock`) and the test relies on `.get(...)`, which would fail with the real connector (dict vs object). The mock should mirror the real response shape, e.g.:
```python
mock_result.response = {
"analysis_stats": mock_analysis.stats,
"analysis_results": mock_analysis.results,
"file": tmp_path,
}
```
and keep the assertions on `response["data"][... ]`. This makes the test representative of the real contract and avoids false positives.
</issue_to_address>
### Comment 7
<location> `GoogleThreatIntelligence/tests/test_scan_file.py:126-60` </location>
<code_context>
+# === ADDITIONAL TEST: Empty results list ===
</code_context>
<issue_to_address>
**suggestion (testing):** The empty-results test for `scan_file` only checks for a generic error, not that the error is understandable and stable.
In `test_scan_file_empty_results`, you only check that `success` is `False` and an `"error"` key exists, but not the content of the error. Given that `GTIScanFile.run` wraps all exceptions, issues like an out-of-range `connector.results[-1]` access could surface as a vague `"list index out of range"` message and still pass this test.
Please assert that the error message clearly indicates the absence of a scan result (e.g., expect a specific message or substring that you deliberately raise/format in the action code) so the test fails for unrelated regressions that merely produce some error.
Suggested implementation:
```python
# === ADDITIONAL TEST: Empty results list ===
@patch("googlethreatintelligence.scan_file.vt.Client")
@patch("googlethreatintelligence.scan_file.VTAPIConnector")
def test_scan_file_empty_results(mock_connector_class, mock_vt_client):
"""Test behavior when connector.results is empty (edge case)"""
from googlethreatintelligence.scan_file import scan_file
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
tmp_path = tmp_file.name
tmp_file.write(b"dummy content")
try:
# Mock connector with empty results
mock_client_instance = mock_vt_client.return_value
mock_connector_instance = mock_connector_class.return_value
mock_connector_instance.results = [] # Explicitly simulate no scan results
mock_connector_instance.scan_file.return_value = None
response = scan_file(tmp_path, API_KEY)
assert response is not None
assert response["success"] is False
assert "error" in response
# The error message should clearly indicate there were no scan results,
# not just surface a generic underlying exception like "list index out of range".
assert "no scan results" in response["error"].lower()
mock_connector_instance.scan_file.assert_called_once_with(
mock_client_instance,
tmp_path,
)
mock_vt_client.assert_called_once_with(API_KEY)
finally:
os.unlink(tmp_path)
```
To fully implement this behaviour, ensure that the production code (likely in `googlethreatintelligence/scan_file.py`, in the `scan_file` function or the `GTIScanFile.run` / similar action) explicitly detects the "no results" condition and returns or raises an error message containing the phrase `"No scan results"` (case-insensitive is fine). For example, when `connector.results` is empty, construct a response with `success=False` and an `"error"` string like `"No scan results available for this file"`. That will make the new assertion in this test pass and guard against unrelated regressions that merely produce some generic error text.
</issue_to_address>
### Comment 8
<location> `GoogleThreatIntelligence/tests/test_get_comments.py:46-55` </location>
<code_context>
[email protected](
</code_context>
<issue_to_address>
**suggestion (testing):** Routing tests for `GTIGetComments` don’t cover the case where no IoC fields are provided, which is a distinct code path.
The parametrized test currently covers only the single-IoC cases. Please add a test for the scenario where no IoC fields are provided so we assert:
- which endpoint is called (e.g. `/domains//comments` vs a default domain), and
- what the action returns in that case.
This will help catch regressions if the defaulting/validation behavior for missing IoCs changes.
Suggested implementation:
```python
# Routing tests based on REAL action behavior
# =============================================================================
@pytest.mark.parametrize(
=======
# =============================================================================
# Routing tests based on REAL action behavior
# =============================================================================
@patch("googlethreatintelligence.get_comments.vt.Client")
def test_get_comments_routing_no_ioc_fields(mock_vt_client):
# Import here to avoid circular imports at module load time if any
from googlethreatintelligence.get_comments import GTIGetComments
action = GTIGetComments()
# Call the action without providing any IoC fields
result = action.run({})
mock_client = mock_vt_client.return_value
args, kwargs = mock_client.iterator.call_args
# Verify which endpoint is called when no IoC is provided
endpoint = args[0]
# At minimum, we assert that the "comments" endpoint is still used
# (refine this to assert the exact expected endpoint once confirmed).
assert endpoint.endswith("/comments")
# Verify what the action returns when no IoC is provided.
# Adjust this assertion to match the action's real behavior
# (e.g. empty list, error dict, etc.).
assert result is not None
@pytest.mark.parametrize(
```
To fully align this test with the actual `GTIGetComments` behavior, you should:
1. **Tighten the endpoint assertion**:
- If the action calls a bare comments endpoint when no IoC is provided, change:
```python
assert endpoint.endswith("/comments")
```
to something like:
```python
assert endpoint == "/comments"
```
- If the action defaults to a particular IoC (e.g. a default domain), assert that explicitly, e.g.:
```python
assert endpoint.startswith("/domains/")
```
2. **Assert the concrete return value**:
- Replace:
```python
assert result is not None
```
with an assertion that matches the real return structure. For example, if `run` returns a list:
```python
assert isinstance(result, list)
assert result == []
```
or if it returns a dict:
```python
assert isinstance(result, dict)
assert result["status"] == "error" # or "ok"
```
3. If the action requires additional context (e.g. configuration, API key, or a `params` structure different from `{}`), update the `action.run` call accordingly so the test mirrors production usage.
These refinements will ensure the “no IoC provided” path is precisely specified and guarded against regressions.
</issue_to_address>
### Comment 9
<location> `GoogleThreatIntelligence/tests/test_get_vulnerability_associations.py:12-21` </location>
<code_context>
+@patch("googlethreatintelligence.get_vulnerability_associations.vt.Client")
</code_context>
<issue_to_address>
**suggestion (testing):** There’s no test for the case where `get_vulnerability_associations` returns a non-success status such as `NOT_AVAILABLE`.
In `GTIGetVulnerabilityAssociations.run`, `success` is derived solely from `result.status == "SUCCESS"`, while `data` always returns `result.response`. Your tests only cover the implicit success case and the `vt.APIError` path. Please add a test where the last `Result` has a non-success status such as `"NOT_AVAILABLE"`, and assert that `success` is `False` while `data` still returns the structured payload. This will verify the status mapping for limited/partial responses used by the connector.
Suggested implementation:
```python
from unittest.mock import patch, MagicMock, PropertyMock
from googlethreatintelligence.get_vulnerability_associations import GTIGetVulnerabilityAssociations
import vt
# === Test constants ===
API_KEY = "FAKE_API_KEY"
IP = "8.8.8.8"
@patch("googlethreatintelligence.get_vulnerability_associations.vt.Client")
@patch("googlethreatintelligence.get_vulnerability_associations.VTAPIConnector")
def test_get_vulnerability_associations_non_success_status(
mock_connector_class,
mock_vt_client,
):
"""Test that a non-success status still returns data but marks success as False."""
# Mock VTAPIConnector instance and its behavior
mock_connector = MagicMock()
# First result could be a partial/limited success, last result is NOT_AVAILABLE
mock_connector.results = [
MagicMock(
status="SUCCESS",
response={
"vulnerabilities_count": 1,
"vulnerabilities": [
{"id": "CVE-2024-0001", "severity": "medium"},
],
},
),
MagicMock(
status="NOT_AVAILABLE",
response={
"vulnerabilities_count": 1,
"vulnerabilities": [
{"id": "CVE-2024-0002", "severity": "low"},
],
},
),
]
mock_connector_class.return_value = mock_connector
# Configure vt.Client mock (actual behavior may not be used directly by the action)
mock_client_instance = MagicMock()
mock_vt_client.return_value.__enter__.return_value = mock_client_instance
action = GTIGetVulnerabilityAssociations()
# Use the same parameter style as the success test
# (adjust to match the existing test if parameter names differ)
params = {"ip": IP, "api_key": API_KEY}
result = action.run(params)
# For a non-success terminal status, success should be False
assert result["success"] is False
# Data should still expose the payload from the last result
assert result["data"] == {
"vulnerabilities_count": 1,
"vulnerabilities": [
{"id": "CVE-2024-0002", "severity": "low"},
],
}
@patch("googlethreatintelligence.get_vulnerability_associations.vt.Client")
@patch("googlethreatintelligence.get_vulnerability_associations.VTAPIConnector")
def test_get_vulnerability_associations_success(mock_connector_class, mock_vt_client):
"""Test successful retrieval of vulnerability associations"""
# Mock VTAPIConnector instance and its behavior
mock_connector = MagicMock()
mock_connector.results = [
MagicMock(
status="SUCCESS",
response={
"vulnerabilities_count": 2,
```
Because only part of `test_get_vulnerability_associations_success` is visible, you should:
1. Align the `params` used in `test_get_vulnerability_associations_non_success_status` with whatever the success test uses. For example, if the success test calls `action.run({"api_key": API_KEY, "ip": IP})`, mirror that exactly.
2. If `GTIGetVulnerabilityAssociations.run` returns a structure slightly different from `{"success": ..., "data": ...}`, adjust the assertions to match the real keys (e.g., `result.output["success"]` or similar).
3. Ensure that the connector’s `results` list is consumed in a way that the *last* element’s `status` and `response` are what determine `success` and `data`. If your implementation instead uses the first element, adapt the test’s expectations or the connector logic so the non-success terminal state is properly verified.
</issue_to_address>
### Comment 10
<location> `GoogleThreatIntelligence/tests/test_get_file_behaviour.py:85-94` </location>
<code_context>
+ mock_client_instance.iterator.assert_called_once_with(f"/files/{FILE_HASH}/behaviours", limit=5)
+
+
+def test_get_file_behaviour_no_api_key():
+ """Test handling of missing API key"""
+ action = GTIGetFileBehaviour()
+
+ # No API key configured
+ with patch.object(type(action.module), "configuration", new_callable=MagicMock) as mock_config:
+ mock_config.return_value = {}
+
+ response = action.run({"entity_type": "files", "entity": FILE_HASH})
+
+ assert response is not None
+ assert isinstance(response, dict)
+ assert response.get("success") is False
+ assert "API key" in response.get("error", "")
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for missing `file_hash` in `GTIGetFileBehaviour` to exercise that edge case.
Right now `GTIGetFileBehaviour.run` pulls `file_hash` via `arguments.get("file_hash", "")` and passes it directly to `VTAPIConnector` without checking for emptiness. Tests only cover the "hash provided" and "missing API key" scenarios. Please add a test that calls `run({})` (no `file_hash`) and asserts the current behavior (e.g., clear error vs. connector invoked with an empty hash and a predictable failure), so this edge case is documented and guarded against regressions.
</issue_to_address>
### Comment 11
<location> `GoogleThreatIntelligence/README.md:11` </location>
<code_context>
+- Proper file existence checks before scanning
+
+### 2. **Error Handling**
+- Try-catch blocks for all API calls
+- Specific exception handling for `vt.APIError`, `FileNotFoundError`, `IOError`
+- Detailed error logging and tracking
</code_context>
<issue_to_address>
**suggestion (typo):** Consider using Python-idiomatic terminology "try/except" instead of "try-catch".
Since this is Python documentation, please change "Try-catch" to "try/except" to align with standard Python terminology.
```suggestion
- try/except blocks for all API calls
```
</issue_to_address>
### Comment 12
<location> `GoogleThreatIntelligence/README.md:49-50` </location>
<code_context>
+# Set your API key
+export VT_API_KEY='your_actual_api_key'
+
+# Run the script
+python script.py
+
+# Optional: test with a file
</code_context>
<issue_to_address>
**issue (bug_risk):** The usage example may reference a non-existent `script.py` file.
This example uses `python script.py`, but the repo only includes `main.py` and `script_v2.py`. If `script.py` doesn’t exist, this command will fail. Please update the example to use the actual entry point (e.g., `python main.py` or `python -m googlethreatintelligence`).
</issue_to_address>
### Comment 13
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:40` </location>
<code_context>
+ error: Optional[str] = None
+
+
+class VTAPIConnector:
+ """VirusTotal API Connector Class using vt-py"""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider reusing the existing VTAPIConnector and moving the new testing/CLI logic into a separate harness layer to keep one connector implementation and simplify responsibilities.
You can keep all functionality while significantly reducing complexity by reusing the existing connector and separating the “test harness / CLI” concerns from the connector itself.
### 1. Avoid a second `VTAPIConnector` implementation
Instead of redefining `VTAPIConnector` here, import and extend the one from `client.py`, or wrap it in a separate class focused on orchestration.
For example, in this new file:
```python
# script_v2.py (or similar)
from googlethreatintelligence.client import VTAPIConnector # existing implementation
@dataclass
class Result:
name: str
method: str
endpoint: str
status: str
response: Any
error: Optional[str] = None
class VTAPITestHarness:
def __init__(self, connector: VTAPIConnector):
self.connector = connector
self.results: list[Result] = []
def _add_result(self, name, method, endpoint, status, response, error=None):
# reuse/mirror existing helper(s) from client.py if possible
self.results.append(Result(name, method, endpoint, status, response, error))
```
Then move `run_all_tests`, `run_all_tests_smart`, `save_results`, and the CLI `main()` into `VTAPITestHarness` / free functions that operate on the single connector instance.
### 2. Move orchestration logic out of the connector
Methods like `run_all_tests`, `run_all_tests_smart`, `save_results`, and `main` mix CLI/testing concerns with low‑level API calls. Those can become top-level functions or methods of the harness, not the connector.
For example:
```python
def run_all_tests_smart(harness: VTAPITestHarness, test_file_path: Optional[str] = None):
logger.info("Starting VirusTotal API tests...")
with vt.Client(harness.connector.api_key) as client:
harness.connector.scan_url(client) # call existing connector
# harness._add_result(...) if you want extra summarization
if test_file_path is not None:
harness.connector.scan_file(client, test_file_path)
# comments/passive DNS/vuln calls here...
```
This keeps `VTAPIConnector` in `client.py` focused on API operations only, and all printing / sleeping / iteration-testing in a separate harness.
### 3. Share common helpers instead of reimplementing
`_add_result`, `_make_serializable`, and `save_results` are generic and can be shared:
- Move them into `client.py` (or a small `utils.py`) and import here.
- Or at least have your harness call existing helpers instead of duplicating logic and output shaping.
Example of a shared serializer:
```python
# utils.py
def make_serializable(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: make_serializable(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [make_serializable(x) for x in obj]
if hasattr(obj, "__dict__"):
return str(obj)
return obj
```
Then in your harness:
```python
from .utils import make_serializable
def _add_result(..., response: Any, ...):
if response is not None:
response = make_serializable(response)
...
```
### 4. Keep the new features, but as harness behavior
All the “FULLY tests the iterator”, smart entity selection, and summary printing are valuable, but they’re harness/test behavior rather than part of the core connector abstraction. Keeping them in a separate `VTAPITestHarness` (or similar) that uses the existing `VTAPIConnector` will:
- Eliminate the duplicate connector class.
- Keep your new testing flows and output exactly as-is.
- Make future changes to API calls/response shapes happen in one place (`client.py`).
</issue_to_address>
### Comment 14
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:272` </location>
<code_context>
+ # IMPORTANT: Fully consume the iterator to test it properly
+ behaviours = []
+ for behaviour in behaviours_it:
+ behaviour_data = {
+ "sandbox_name": behaviour.sandbox_name if hasattr(behaviour, "sandbox_name") else None,
+ }
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing small helper methods for attribute access and iterator collection to remove duplicated `hasattr`/`getattr` and list-building logic across the class.
You can reduce a lot of incidental complexity by introducing a couple of tiny helpers and reusing them across methods, especially around `hasattr`/`getattr` and iterator handling.
### 1. Simplify repeated `hasattr` / `getattr` patterns
You have many patterns like:
```python
sandbox_name = behaviour.sandbox_name if hasattr(behaviour, "sandbox_name") else None
"ip_address": resolution.ip_address if hasattr(resolution, "ip_address") else None
"date": str(getattr(comment, "date", None)),
```
and particularly complex ones like:
```python
"votes": {
"positive": (
getattr(getattr(comment, "votes", {}), "positive", 0)
if hasattr(comment, "votes")
else 0
),
"negative": (
getattr(getattr(comment, "votes", {}), "negative", 0)
if hasattr(comment, "votes")
else 0
),
},
```
This can be flattened by a small helper:
```python
def _attr(self, obj, name, default=None):
return getattr(obj, name, default)
```
And for your votes block you can treat `votes` as a mapping-like object:
```python
def _votes_dict(self, comment):
votes = getattr(comment, "votes", None) or {}
return {
"positive": getattr(votes, "positive", 0) if not isinstance(votes, dict) else votes.get("positive", 0),
"negative": getattr(votes, "negative", 0) if not isinstance(votes, dict) else votes.get("negative", 0),
}
```
Then `get_comments` becomes much more readable:
```python
for comment in comments_it:
comments.append(
{
"text": self._attr(comment, "text"),
"date": str(self._attr(comment, "date")),
"votes": self._votes_dict(comment),
"author": self._attr(comment, "author"),
}
)
```
Similarly, in other methods:
```python
behaviour_data = {
"sandbox_name": self._attr(behaviour, "sandbox_name"),
"processes_created": len(self._attr(behaviour, "processes_created", [])),
"files_written": len(self._attr(behaviour, "files_written", [])),
# ...
}
```
This keeps the same behaviour but removes a lot of noisy `hasattr`/`getattr` branching.
### 2. Factor out iterator‑to‑list patterns
Several methods fully consume an iterator and build lists with per-item dicts: `get_file_behaviour`, `get_passive_dns`, `get_vulnerability_associations`, `get_comments`.
A tiny internal helper + per‑use mapper keeps the behaviour but avoids copy‑paste:
```python
def _collect(self, iterator, mapper):
return [mapper(item) for item in iterator]
```
Example for passive DNS:
```python
def _map_resolution(self, resolution):
return {
"ip_address": self._attr(resolution, "ip_address"),
"host_name": self._attr(resolution, "host_name"),
"date": str(self._attr(resolution, "date")),
"resolver": self._attr(resolution, "resolver"),
}
def get_passive_dns(self, client: vt.Client):
try:
resolutions_it = client.iterator(f"/domains/{self.domain}/resolutions", limit=40)
resolutions = self._collect(resolutions_it, self._map_resolution)
unique_ips = {r["ip_address"] for r in resolutions if r["ip_address"]}
self._add_result(
"PASSIVE_DNS",
"GET",
f"/api/v3/domains/{self.domain}/resolutions",
"SUCCESS",
{
"resolutions_count": len(resolutions),
"unique_ips_count": len(unique_ips),
"unique_ips": list(unique_ips),
"resolutions": resolutions,
},
)
except vt.APIError as e:
...
```
You can apply the same pattern to behaviours and vulnerabilities with small `*_map_*` functions. This keeps all current functionality (including full iterator consumption) while making each method shorter and easier to scan.
</issue_to_address>
### Comment 15
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:467-472` </location>
<code_context>
if hasattr(vuln, "cvss"):
if isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested if conditions ([`merge-nested-ifs`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/merge-nested-ifs))
```suggestion
if hasattr(vuln, "cvss") and isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
```
<br/><details><summary>Explanation</summary>Too much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two `if` conditions can be combined using
`and` is an easy win.
</details>
</issue_to_address>
### Comment 16
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:434-439` </location>
<code_context>
if hasattr(vuln, "cvss"):
if isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge nested if conditions ([`merge-nested-ifs`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/merge-nested-ifs))
```suggestion
if hasattr(vuln, "cvss") and isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
```
<br/><details><summary>Explanation</summary>Too much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two `if` conditions can be combined using
`and` is an easy win.
</details>
</issue_to_address>
### Comment 17
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:566` </location>
<code_context>
success_count = sum(1 for r in self.results if r.status == "SUCCESS")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify constant sum() call ([`simplify-constant-sum`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/simplify-constant-sum))
```suggestion
success_count = sum(bool(r.status == "SUCCESS")
```
<br/><details><summary>Explanation</summary>As `sum` add the values it treats `True` as `1`, and `False` as `0`. We make use
of this fact to simplify the generator expression inside the `sum` call.
</details>
</issue_to_address>
### Comment 18
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:567` </location>
<code_context>
error_count = sum(1 for r in self.results if r.status == "ERROR")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify constant sum() call ([`simplify-constant-sum`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/simplify-constant-sum))
```suggestion
error_count = sum(bool(r.status == "ERROR")
```
<br/><details><summary>Explanation</summary>As `sum` add the values it treats `True` as `1`, and `False` as `0`. We make use
of this fact to simplify the generator expression inside the `sum` call.
</details>
</issue_to_address>
### Comment 19
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:568` </location>
<code_context>
not_available_count = sum(1 for r in self.results if r.status == "NOT_AVAILABLE")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify constant sum() call ([`simplify-constant-sum`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/simplify-constant-sum))
```suggestion
not_available_count = sum(bool(r.status == "NOT_AVAILABLE")
```
<br/><details><summary>Explanation</summary>As `sum` add the values it treats `True` as `1`, and `False` as `0`. We make use
of this fact to simplify the generator expression inside the `sum` call.
</details>
</issue_to_address>
### Comment 20
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:324-345` </location>
<code_context>
def get_comments(self, client: vt.Client, entity_type: str, entity: str):
"""Get comments for an entity - FULLY tests the iterator"""
try:
# Special case: URLs must be base64url encoded without "=" padding
if entity_type == "urls":
import base64
entity = base64.urlsafe_b64encode(entity.encode()).decode().strip("=")
path = f"/{entity_type}/{entity}/comments"
comments_it = client.iterator(path, limit=10)
comments = []
for comment in comments_it:
comments.append(
{
"text": getattr(comment, "text", None),
"date": str(getattr(comment, "date", None)),
"votes": {
"positive": (
getattr(getattr(comment, "votes", {}), "positive", 0)
if hasattr(comment, "votes")
else 0
),
"negative": (
getattr(getattr(comment, "votes", {}), "negative", 0)
if hasattr(comment, "votes")
else 0
),
},
"author": getattr(comment, "author", None),
}
)
self._add_result(
"GET_COMMENTS",
"GET",
f"/api/v3/{entity_type}/{entity}/comments",
"SUCCESS",
{"comments_count": len(comments), "entity": entity, "comments": comments},
)
except vt.APIError as e:
self._add_result("GET_COMMENTS", "GET", f"/api/v3/{entity_type}/{entity}/comments", "ERROR", None, str(e))
</code_context>
<issue_to_address>
**suggestion (code-quality):** Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
```suggestion
comments = [
{
"text": getattr(comment, "text", None),
"date": str(getattr(comment, "date", None)),
"votes": {
"positive": (
getattr(getattr(comment, "votes", {}), "positive", 0)
if hasattr(comment, "votes")
else 0
),
"negative": (
getattr(getattr(comment, "votes", {}), "negative", 0)
if hasattr(comment, "votes")
else 0
),
},
"author": getattr(comment, "author", None),
}
for comment in comments_it
]
```
</issue_to_address>
### Comment 21
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:424` </location>
<code_context>
def get_vulnerability_report(self, client: vt.Client):
"""Get vulnerability report"""
try:
# Correct path for vulnerability collections
# `https://www.virustotal.com/api/v3/collections/vulnerability--cve-2010-3765`
vuln = client.get_object(f"/collections/vulnerability--{self.cve}")
vuln_data = {
"cve": self.cve,
"id": vuln.id if hasattr(vuln, "id") else None,
}
# Extract additional vulnerability details
if hasattr(vuln, "title"):
vuln_data["title"] = vuln.title
if hasattr(vuln, "description"):
vuln_data["description"] = (
vuln.description[:200] + "..." if len(vuln.description) > 200 else vuln.description
)
if hasattr(vuln, "cvss"):
vuln_data["cvss"] = vuln.cvss
self._add_result(
"VULN_REPORT",
"GET",
f"/api/v3/intelligence/vulnerability_collections/{self.cve}",
"SUCCESS",
vuln_data,
)
except vt.APIError as e:
logger.warning(f"OUCH! Vulnerability report not available (may require Premium API): {e}")
self._add_result(
"VULN_REPORT",
"GET",
f"/api/v3/intelligence/vulnerability_collections/{self.cve}",
"NOT_AVAILABLE",
None,
f"May require Premium API: {str(e)}",
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use f-string instead of string concatenation ([`use-fstring-for-concatenation`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-fstring-for-concatenation/))
```suggestion
f"{vuln.description[:200]}..."
if len(vuln.description) > 200
else vuln.description
```
</issue_to_address>
### Comment 22
<location> `GoogleThreatIntelligence/googlethreatintelligence/client.py:476` </location>
<code_context>
def get_vulnerability_associations(self, client: vt.Client):
"""Get vulnerability associations for an entity - FULLY tests the iterator"""
try:
# IMPORTANT: Fully consume the iterator to test it properly
vulns_it = client.iterator(f"/ip_addresses/{self.ip}/vulnerabilities", limit=20)
vulnerabilities = []
cve_ids = set()
high_severity_count = 0
for vuln in vulns_it:
vuln_data = {
"id": vuln.id if hasattr(vuln, "id") else None,
}
# Extract CVE information
if hasattr(vuln, "cve_id"):
vuln_data["cve_id"] = vuln.cve_id
cve_ids.add(vuln.cve_id)
if hasattr(vuln, "cvss"):
if isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
if hasattr(vuln, "description"):
vuln_data["description"] = (
vuln.description[:150] + "..." if len(vuln.description) > 150 else vuln.description
)
if hasattr(vuln, "published_date"):
vuln_data["published_date"] = str(vuln.published_date)
vulnerabilities.append(vuln_data)
self._add_result(
"VULN_ASSOCIATIONS",
"GET",
f"/api/v3/ip_addresses/{self.ip}/vulnerabilities",
"SUCCESS",
{
"vulnerabilities_count": len(vulnerabilities),
"unique_cves_count": len(cve_ids),
"high_severity_count": high_severity_count,
"cve_ids": list(cve_ids),
"vulnerabilities": vulnerabilities,
},
)
logger.info(
f"Retrieved and processed {len(vulnerabilities)} vulnerability associations ({len(cve_ids)} unique CVEs)"
)
except vt.APIError as e:
logger.warning(f"Vulnerability associations not available (may require Premium API): {e}")
self._add_result(
"VULN_ASSOCIATIONS",
"GET",
f"/api/v3/ip_addresses/{self.ip}/vulnerabilities",
"NOT_AVAILABLE",
None,
f"May require Premium API: {str(e)}",
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use f-string instead of string concatenation ([`use-fstring-for-concatenation`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-fstring-for-concatenation/))
```suggestion
f"{vuln.description[:150]}..."
if len(vuln.description) > 150
else vuln.description
```
</issue_to_address>
### Comment 23
<location> `GoogleThreatIntelligence/googlethreatintelligence/get_comments.py:32` </location>
<code_context>
def run(self, arguments: dict):
try:
api_key = self.module.configuration.get("api_key")
if not api_key:
return {"success": False, "error": "API key not configured"}
domain = arguments.get("domain", "")
ip = arguments.get("ip", "")
url = arguments.get("url", "")
file_hash = arguments.get("file_hash", "")
connector = VTAPIConnector(api_key, domain=domain, ip=ip, url=url, file_hash=file_hash)
with vt.Client(api_key) as client:
# Determine which entity to query for comments
# Priority: domain > ip > url > file_hash
# Only one can be provided at a time because of input constraints
entity_type = None
entity_name = None
if domain != "":
entity_type = "domains"
entity_name = domain
elif ip != "":
entity_type = "ip_addresses"
entity_name = ip
elif url != "":
entity_type = "urls"
entity_name = url
elif file_hash != "":
entity_type = "files"
entity_name = file_hash
else:
# Use default domain
entity_type = "domains"
entity_name = domain
print(f"Getting comments for {entity_type}: {entity_name}")
# connector.get_comments(client, entity_type)
connector.get_comments(client, entity_type, entity_name)
result = connector.results[-1]
print("Comments:", result.response)
return {"success": result.status == "SUCCESS", "data": result.response, "error": result.error}
except Exception as e:
return {"success": False, "error": str(e)}
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Merge duplicate blocks in conditional ([`merge-duplicate-blocks`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-duplicate-blocks/))
- Remove redundant conditional [×3] ([`remove-redundant-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-if/))
</issue_to_address>
### Comment 24
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:391` </location>
<code_context>
def get_vulnerability_report(self, client: vt.Client):
"""Get vulnerability report"""
try:
# Correct path for vulnerability collections
print("Getting vuln", self.cve)
# `https://www.virustotal.com/api/v3/collections/vulnerability--cve-2010-3765`
vuln = client.get_object(f"/collections/vulnerability--{self.cve}")
print("VULN is:", vuln)
vuln_data = {
"cve": self.cve,
"id": vuln.id if hasattr(vuln, "id") else None,
}
# Extract additional vulnerability details
if hasattr(vuln, "title"):
vuln_data["title"] = vuln.title
if hasattr(vuln, "description"):
vuln_data["description"] = (
vuln.description[:200] + "..." if len(vuln.description) > 200 else vuln.description
)
if hasattr(vuln, "cvss"):
vuln_data["cvss"] = vuln.cvss
self._add_result(
"VULN_REPORT",
"GET",
f"/api/v3/intelligence/vulnerability_collections/{self.cve}",
"SUCCESS",
vuln_data,
)
except vt.APIError as e:
logger.warning(f"OUCH! Vulnerability report not available (may require Premium API): {e}")
self._add_result(
"VULN_REPORT",
"GET",
f"/api/v3/intelligence/vulnerability_collections/{self.cve}",
"NOT_AVAILABLE",
None,
f"May require Premium API: {str(e)}",
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use f-string instead of string concatenation ([`use-fstring-for-concatenation`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-fstring-for-concatenation/))
```suggestion
f"{vuln.description[:200]}..."
if len(vuln.description) > 200
else vuln.description
```
</issue_to_address>
### Comment 25
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:443` </location>
<code_context>
def get_vulnerability_associations(self, client: vt.Client):
"""Get vulnerability associations for an entity - FULLY tests the iterator"""
try:
# IMPORTANT: Fully consume the iterator to test it properly
vulns_it = client.iterator(f"/ip_addresses/{self.ip}/vulnerabilities", limit=20)
vulnerabilities = []
cve_ids = set()
high_severity_count = 0
for vuln in vulns_it:
vuln_data = {
"id": vuln.id if hasattr(vuln, "id") else None,
}
# Extract CVE information
if hasattr(vuln, "cve_id"):
vuln_data["cve_id"] = vuln.cve_id
cve_ids.add(vuln.cve_id)
if hasattr(vuln, "cvss"):
if isinstance(vuln.cvss, dict):
vuln_data["cvss_score"] = vuln.cvss.get("score")
vuln_data["cvss_severity"] = vuln.cvss.get("severity")
if vuln.cvss.get("severity") in ["HIGH", "CRITICAL"]:
high_severity_count += 1
if hasattr(vuln, "description"):
vuln_data["description"] = (
vuln.description[:150] + "..." if len(vuln.description) > 150 else vuln.description
)
if hasattr(vuln, "published_date"):
vuln_data["published_date"] = str(vuln.published_date)
vulnerabilities.append(vuln_data)
self._add_result(
"VULN_ASSOCIATIONS",
"GET",
f"/api/v3/ip_addresses/{self.ip}/vulnerabilities",
"SUCCESS",
{
"vulnerabilities_count": len(vulnerabilities),
"unique_cves_count": len(cve_ids),
"high_severity_count": high_severity_count,
"cve_ids": list(cve_ids),
"vulnerabilities": vulnerabilities,
},
)
logger.info(
f"Retrieved and processed {len(vulnerabilities)} vulnerability associations ({len(cve_ids)} unique CVEs)"
)
except vt.APIError as e:
logger.warning(f"Vulnerability associations not available (may require Premium API): {e}")
self._add_result(
"VULN_ASSOCIATIONS",
"GET",
f"/api/v3/ip_addresses/{self.ip}/vulnerabilities",
"NOT_AVAILABLE",
None,
f"May require Premium API: {str(e)}",
)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use f-string instead of string concatenation ([`use-fstring-for-concatenation`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-fstring-for-concatenation/))
```suggestion
f"{vuln.description[:150]}..."
if len(vuln.description) > 150
else vuln.description
```
</issue_to_address>
### Comment 26
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:489-499` </location>
<code_context>
def run_all_tests(self, test_file_path: Optional[str] = None):
"""Run all API tests"""
logger.info("Starting VirusTotal API tests...")
logger.info(
f"Using: domain={self.domain}, ip={self.ip}, url={self.url}, file_hash={self.file_hash}, cve={self.cve}"
)
with vt.Client(self.api_key) as client:
# Additional data - FULLY test iterators
logger.info("Testing iterators (comments, passive DNS, vulnerability associations)...")
# Get comments - default to domain
self.get_comments(client, "domains") # Use plural "domains"
print("Comment (domain):", self.results[-1].response)
time.sleep(0.5)
# Get vulnerability report
self.get_vulnerability_report(client)
print("VULN REPORT:", self.results[-1].response)
time.sleep(0.5)
logger.info("All tests completed!")
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into method ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
### Comment 27
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:527` </location>
<code_context>
def run_all_tests_smart(self, test_file_path: Optional[str] = None):
"""Run all API tests - intelligently choose entity type"""
logger.info("Starting VirusTotal API tests...")
logger.info(
f"Using: domain={self.domain}, ip={self.ip}, url={self.url}, file_hash={self.file_hash}, cve={self.cve}"
)
with vt.Client(self.api_key) as client:
logger.info("Testing iterators (comments, passive DNS, vulnerability associations)...")
self.scan_url(client)
print("SCAN URL:", self.results[-1].response)
time.sleep(0.5)
if test_file_path is not None:
self.scan_file(client, test_file_path)
print("SCAN FILE:", self.results[-1].response)
time.sleep(0.5)
# Determine which entity to query for comments
# Priority: domain > ip > url > file_hash
entity_type = None
entity_name = None
if self.domain:
entity_type = "domains"
entity_name = self.domain
elif self.ip:
entity_type = "ip_addresses"
entity_name = self.ip
elif self.url:
entity_type = "urls"
entity_name = self.url
elif self.file_hash:
entity_type = "files"
entity_name = self.file_hash
else:
# Use default domain
entity_type = "domains"
entity_name = self.domain
print(f"Getting comments for {entity_type}: {entity_name}")
self.get_comments(client, entity_type)
print("Comments:", self.results[-1].response)
time.sleep(0.5)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Merge duplicate blocks in conditional ([`merge-duplicate-blocks`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/merge-duplicate-blocks/))
- Remove redundant conditional [×3] ([`remove-redundant-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-if/))
</issue_to_address>
### Comment 28
<location> `GoogleThreatIntelligence/googlethreatintelligence/script_v2.py:571` </location>
<code_context>
def save_results(self, output_file: str = "vt_test_results.json"):
"""Save test results to JSON file"""
results_dict = [asdict(r) for r in self.results]
with open(output_file, "w", encoding="utf-8") as f:
json.dump(results_dict, f, indent=2, ensure_ascii=False)
logger.info(f"Results saved to {output_file}")
# Print summary
success_count = sum(1 for r in self.results if r.status == "SUCCESS")
error_count = sum(1 for r in self.results if r.status == "ERROR")
not_available_count = sum(1 for r in self.results if r.status == "NOT_AVAILABLE")
print(f"\n{'='*60}")
print(f"TEST SUMMARY")
print(f"{'='*60}")
print(f"Total tests: {len(self.results)}")
print(f"Successful: {success_count}")
print(f"Failed: {error_count}")
print(f"Not Available (Premium API): {not_available_count}")
print(f"{'='*60}\n")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace f-string with no interpolated values with string ([`remove-redundant-fstring`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-fstring/))
```suggestion
print("TEST SUMMARY")
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| "votes": { | ||
| "positive": ( | ||
| getattr(getattr(comment, "votes", {}), "positive", 0) | ||
| if hasattr(comment, "votes") | ||
| else 0 | ||
| ), | ||
| "negative": ( |
There was a problem hiding this comment.
issue (bug_risk): Votes extraction in get_comments is incorrect when comment.votes is a dict.
get_comments assumes comment.votes has attributes positive/negative, but when votes is a dict, getattr(..., "positive", 0) always returns 0 instead of the actual value. As a result, reported vote counts are always 0 when the API returns a dict. You can fix this by branching on the type:
votes = getattr(comment, "votes", {}) or {}
if isinstance(votes, dict):
positive = votes.get("positive", 0)
negative = votes.get("negative", 0)
else:
positive = getattr(votes, "positive", 0)
negative = getattr(votes, "negative", 0)and then using positive/negative in the returned data.
|
|
||
| if domain != "": | ||
| entity_type = "domains" | ||
| entity_name = domain |
There was a problem hiding this comment.
issue (bug_risk): Fallback to default domain produces an empty entity and malformed API path.
The else branch comment says "Use default domain", but entity_name is set to domain, which is empty here. That will generate a path like /domains//comments and likely cause a 404/API error. You should either use the connector’s actual default (e.g. entity_name = connector.domain) or assign a specific non-empty default value.
| connector.scan_file(client, file_path) | ||
| analysis = connector.results[-1].response | ||
| print(f"API call response: {analysis}") # Debugging line | ||
|
|
||
| return { | ||
| "success": True, | ||
| "data": { | ||
| "analysis_stats": analysis.get("analysis_stats"), | ||
| "analysis_results": analysis.get("analysis_results"), | ||
| "file_path": analysis.get("file_path", file_path), |
There was a problem hiding this comment.
issue (bug_risk): GTIScanFile assumes a successful scan and non-None response, which can crash or misreport on errors.
connector.scan_file logs an error and returns None on failure, so connector.results[-1].response may be None, causing analysis.get(...) to raise AttributeError. It also means VT failures are still reported as "success": True because the status is never checked.
Use the last Result to gate the response, e.g.:
connector.scan_file(client, file_path)
result = connector.results[-1]
if result.status != "SUCCESS" or not isinstance(result.response, dict):
return {"success": False, "error": result.error or "Scan failed"}
analysis = result.responseThis prevents crashes and ensures errors are surfaced correctly.
| connector.scan_url(client) | ||
| analysis = connector.results[-1].response | ||
| print(f"API call response: {analysis}") # Debugging line | ||
|
|
||
| return { | ||
| "success": True, | ||
| "data": { | ||
| "analysis_stats": analysis.get("analysis_stats"), | ||
| "analysis_results": analysis.get("analysis_results"), | ||
| "url": analysis.get("url", url), |
There was a problem hiding this comment.
issue (bug_risk): GTIScanURL mirrors the same unchecked-success / possible None response problem as GTIScanFile.
If connector.scan_url fails it records an ERROR and returns None, so connector.results[-1].response will be None. Calling analysis.get(...) will then raise, and the function still reports "success": True on VT errors.
Consider using the last Result object and checking its status before accessing response, and propagate failures instead of returning success when the scan fails.
| module.register(GTIIoCReport, "get_ioc_report") | ||
| module.register(GTIScanFile, "scan_file") | ||
| module.register(GTIGetComments, "get_comments") | ||
| module.register(GTIGetVulnerabilityAssociations, "get_vulnerability_assocations") |
There was a problem hiding this comment.
issue (typo): Action key get_vulnerability_assocations has a typo that may affect discoverability and consistency.
The registered name for GTIGetVulnerabilityAssociations is currently "get_vulnerability_assocations" (missing the second "i"). This may break callers or manifests that use the correctly spelled "get_vulnerability_associations". Please align the registered slug with the intended spelling and any existing configuration or action JSON references.
| module.register(GTIGetVulnerabilityAssociations, "get_vulnerability_assocations") | |
| module.register(GTIGetVulnerabilityAssociations, "get_vulnerability_associations") |
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
|
I've added: "category": "threat-intelligence" in manifest.json: Is there a list of predefined categories @CharlesLR-sekoia ? |
Summary by Sourcery
Add a new Google Threat Intelligence (VirusTotal) automation module integrating vt-py with Sekoia actions for scanning and intelligence retrieval, plus packaging, runtime wiring, and tests.
New Features:
Enhancements:
Build:
Documentation:
Tests: