diff --git a/TheHiveV5/CHANGELOG.md b/TheHiveV5/CHANGELOG.md index f5f6dabdc..339a68c5b 100644 --- a/TheHiveV5/CHANGELOG.md +++ b/TheHiveV5/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 2026-01-23 - 1.1.0 + +### Added + +- Add optional `ca_certificate` configuration parameter for custom PKI/internal CA support in TLS connections +- Implement CA certificate file caching to avoid creating duplicate temp files for the same certificate +- Add robust cleanup mechanism for temporary CA files at process exit + ## 2025-12-22 - 1.0.9 ### Fixed diff --git a/TheHiveV5/manifest.json b/TheHiveV5/manifest.json index 1cb854eab..24e7ea88d 100644 --- a/TheHiveV5/manifest.json +++ b/TheHiveV5/manifest.json @@ -19,6 +19,11 @@ "description": "Is the server certificate verified", "type": "boolean", "default": true + }, + "ca_certificate": { + "title": "CA Certificate", + "description": "PEM-encoded CA certificate for TLS verification (optional, for internal PKI)", + "type": "string" } }, "required": [ @@ -36,8 +41,8 @@ "name": "The Hive V5", "uuid": "d6c96586-707c-451f-b9f6-d31b3291f87d", "slug": "thehivev5", - "version": "1.0.9", + "version": "1.1.0", "categories": [ "Collaboration Tools" ] -} +} \ No newline at end of file diff --git a/TheHiveV5/tests/test_thehiveconnector.py b/TheHiveV5/tests/test_thehiveconnector.py index e5a60cd00..e5f24e0a2 100644 --- a/TheHiveV5/tests/test_thehiveconnector.py +++ b/TheHiveV5/tests/test_thehiveconnector.py @@ -1,9 +1,15 @@ +import os import pytest import sys import unittest.mock as mock import requests_mock -from thehive.thehiveconnector import TheHiveConnector, key_exists +from thehive.thehiveconnector import ( + TheHiveConnector, + key_exists, + prepare_verify_param, + _cleanup_ca_files, +) from thehive4py.errors import TheHiveError @@ -29,6 +35,66 @@ def test_connector_init_without_apikey(): assert "API key is required" in str(exc_info.value) +class TestPrepareVerifyParam: + """Tests for the prepare_verify_param helper function""" + + def test_verify_false_returns_false(self): + """When verify is False, should return False regardless of ca_certificate""" + assert prepare_verify_param(verify=False) is False + assert prepare_verify_param(verify=False, ca_certificate="some cert") is False + + def test_verify_true_without_ca_returns_true(self): + """When verify is True and no CA, should return True (use system CA store)""" + assert prepare_verify_param(verify=True) is True + assert prepare_verify_param(verify=True, ca_certificate=None) is True + + def test_verify_true_with_ca_returns_file_path(self): + """When verify is True and CA provided, should return path to temp file""" + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_UNIQUE_1\n-----END CERTIFICATE-----" + result = prepare_verify_param(verify=True, ca_certificate=ca_cert) + + assert isinstance(result, str) + assert result.endswith(".pem") + assert os.path.exists(result) + + # Verify content + with open(result, "r") as f: + assert f.read() == ca_cert + + def test_same_ca_returns_cached_file(self): + """Same CA certificate content should return the same cached file path""" + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CACHE\n-----END CERTIFICATE-----" + result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert) + result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert) + + assert result1 == result2 + assert os.path.exists(result1) + + def test_different_ca_returns_different_files(self): + """Different CA certificates should return different file paths""" + ca_cert1 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_1\n-----END CERTIFICATE-----" + ca_cert2 = "-----BEGIN CERTIFICATE-----\nTEST_DIFF_2\n-----END CERTIFICATE-----" + result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert1) + result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert2) + + assert result1 != result2 + assert os.path.exists(result1) + assert os.path.exists(result2) + + def test_cleanup_function_removes_files(self): + """The cleanup function should remove all cached CA files""" + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CLEANUP\n-----END CERTIFICATE-----" + result = prepare_verify_param(verify=True, ca_certificate=ca_cert) + + assert os.path.exists(result) + + # Call cleanup + _cleanup_ca_files() + + # File should be removed + assert not os.path.exists(result) + + def test_connector_safe_call_with_thehive_error(): """Test that _safe_call properly handles TheHiveError""" connector = TheHiveConnector("http://localhost:9000", "APIKEY123", "TESTORG") @@ -101,3 +167,90 @@ def test_comment_add_in_alert(): # Verify the request was made assert mock_requests.call_count == 1 + + +class TestPrepareVerifyParamEdgeCases: + """Additional tests for prepare_verify_param edge cases""" + + def test_empty_certificate_returns_true(self): + """Empty certificate string should return True (use system CA store)""" + assert prepare_verify_param(verify=True, ca_certificate="") is True + assert prepare_verify_param(verify=True, ca_certificate=" ") is True + assert prepare_verify_param(verify=True, ca_certificate="\n\t") is True + + def test_line_ending_normalization(self): + """Certificates with different line endings should use the same cache entry""" + ca_cert_unix = "-----BEGIN CERTIFICATE-----\nTEST_LINE_ENDING\n-----END CERTIFICATE-----" + ca_cert_windows = "-----BEGIN CERTIFICATE-----\r\nTEST_LINE_ENDING\r\n-----END CERTIFICATE-----" + ca_cert_mac = "-----BEGIN CERTIFICATE-----\rTEST_LINE_ENDING\r-----END CERTIFICATE-----" + + result_unix = prepare_verify_param(verify=True, ca_certificate=ca_cert_unix) + result_windows = prepare_verify_param(verify=True, ca_certificate=ca_cert_windows) + result_mac = prepare_verify_param(verify=True, ca_certificate=ca_cert_mac) + + # All should return the same cached file + assert result_unix == result_windows == result_mac + + def test_file_permissions(self): + """CA certificate file should have restrictive permissions""" + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_PERMISSIONS\n-----END CERTIFICATE-----" + result = prepare_verify_param(verify=True, ca_certificate=ca_cert) + + # Check file permissions (0o600 = owner read/write only) + file_stat = os.stat(result) + permissions = file_stat.st_mode & 0o777 + assert permissions == 0o600 + + def test_cache_recreates_deleted_file(self): + """If cached file is deleted, a new one should be created""" + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_RECREATE\n-----END CERTIFICATE-----" + result1 = prepare_verify_param(verify=True, ca_certificate=ca_cert) + + # Delete the file + os.unlink(result1) + assert not os.path.exists(result1) + + # Call again - should create a new file + result2 = prepare_verify_param(verify=True, ca_certificate=ca_cert) + assert os.path.exists(result2) + # New file path should be different since original was deleted + assert result1 != result2 + + def test_cleanup_clears_cache_dict(self): + """Cleanup function should clear the cache dictionary""" + import thehive.thehiveconnector as connector_module + + ca_cert = "-----BEGIN CERTIFICATE-----\nTEST_CACHE_CLEAR\n-----END CERTIFICATE-----" + prepare_verify_param(verify=True, ca_certificate=ca_cert) + + # Cache should have entries + assert len(connector_module._ca_file_cache) > 0 + + # Call cleanup + _cleanup_ca_files() + + # Cache should be empty + assert len(connector_module._ca_file_cache) == 0 + + def test_atexit_register_called_once(self): + """atexit.register should only be called once""" + import thehive.thehiveconnector as connector_module + import atexit + + # Reset the flag for this test + original_flag = connector_module._atexit_registered + connector_module._atexit_registered = False + + with mock.patch.object(atexit, "register") as mock_register: + ca_cert1 = "-----BEGIN CERTIFICATE-----\nTEST_ATEXIT_1\n-----END CERTIFICATE-----" + ca_cert2 = "-----BEGIN CERTIFICATE-----\nTEST_ATEXIT_2\n-----END CERTIFICATE-----" + + prepare_verify_param(verify=True, ca_certificate=ca_cert1) + prepare_verify_param(verify=True, ca_certificate=ca_cert2) + + # Should only be called once + assert mock_register.call_count == 1 + mock_register.assert_called_once_with(connector_module._cleanup_ca_files) + + # Restore the flag + connector_module._atexit_registered = True diff --git a/TheHiveV5/thehive/add_commment.py b/TheHiveV5/thehive/add_commment.py index a5f17a004..edb5112a1 100644 --- a/TheHiveV5/thehive/add_commment.py +++ b/TheHiveV5/thehive/add_commment.py @@ -13,6 +13,7 @@ def run(self, arguments: dict[str, Any]) -> Optional[OutputComment]: self.module.configuration["apikey"], organisation=self.module.configuration["organisation"], verify=self.module.configuration.get("verify_certificate", True), + ca_certificate=self.module.configuration.get("ca_certificate"), ) arg_alert_id = arguments["alert_id"] diff --git a/TheHiveV5/thehive/add_observable.py b/TheHiveV5/thehive/add_observable.py index 9f76f5d07..212a995c8 100644 --- a/TheHiveV5/thehive/add_observable.py +++ b/TheHiveV5/thehive/add_observable.py @@ -12,6 +12,7 @@ def run(self, arguments: dict[str, Any]) -> Optional[Dict[str, List]]: self.module.configuration["apikey"], organisation=self.module.configuration["organisation"], verify=self.module.configuration.get("verify_certificate", True), + ca_certificate=self.module.configuration.get("ca_certificate"), ) arg_alert_id = arguments["alert_id"] diff --git a/TheHiveV5/thehive/create_alert.py b/TheHiveV5/thehive/create_alert.py index fca4b2d11..f02a60e05 100644 --- a/TheHiveV5/thehive/create_alert.py +++ b/TheHiveV5/thehive/create_alert.py @@ -6,14 +6,20 @@ from requests import HTTPError from posixpath import join as urljoin +from .thehiveconnector import prepare_verify_param + class TheHiveCreateAlertV5(Action): def run(self, arguments: dict[str, Any]) -> Optional[OutputAlert]: + verify_param = prepare_verify_param( + self.module.configuration.get("verify_certificate", True), + self.module.configuration.get("ca_certificate"), + ) api = TheHiveApi( self.module.configuration["base_url"], self.module.configuration["apikey"], organisation=self.module.configuration["organisation"], - verify=self.module.configuration.get("verify_certificate", True), + verify=verify_param, ) arg_sekoia_server = arguments.get("sekoia_base_url", "https://app.sekoia.io") diff --git a/TheHiveV5/thehive/thehiveconnector.py b/TheHiveV5/thehive/thehiveconnector.py index f89d73ba9..f045ac0da 100644 --- a/TheHiveV5/thehive/thehiveconnector.py +++ b/TheHiveV5/thehive/thehiveconnector.py @@ -6,8 +6,12 @@ - Adds basic error handling and logging """ +import atexit +import hashlib import logging -from typing import Optional, Dict, List, Any +import os +import tempfile +from typing import Optional, Dict, List, Any, Union from thehive4py import TheHiveApi from thehive4py.errors import TheHiveError @@ -173,6 +177,98 @@ def key_exists(mapping: dict, key_to_check: str) -> bool: return key_to_check in mapping +# Cache for CA certificate files to avoid creating duplicates +_ca_file_cache: Dict[str, str] = {} +_atexit_registered = False + + +def _cleanup_ca_files() -> None: + """Clean up all cached CA certificate files at process exit.""" + global _ca_file_cache + for ca_file in list(_ca_file_cache.values()): + try: + if os.path.exists(ca_file): + os.unlink(ca_file) + except OSError: + logger.warning("Failed to clean up temporary CA file: %s", ca_file) + _ca_file_cache.clear() + + +def prepare_verify_param(verify: bool, ca_certificate: Optional[str] = None) -> Union[bool, str]: + """ + Prepare the verify parameter for requests/thehive4py. + + Args: + verify: Whether to verify the certificate + ca_certificate: PEM-encoded CA certificate content (optional) + + Returns: + - False if verify is False + - Path to temp CA file if ca_certificate is provided + - True otherwise (use system CA store) + """ + global _atexit_registered + + if not verify: + return False + + if ca_certificate: + # Treat empty or whitespace-only certificate as no certificate + ca_certificate = ca_certificate.strip() + if not ca_certificate: + return True + + # Normalize line endings to Unix-style for consistent hashing + ca_certificate = ca_certificate.replace("\r\n", "\n").replace("\r", "\n") + + # Use hash of certificate content as cache key to avoid duplicates + ca_hash = hashlib.sha256(ca_certificate.encode()).hexdigest() + + # Check cache with existence verification + if ca_hash in _ca_file_cache: + cached_path = _ca_file_cache[ca_hash] + try: + # Verify file still exists and is readable + with open(cached_path, "r") as f: + f.read(1) + return cached_path + except (OSError, IOError): + # File was deleted or is inaccessible, remove from cache + del _ca_file_cache[ca_hash] + + # Create new temp file with restricted permissions + fd, ca_file = tempfile.mkstemp(suffix=".pem", text=True) + try: + # Set restrictive permissions (owner read/write only) + os.chmod(ca_file, 0o600) + with os.fdopen(fd, "w") as f: + f.write(ca_certificate) + f.flush() + os.fsync(f.fileno()) + except Exception: + # Clean up on failure + try: + os.close(fd) + except OSError: + pass + try: + os.unlink(ca_file) + except OSError: + pass + raise + + _ca_file_cache[ca_hash] = ca_file + + # Register cleanup only once + if not _atexit_registered: + atexit.register(_cleanup_ca_files) + _atexit_registered = True + + return ca_file + + return True + + class TheHiveConnector: """ Minimal TheHive Alert Connector. @@ -181,11 +277,14 @@ class TheHiveConnector: res = connector.alert_get("ALERT-ID") """ - def __init__(self, url: str, api_key: str, organisation: str, verify: bool = True): + def __init__( + self, url: str, api_key: str, organisation: str, verify: bool = True, ca_certificate: Optional[str] = None + ): if not api_key: raise ValueError("API key is required") - self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify) + verify_param = prepare_verify_param(verify, ca_certificate) + self.api = TheHiveApi(url=url, apikey=api_key, organisation=organisation, verify=verify_param) def _safe_call(self, fn, *args, **kwargs): try: diff --git a/TheHiveV5/thehive/upload_logs.py b/TheHiveV5/thehive/upload_logs.py index bee2a67d7..5702a256b 100644 --- a/TheHiveV5/thehive/upload_logs.py +++ b/TheHiveV5/thehive/upload_logs.py @@ -15,6 +15,7 @@ def run(self, arguments: dict[str, Any]) -> dict[str, list[dict[str, Any]]]: self.module.configuration["apikey"], organisation=self.module.configuration["organisation"], verify=self.module.configuration.get("verify_certificate", True), + ca_certificate=self.module.configuration.get("ca_certificate"), ) arg_alert_id = arguments["alert_id"]