-
Notifications
You must be signed in to change notification settings - Fork 40
feat(TheHiveV5):add support of private PKI #1955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,16 @@ | ||
| import os | ||
| import pytest | ||
| import sys | ||
| import unittest.mock as mock | ||
| import requests_mock | ||
|
|
||
| from thehive.thehiveconnector import TheHiveConnector, key_exists | ||
| from thehive.thehiveconnector import ( | ||
| TheHiveConnector, | ||
| key_exists, | ||
| prepare_verify_param, | ||
| _ca_file_cache, | ||
| _cleanup_ca_files, | ||
| ) | ||
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| from thehive4py.errors import TheHiveError | ||
|
|
||
|
|
||
|
|
@@ -29,6 +36,66 @@ def test_connector_init_without_apikey(): | |
| assert "API key is required" in str(exc_info.value) | ||
|
|
||
|
|
||
| class TestPrepareVerifyParam: | ||
| """Tests for the prepare_verify_param helper function""" | ||
|
|
||
| def test_verify_false_returns_false(self): | ||
| """When verify is False, should return False regardless of ca_certificate""" | ||
| assert prepare_verify_param(verify=False) is False | ||
| assert prepare_verify_param(verify=False, ca_certificate="some cert") is False | ||
|
|
||
| def test_verify_true_without_ca_returns_true(self): | ||
| """When verify is True and no CA, should return True (use system CA store)""" | ||
| assert prepare_verify_param(verify=True) is True | ||
| assert prepare_verify_param(verify=True, ca_certificate=None) is True | ||
|
|
||
| def test_verify_true_with_ca_returns_file_path(self): | ||
| """When verify is True and CA provided, should return path to temp file""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_UNIQUE_1\n-----END CERTIFICATE-----" | ||
| result = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert isinstance(result, str) | ||
| assert result.endswith(".pem") | ||
| assert os.path.exists(result) | ||
CharlesLR-sekoia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Verify content | ||
| with open(result, "r") as f: | ||
| assert f.read() == ca_cert | ||
|
|
||
| def test_same_ca_returns_cached_file(self): | ||
| """Same CA certificate content should return the same cached file path""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CACHE\n-----END CERTIFICATE-----" | ||
| result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
| result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert result1 == result2 | ||
| assert os.path.exists(result1) | ||
|
|
||
| def test_different_ca_returns_different_files(self): | ||
| """Different CA certificates should return different file paths""" | ||
| ca_cert1 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_1\n-----END CERTIFICATE-----" | ||
| ca_cert2 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_2\n-----END CERTIFICATE-----" | ||
| result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert1) | ||
| result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert2) | ||
|
|
||
| assert result1 != result2 | ||
| assert os.path.exists(result1) | ||
| assert os.path.exists(result2) | ||
|
|
||
| def test_cleanup_function_removes_files(self): | ||
| """The cleanup function should remove all cached CA files""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CLEANUP\n-----END CERTIFICATE-----" | ||
| result = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert os.path.exists(result) | ||
|
|
||
| # Call cleanup | ||
| _cleanup_ca_files() | ||
|
|
||
| # File should be removed | ||
| assert not os.path.exists(result) | ||
|
Comment on lines
+84
to
+95
|
||
|
|
||
|
|
||
| def test_connector_safe_call_with_thehive_error(): | ||
| """Test that _safe_call properly handles TheHiveError""" | ||
| connector = TheHiveConnector("http://localhost:9000", "APIKEY123", "TESTORG") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,14 +6,20 @@ | |
| from requests import HTTPError | ||
| from posixpath import join as urljoin | ||
|
|
||
| from .thehiveconnector import prepare_verify_param | ||
|
|
||
|
|
||
| class TheHiveCreateAlertV5(Action): | ||
| def run(self, arguments: dict[str, Any]) -> Optional[OutputAlert]: | ||
| verify_param = prepare_verify_param( | ||
| self.module.configuration.get("verify_certificate", True), | ||
| self.module.configuration.get("ca_certificate"), | ||
| ) | ||
| api = TheHiveApi( | ||
| self.module.configuration["base_url"], | ||
| self.module.configuration["apikey"], | ||
| organisation=self.module.configuration["organisation"], | ||
| verify=self.module.configuration.get("verify_certificate", True), | ||
| verify=verify_param, | ||
| ) | ||
|
Comment on lines
+14
to
23
|
||
|
|
||
| arg_sekoia_server = arguments.get("sekoia_base_url", "https://app.sekoia.io") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,8 +6,12 @@ | |
| - Adds basic error handling and logging | ||
| """ | ||
|
|
||
| import atexit | ||
| import hashlib | ||
| import logging | ||
| from typing import Optional, Dict, List, Any | ||
| import os | ||
| import tempfile | ||
| from typing import Optional, Dict, List, Any, Union | ||
|
|
||
| from thehive4py import TheHiveApi | ||
| from thehive4py.errors import TheHiveError | ||
|
|
@@ -173,6 +177,62 @@ def key_exists(mapping: dict, key_to_check: str) -> bool: | |
| return key_to_check in mapping | ||
|
|
||
|
|
||
| # Cache for CA certificate files to avoid creating duplicates | ||
| _ca_file_cache: Dict[str, str] = {} | ||
| _atexit_registered = False | ||
|
Comment on lines
+180
to
+182
|
||
|
|
||
|
|
||
| def _cleanup_ca_files() -> None: | ||
| """Clean up all cached CA certificate files at process exit.""" | ||
| for ca_file in _ca_file_cache.values(): | ||
| try: | ||
| if os.path.exists(ca_file): | ||
| os.unlink(ca_file) | ||
| except OSError: | ||
| logger.warning("Failed to clean up temporary CA file: %s", ca_file) | ||
|
|
||
|
|
||
CharlesLR-sekoia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def prepare_verify_param(verify: bool, ca_certificate: Optional[str] = None) -> Union[bool, str]: | ||
| """ | ||
| Prepare the verify parameter for requests/thehive4py. | ||
|
|
||
| Args: | ||
| verify: Whether to verify the certificate | ||
| ca_certificate: PEM-encoded CA certificate content (optional) | ||
|
|
||
| Returns: | ||
| - False if verify is False | ||
| - Path to temp CA file if ca_certificate is provided | ||
| - True otherwise (use system CA store) | ||
| """ | ||
| global _atexit_registered | ||
|
|
||
| if not verify: | ||
| return False | ||
|
|
||
| if ca_certificate: | ||
| # Use hash of certificate content as cache key to avoid duplicates | ||
| ca_hash = hashlib.sha256(ca_certificate.encode()).hexdigest() | ||
|
|
||
| if ca_hash in _ca_file_cache and os.path.exists(_ca_file_cache[ca_hash]): | ||
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return _ca_file_cache[ca_hash] | ||
|
|
||
| with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f: | ||
| f.write(ca_certificate) | ||
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ca_file = f.name | ||
|
||
|
|
||
CharlesLR-sekoia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _ca_file_cache[ca_hash] = ca_file | ||
|
|
||
| # Register cleanup only once | ||
| if not _atexit_registered: | ||
| atexit.register(_cleanup_ca_files) | ||
| _atexit_registered = True | ||
CharlesLR-sekoia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return ca_file | ||
|
|
||
| return True | ||
|
|
||
|
|
||
| class TheHiveConnector: | ||
| """ | ||
| Minimal TheHive Alert Connector. | ||
|
|
@@ -181,11 +241,14 @@ class TheHiveConnector: | |
| res = connector.alert_get("ALERT-ID") | ||
| """ | ||
|
|
||
| def __init__(self, url: str, api_key: str, organisation: str, verify: bool = True): | ||
| def __init__( | ||
| self, url: str, api_key: str, organisation: str, verify: bool = True, ca_certificate: Optional[str] = None | ||
| ): | ||
| if not api_key: | ||
| raise ValueError("API key is required") | ||
|
|
||
| self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify) | ||
| verify_param = prepare_verify_param(verify, ca_certificate) | ||
| self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify_param) | ||
|
|
||
| def _safe_call(self, fn, *args, **kwargs): | ||
| try: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.