feat(TheHiveV5):add support of private PKI#1955
feat(TheHiveV5):add support of private PKI#1955CharlesLR-sekoia wants to merge 3 commits intodevelopfrom
Conversation
Reviewer's GuideAdds optional private PKI support to TheHive V5 connector by introducing a helper to prepare the TLS verify parameter from an inline CA certificate, wiring it through connector/actions, exposing it in configuration, and covering it with unit tests and documentation/version updates. Sequence diagram for TLS verify parameter preparation with private PKIsequenceDiagram
actor User
participant ModuleConfiguration
participant TheHiveCreateAlertV5
participant VerifyHelper as _prepare_verify_param
participant Tempfile
participant TheHiveApi
User->>ModuleConfiguration: set base_url, apikey, organisation
User->>ModuleConfiguration: set verify_certificate, ca_certificate
User->>TheHiveCreateAlertV5: run(arguments)
TheHiveCreateAlertV5->>ModuleConfiguration: get verify_certificate
ModuleConfiguration-->>TheHiveCreateAlertV5: verify bool
TheHiveCreateAlertV5->>ModuleConfiguration: get ca_certificate
ModuleConfiguration-->>TheHiveCreateAlertV5: ca_certificate string_or_null
TheHiveCreateAlertV5->>VerifyHelper: _prepare_verify_param(verify, ca_certificate)
alt verify is false
VerifyHelper-->>TheHiveCreateAlertV5: verify_param = false
else verify is true and ca_certificate is provided
VerifyHelper->>Tempfile: create pem file
Tempfile-->>VerifyHelper: ca_file path
VerifyHelper-->>TheHiveCreateAlertV5: verify_param = ca_file path
else verify is true and no ca_certificate
VerifyHelper-->>TheHiveCreateAlertV5: verify_param = true
end
TheHiveCreateAlertV5->>TheHiveApi: init(url, apikey, organisation, verify_param)
TheHiveApi-->>TheHiveCreateAlertV5: api instance
TheHiveCreateAlertV5-->>User: result (created alert)
Class diagram for TheHiveV5 connector TLS verification changesclassDiagram
class TheHiveConnector {
+TheHiveConnector(url str, api_key str, organisation str, verify bool, ca_certificate str)
-api TheHiveApi
}
class TheHiveCreateAlertV5 {
+run(arguments dict_str_any) OutputAlert
}
class TheHiveAddCommentV5 {
+run(arguments dict_str_any) OutputComment
}
class TheHiveAddObservableV5 {
+run(arguments dict_str_any) dict_str_list
}
class TheHiveUploadLogsV5 {
+run(arguments dict_str_any) dict_str_list_dict
}
class _prepare_verify_param {
+_prepare_verify_param(verify bool, ca_certificate str) bool_or_str
}
class TheHiveApi {
+TheHiveApi(url str, apikey str, organisation str, verify bool_or_str)
}
TheHiveConnector ..> _prepare_verify_param : uses
TheHiveCreateAlertV5 ..> _prepare_verify_param : uses
TheHiveAddCommentV5 ..> TheHiveConnector : creates
TheHiveAddObservableV5 ..> TheHiveConnector : creates
TheHiveUploadLogsV5 ..> TheHiveConnector : creates
TheHiveCreateAlertV5 ..> TheHiveApi : creates
TheHiveConnector --> TheHiveApi : composes
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- The
_prepare_verify_paramhelper creates a new temp CA file on every call and only schedules cleanup at process exit, which may leak files in long‑running workers; consider either reusing a temp file per CA value, cleaning up immediately after each request, or exposing an option to pass a filesystem path to an existing CA bundle instead. _prepare_verify_paramis imported and used fromcreate_alert.pydespite being a “private” helper onthehiveconnector; either make it a public utility (e.g. drop the leading underscore or move it to a shared module) or route all API usage throughTheHiveConnectorfor clearer ownership of TLS setup.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `_prepare_verify_param` helper creates a new temp CA file on every call and only schedules cleanup at process exit, which may leak files in long‑running workers; consider either reusing a temp file per CA value, cleaning up immediately after each request, or exposing an option to pass a filesystem path to an existing CA bundle instead.
- `_prepare_verify_param` is imported and used from `create_alert.py` despite being a “private” helper on `thehiveconnector`; either make it a public utility (e.g. drop the leading underscore or move it to a shared module) or route all API usage through `TheHiveConnector` for clearer ownership of TLS setup.
## Individual Comments
### Comment 1
<location> `TheHiveV5/thehive/thehiveconnector.py:179-188` </location>
<code_context>
return key_to_check in mapping
+def _prepare_verify_param(verify: bool, ca_certificate: Optional[str] = None) -> Union[bool, str]:
+ """
+ Prepare the verify parameter for requests/thehive4py.
+
+ Args:
+ verify: Whether to verify the certificate
+ ca_certificate: PEM-encoded CA certificate content (optional)
+
+ Returns:
+ - False if verify is False
+ - Path to temp CA file if ca_certificate is provided
+ - True otherwise (use system CA store)
+ """
+ if not verify:
+ return False
+
+ if ca_certificate:
+ with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f:
+ f.write(ca_certificate)
+ ca_file = f.name
+ atexit.register(lambda: os.unlink(ca_file) if os.path.exists(ca_file) else None)
+ return ca_file
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Harden temp CA file cleanup and avoid leaking multiple temp files until process exit.
Two concerns with the current approach:
1. Each call with `ca_certificate` adds a new `atexit` handler and leaves a temp file until process exit, which can accumulate in long‑lived processes.
2. `os.unlink(ca_file)` in the `atexit` lambda can raise if the file is already gone or permissions change between `exists()` and `unlink()`, causing an unhandled exception at shutdown.
You could instead:
- Use a small helper that directly unlinks and catches `OSError` (e.g. `_cleanup_temp(path)`), and register that with `atexit`.
- Optionally add a reuse/cache strategy (e.g. one temp file per unique `ca_certificate`) if this is called frequently.
This would make temp file handling safer and more efficient.
Suggested implementation:
```python
import atexit
import logging
import os
import tempfile
import hashlib
from typing import Optional, Dict, List, Any, Union
from thehive4py import TheHiveApi
from thehive4py.errors import TheHiveError
return key_to_check in mapping
_temp_ca_files: Dict[str, str] = {}
def _cleanup_temp_ca_file(path: str) -> None:
"""
Best-effort removal of a temporary CA certificate file.
Any filesystem-related errors are swallowed to avoid affecting
process shutdown or the caller.
"""
try:
os.unlink(path)
except FileNotFoundError:
# Already gone, nothing to do
return
except OSError:
# Log but do not propagate to avoid issues at interpreter shutdown
logging.warning("Failed to remove temporary CA certificate file %s", path, exc_info=True)
```
```python
def _prepare_verify_param(verify: bool, ca_certificate: Optional[str] = None) -> Union[bool, str]:
"""
Prepare the verify parameter for requests/thehive4py.
Args:
verify: Whether to verify the certificate.
ca_certificate: PEM-encoded CA certificate content (optional).
Returns:
- False if verify is False
- Path to temp CA file if ca_certificate is provided
- True otherwise (use system CA store)
"""
if not verify:
return False
if ca_certificate:
# Cache temp files per unique CA certificate to avoid creating
# many short-lived files in long-running processes.
cert_hash = hashlib.sha256(ca_certificate.encode("utf-8")).hexdigest()
cached_path = _temp_ca_files.get(cert_hash)
if cached_path and os.path.exists(cached_path):
return cached_path
with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f:
f.write(ca_certificate)
ca_file = f.name
_temp_ca_files[cert_hash] = ca_file
# Best-effort cleanup at process exit; errors are handled in the helper.
atexit.register(_cleanup_temp_ca_file, ca_file)
return ca_file
```
If `_prepare_verify_param` is used in contexts where the same CA certificate might become invalid or rotated while the process is running, you may want to add a mechanism to invalidate entries in `_temp_ca_files` (e.g. when a new certificate with the same logical identity but different content is provided). Right now, the cache keys by the full certificate content hash, so rotated certificates will naturally create new temp files and entries.
</issue_to_address>
### Comment 2
<location> `TheHiveV5/thehive/create_alert.py:9-18` </location>
<code_context>
from requests import HTTPError
from posixpath import join as urljoin
+from .thehiveconnector import _prepare_verify_param
+
class TheHiveCreateAlertV5(Action):
def run(self, arguments: dict[str, Any]) -> Optional[OutputAlert]:
+ verify_param = _prepare_verify_param(
+ self.module.configuration.get("verify_certificate", True),
+ self.module.configuration.get("ca_certificate"),
+ )
api = TheHiveApi(
self.module.configuration["base_url"],
self.module.configuration["apikey"],
organisation=self.module.configuration["organisation"],
- verify=self.module.configuration.get("verify_certificate", True),
+ verify=verify_param,
)
</code_context>
<issue_to_address>
**suggestion:** Consider avoiding cross-module use of a "private" helper and aligning with the connector usage pattern.
Here `create_alert` imports `_prepare_verify_param` directly, unlike other actions that rely on `TheHiveConnector` to handle this via `ca_certificate`.
To keep TLS behavior centralized and the API surface clear, either:
- Expose a public helper (e.g. `prepare_verify_param`) instead of using the underscored function across modules, or
- Route this action through `TheHiveConnector` as well, for consistent handling with the other actions.
This helps avoid future divergence in behavior between actions.
Suggested implementation:
```python
from requests import HTTPError
from posixpath import join as urljoin
from .thehiveconnector import prepare_verify_param
```
```python
class TheHiveCreateAlertV5(Action):
def run(self, arguments: dict[str, Any]) -> Optional[OutputAlert]:
verify_param = prepare_verify_param(
self.module.configuration.get("verify_certificate", True),
self.module.configuration.get("ca_certificate"),
)
api = TheHiveApi(
```
To fully implement the suggestion and keep TLS behavior centralized:
1. In `TheHiveV5/thehive/thehiveconnector.py`:
- Rename the existing `_prepare_verify_param` function to `prepare_verify_param` (remove the leading underscore) and ensure it is part of the public API of that module.
- Update any internal references in that file from `_prepare_verify_param` to `prepare_verify_param`.
2. If any other modules currently import `_prepare_verify_param` directly, update their imports and call sites to use `prepare_verify_param` instead, so there are no dangling references to the private name.
This will align this action with a public, centralized helper while avoiding cross-module use of a private function name.
</issue_to_address>
### Comment 3
<location> `TheHiveV5/tests/test_thehiveconnector.py:59-53` </location>
<code_context>
+ with open(result, "r") as f:
+ assert f.read() == ca_cert
+
+ def test_ca_file_registered_for_cleanup(self):
+ """The CA temp file should be registered for cleanup at exit"""
+ ca_cert = "-----BEGIN CERTIFICATE-----\nTEST\n-----END CERTIFICATE-----"
+ result = _prepare_verify_param(verify=True, ca_certificate=ca_cert)
+
+ # File should exist now
+ assert os.path.exists(result)
+
+
</code_context>
<issue_to_address>
**issue (testing):** The `test_ca_file_registered_for_cleanup` test name suggests cleanup verification, but it only asserts file existence
This test duplicates what `test_verify_true_with_ca_returns_file_path` already covers and doesn’t exercise the behavior implied by its name. I suggest either renaming it to match its current behavior or, preferably, monkeypatching `atexit.register` to assert a cleanup callback is registered and actually removes the file when invoked.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Pull request overview
This pull request adds support for custom CA certificates to enable private PKI TLS verification for TheHive V5 connector and actions. Users can now optionally provide a PEM-encoded CA certificate to verify TLS connections to TheHive instances using internal or private certificate authorities.
Changes:
- Added optional
ca_certificateconfiguration parameter to the module manifest - Implemented
prepare_verify_paramhelper function with certificate caching and cleanup - Updated all actions to support the new CA certificate parameter
- Bumped module version from 1.0.9 to 1.1.0
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| TheHiveV5/manifest.json | Added ca_certificate configuration parameter and bumped version to 1.1.0 |
| TheHiveV5/thehive/thehiveconnector.py | Added prepare_verify_param helper with caching and cleanup; updated TheHiveConnector constructor to accept ca_certificate |
| TheHiveV5/thehive/create_alert.py | Updated to use prepare_verify_param for custom CA support |
| TheHiveV5/thehive/add_observable.py | Updated to pass ca_certificate to TheHiveConnector |
| TheHiveV5/thehive/add_commment.py | Updated to pass ca_certificate to TheHiveConnector |
| TheHiveV5/thehive/upload_logs.py | Updated to pass ca_certificate to TheHiveConnector |
| TheHiveV5/tests/test_thehiveconnector.py | Added comprehensive unit tests for prepare_verify_param function |
| TheHiveV5/CHANGELOG.md | Documented the new feature in version 1.1.0 |
mchupeau-sk
left a comment
There was a problem hiding this comment.
just a comment with copilot
| verify_param = prepare_verify_param( | ||
| self.module.configuration.get("verify_certificate", True), | ||
| self.module.configuration.get("ca_certificate"), | ||
| ) | ||
| api = TheHiveApi( | ||
| self.module.configuration["base_url"], | ||
| self.module.configuration["apikey"], | ||
| organisation=self.module.configuration["organisation"], | ||
| verify=self.module.configuration.get("verify_certificate", True), | ||
| verify=verify_param, | ||
| ) |
There was a problem hiding this comment.
@CharlesLR-sekoia good remarks from copilot if you can check
Summary by Sourcery
Add support for custom CA certificates to enable private PKI TLS verification for TheHive V5 connector and actions.
New Features:
Enhancements:
Build:
Documentation:
Tests: