-
Notifications
You must be signed in to change notification settings - Fork 40
feat(TheHiveV5):add support of private PKI #1955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,15 @@ | ||
| import os | ||
| import pytest | ||
| import sys | ||
| import unittest.mock as mock | ||
| import requests_mock | ||
|
|
||
| from thehive.thehiveconnector import TheHiveConnector, key_exists | ||
| from thehive.thehiveconnector import ( | ||
| TheHiveConnector, | ||
| key_exists, | ||
| prepare_verify_param, | ||
| _cleanup_ca_files, | ||
| ) | ||
| from thehive4py.errors import TheHiveError | ||
|
|
||
|
|
||
|
|
@@ -29,6 +35,66 @@ def test_connector_init_without_apikey(): | |
| assert "API key is required" in str(exc_info.value) | ||
|
|
||
|
|
||
| class TestPrepareVerifyParam: | ||
| """Tests for the prepare_verify_param helper function""" | ||
|
|
||
| def test_verify_false_returns_false(self): | ||
| """When verify is False, should return False regardless of ca_certificate""" | ||
| assert prepare_verify_param(verify=False) is False | ||
| assert prepare_verify_param(verify=False, ca_certificate="some cert") is False | ||
|
|
||
| def test_verify_true_without_ca_returns_true(self): | ||
| """When verify is True and no CA, should return True (use system CA store)""" | ||
| assert prepare_verify_param(verify=True) is True | ||
| assert prepare_verify_param(verify=True, ca_certificate=None) is True | ||
|
|
||
| def test_verify_true_with_ca_returns_file_path(self): | ||
| """When verify is True and CA provided, should return path to temp file""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_UNIQUE_1\n-----END CERTIFICATE-----" | ||
| result = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert isinstance(result, str) | ||
| assert result.endswith(".pem") | ||
| assert os.path.exists(result) | ||
|
|
||
| # Verify content | ||
| with open(result, "r") as f: | ||
| assert f.read() == ca_cert | ||
|
|
||
| def test_same_ca_returns_cached_file(self): | ||
| """Same CA certificate content should return the same cached file path""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CACHE\n-----END CERTIFICATE-----" | ||
| result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
| result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert result1 == result2 | ||
| assert os.path.exists(result1) | ||
|
|
||
| def test_different_ca_returns_different_files(self): | ||
| """Different CA certificates should return different file paths""" | ||
| ca_cert1 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_1\n-----END CERTIFICATE-----" | ||
| ca_cert2 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_2\n-----END CERTIFICATE-----" | ||
| result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert1) | ||
| result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert2) | ||
|
|
||
| assert result1 != result2 | ||
| assert os.path.exists(result1) | ||
| assert os.path.exists(result2) | ||
|
|
||
| def test_cleanup_function_removes_files(self): | ||
| """The cleanup function should remove all cached CA files""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CLEANUP\n-----END CERTIFICATE-----" | ||
| result = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| assert os.path.exists(result) | ||
|
|
||
| # Call cleanup | ||
| _cleanup_ca_files() | ||
|
|
||
| # File should be removed | ||
| assert not os.path.exists(result) | ||
|
Comment on lines
+84
to
+95
|
||
|
|
||
|
|
||
| def test_connector_safe_call_with_thehive_error(): | ||
| """Test that _safe_call properly handles TheHiveError""" | ||
| connector = TheHiveConnector("http://localhost:9000", "APIKEY123", "TESTORG") | ||
|
|
@@ -101,3 +167,90 @@ def test_comment_add_in_alert(): | |
|
|
||
| # Verify the request was made | ||
| assert mock_requests.call_count == 1 | ||
|
|
||
|
|
||
| class TestPrepareVerifyParamEdgeCases: | ||
| """Additional tests for prepare_verify_param edge cases""" | ||
|
|
||
| def test_empty_certificate_returns_true(self): | ||
| """Empty certificate string should return True (use system CA store)""" | ||
| assert prepare_verify_param(verify=True, ca_certificate="") is True | ||
| assert prepare_verify_param(verify=True, ca_certificate=" ") is True | ||
| assert prepare_verify_param(verify=True, ca_certificate="\n\t") is True | ||
|
|
||
| def test_line_ending_normalization(self): | ||
| """Certificates with different line endings should use the same cache entry""" | ||
| ca_cert_unix = "-----BEGIN CERTIFICATE-----\nTEST_LINE_ENDING\n-----END CERTIFICATE-----" | ||
| ca_cert_windows = "-----BEGIN CERTIFICATE-----\r\nTEST_LINE_ENDING\r\n-----END CERTIFICATE-----" | ||
| ca_cert_mac = "-----BEGIN CERTIFICATE-----\rTEST_LINE_ENDING\r-----END CERTIFICATE-----" | ||
|
|
||
| result_unix = prepare_verify_param(verify=True, ca_certificate=ca_cert_unix) | ||
| result_windows = prepare_verify_param(verify=True, ca_certificate=ca_cert_windows) | ||
| result_mac = prepare_verify_param(verify=True, ca_certificate=ca_cert_mac) | ||
|
|
||
| # All should return the same cached file | ||
| assert result_unix == result_windows == result_mac | ||
|
|
||
| def test_file_permissions(self): | ||
| """CA certificate file should have restrictive permissions""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_PERMISSIONS\n-----END CERTIFICATE-----" | ||
| result = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| # Check file permissions (0o600 = owner read/write only) | ||
| file_stat = os.stat(result) | ||
| permissions = file_stat.st_mode & 0o777 | ||
| assert permissions == 0o600 | ||
|
|
||
| def test_cache_recreates_deleted_file(self): | ||
| """If cached file is deleted, a new one should be created""" | ||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_RECREATE\n-----END CERTIFICATE-----" | ||
| result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| # Delete the file | ||
| os.unlink(result1) | ||
| assert not os.path.exists(result1) | ||
|
|
||
| # Call again - should create a new file | ||
| result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
| assert os.path.exists(result2) | ||
| # New file path should be different since original was deleted | ||
| assert result1 != result2 | ||
|
|
||
| def test_cleanup_clears_cache_dict(self): | ||
| """Cleanup function should clear the cache dictionary""" | ||
| import thehive.thehiveconnector as connector_module | ||
|
|
||
| ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CACHE_CLEAR\n-----END CERTIFICATE-----" | ||
| prepare_verify_param(verify=True, ca_certificate=ca_cert) | ||
|
|
||
| # Cache should have entries | ||
| assert len(connector_module._ca_file_cache) > 0 | ||
|
|
||
| # Call cleanup | ||
| _cleanup_ca_files() | ||
|
|
||
| # Cache should be empty | ||
| assert len(connector_module._ca_file_cache) == 0 | ||
|
|
||
| def test_atexit_register_called_once(self): | ||
| """atexit.register should only be called once""" | ||
| import thehive.thehiveconnector as connector_module | ||
| import atexit | ||
|
|
||
| # Reset the flag for this test | ||
| original_flag = connector_module._atexit_registered | ||
| connector_module._atexit_registered = False | ||
|
|
||
| with mock.patch.object(atexit, "register") as mock_register: | ||
| ca_cert1 = "-----BEGIN CERTIFICATE-----\nTEST_ATEXIT_1\n-----END CERTIFICATE-----" | ||
| ca_cert2 = "-----BEGIN CERTIFICATE-----\nTEST_ATEXIT_2\n-----END CERTIFICATE-----" | ||
|
|
||
| prepare_verify_param(verify=True, ca_certificate=ca_cert1) | ||
| prepare_verify_param(verify=True, ca_certificate=ca_cert2) | ||
|
|
||
| # Should only be called once | ||
| assert mock_register.call_count == 1 | ||
| mock_register.assert_called_once_with(connector_module._cleanup_ca_files) | ||
|
|
||
| # Restore the flag | ||
| connector_module._atexit_registered = True | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,14 +6,20 @@ | |
| from requests import HTTPError | ||
| from posixpath import join as urljoin | ||
|
|
||
| from .thehiveconnector import prepare_verify_param | ||
|
|
||
|
|
||
| class TheHiveCreateAlertV5(Action): | ||
| def run(self, arguments: dict[str, Any]) -> Optional[OutputAlert]: | ||
| verify_param = prepare_verify_param( | ||
| self.module.configuration.get("verify_certificate", True), | ||
| self.module.configuration.get("ca_certificate"), | ||
| ) | ||
| api = TheHiveApi( | ||
| self.module.configuration["base_url"], | ||
| self.module.configuration["apikey"], | ||
| organisation=self.module.configuration["organisation"], | ||
| verify=self.module.configuration.get("verify_certificate", True), | ||
| verify=verify_param, | ||
| ) | ||
|
Comment on lines
+14
to
23
|
||
|
|
||
| arg_sekoia_server = arguments.get("sekoia_base_url", "https://app.sekoia.io") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,8 +6,12 @@ | |
| - Adds basic error handling and logging | ||
| """ | ||
|
|
||
| import atexit | ||
| import hashlib | ||
| import logging | ||
| from typing import Optional, Dict, List, Any | ||
| import os | ||
| import tempfile | ||
| from typing import Optional, Dict, List, Any, Union | ||
|
|
||
| from thehive4py import TheHiveApi | ||
| from thehive4py.errors import TheHiveError | ||
|
|
@@ -173,6 +177,98 @@ def key_exists(mapping: dict, key_to_check: str) -> bool: | |
| return key_to_check in mapping | ||
|
|
||
|
|
||
| # Cache for CA certificate files to avoid creating duplicates | ||
| _ca_file_cache: Dict[str, str] = {} | ||
| _atexit_registered = False | ||
|
Comment on lines
+180
to
+182
|
||
|
|
||
|
|
||
| def _cleanup_ca_files() -> None: | ||
| """Clean up all cached CA certificate files at process exit.""" | ||
| global _ca_file_cache | ||
| for ca_file in list(_ca_file_cache.values()): | ||
| try: | ||
| if os.path.exists(ca_file): | ||
| os.unlink(ca_file) | ||
| except OSError: | ||
| logger.warning("Failed to clean up temporary CA file: %s", ca_file) | ||
| _ca_file_cache.clear() | ||
|
|
||
|
|
||
| def prepare_verify_param(verify: bool, ca_certificate: Optional[str] = None) -> Union[bool, str]: | ||
| """ | ||
| Prepare the verify parameter for requests/thehive4py. | ||
|
|
||
| Args: | ||
| verify: Whether to verify the certificate | ||
| ca_certificate: PEM-encoded CA certificate content (optional) | ||
|
|
||
| Returns: | ||
| - False if verify is False | ||
| - Path to temp CA file if ca_certificate is provided | ||
| - True otherwise (use system CA store) | ||
| """ | ||
| global _atexit_registered | ||
|
|
||
| if not verify: | ||
| return False | ||
|
|
||
| if ca_certificate: | ||
| # Treat empty or whitespace-only certificate as no certificate | ||
| ca_certificate = ca_certificate.strip() | ||
| if not ca_certificate: | ||
| return True | ||
|
|
||
| # Normalize line endings to Unix-style for consistent hashing | ||
| ca_certificate = ca_certificate.replace("\r\n", "\n").replace("\r", "\n") | ||
|
|
||
| # Use hash of certificate content as cache key to avoid duplicates | ||
| ca_hash = hashlib.sha256(ca_certificate.encode()).hexdigest() | ||
|
|
||
| # Check cache with existence verification | ||
| if ca_hash in _ca_file_cache: | ||
| cached_path = _ca_file_cache[ca_hash] | ||
| try: | ||
| # Verify file still exists and is readable | ||
| with open(cached_path, "r") as f: | ||
| f.read(1) | ||
| return cached_path | ||
| except (OSError, IOError): | ||
| # File was deleted or is inaccessible, remove from cache | ||
| del _ca_file_cache[ca_hash] | ||
|
|
||
| # Create new temp file with restricted permissions | ||
| fd, ca_file = tempfile.mkstemp(suffix=".pem", text=True) | ||
| try: | ||
| # Set restrictive permissions (owner read/write only) | ||
| os.chmod(ca_file, 0o600) | ||
| with os.fdopen(fd, "w") as f: | ||
| f.write(ca_certificate) | ||
| f.flush() | ||
| os.fsync(f.fileno()) | ||
| except Exception: | ||
| # Clean up on failure | ||
| try: | ||
| os.close(fd) | ||
| except OSError: | ||
| pass | ||
| try: | ||
| os.unlink(ca_file) | ||
| except OSError: | ||
| pass | ||
| raise | ||
|
|
||
CharlesLR-sekoia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _ca_file_cache[ca_hash] = ca_file | ||
|
|
||
| # Register cleanup only once | ||
| if not _atexit_registered: | ||
| atexit.register(_cleanup_ca_files) | ||
| _atexit_registered = True | ||
|
|
||
| return ca_file | ||
|
|
||
| return True | ||
|
|
||
|
|
||
| class TheHiveConnector: | ||
| """ | ||
| Minimal TheHive Alert Connector. | ||
|
|
@@ -181,11 +277,14 @@ class TheHiveConnector: | |
| res = connector.alert_get("ALERT-ID") | ||
| """ | ||
|
|
||
| def __init__(self, url: str, api_key: str, organisation: str, verify: bool = True): | ||
| def __init__( | ||
| self, url: str, api_key: str, organisation: str, verify: bool = True, ca_certificate: Optional[str] = None | ||
| ): | ||
| if not api_key: | ||
| raise ValueError("API key is required") | ||
|
|
||
| self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify) | ||
| verify_param = prepare_verify_param(verify, ca_certificate) | ||
| self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify_param) | ||
|
|
||
| def _safe_call(self, fn, *args, **kwargs): | ||
| try: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.