diff --git a/MicrosoftActiveDirectory/CHANGELOG.md b/MicrosoftActiveDirectory/CHANGELOG.md index c97cba874..f5c3c1766 100644 --- a/MicrosoftActiveDirectory/CHANGELOG.md +++ b/MicrosoftActiveDirectory/CHANGELOG.md @@ -5,7 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## 2026-01-23 - 1.5.0 + +### Added + +- Add optional `ca_certificate` secret configuration parameter for custom PKI/internal CA support in TLS connections +- Add optional `skip_tls_verify` configuration parameter to explicitly disable TLS verification (for testing only) + +### Fixed + +- Fix temporary CA certificate file leak by cleaning up after connection establishment ## 2026-01-22 - 1.4.3 diff --git a/MicrosoftActiveDirectory/manifest.json b/MicrosoftActiveDirectory/manifest.json index c635689e2..39b76939d 100644 --- a/MicrosoftActiveDirectory/manifest.json +++ b/MicrosoftActiveDirectory/manifest.json @@ -17,6 +17,17 @@ "title": "Password account", "description": "The dedicated password of the account", "type": "string" + }, + "ca_certificate": { + "title": "CA Certificate", + "description": "PEM-encoded CA certificate for TLS verification (optional, for internal PKI)", + "type": "string" + }, + "skip_tls_verify": { + "title": "Skip TLS Verification", + "description": "Skip TLS certificate verification (insecure, use only for testing)", + "type": "boolean", + "default": false } }, "required": [ @@ -25,14 +36,15 @@ "admin_password" ], "secrets": [ - "admin_password" + "admin_password", + "ca_certificate" ] }, "description": "Microsoft Active Directory (Microsoft AD), is a directory service developed by Microsoft for Windows domain networks. It is a centralized system that stores information about networked resources and makes these resources easily accessible to users and administrators. Active Directory provides services for authentication and authorization, organizing and managing resources, such as users, computers, and devices, in a networked environment.", "name": "Microsoft Active Directory", "uuid": "b2d96259-af89-4f7a-ae6e-a0af2d2400f3", "slug": "microsoft-ad", - "version": "1.4.3", + "version": "1.5.0", "categories": [ "IAM" ], diff --git a/MicrosoftActiveDirectory/microsoft_ad/actions_base.py b/MicrosoftActiveDirectory/microsoft_ad/actions_base.py index db8f2fc54..9435d3a5d 100644 --- a/MicrosoftActiveDirectory/microsoft_ad/actions_base.py +++ b/MicrosoftActiveDirectory/microsoft_ad/actions_base.py @@ -1,5 +1,9 @@ from functools import cached_property -from ldap3 import Server, Connection +import os +import ssl +import tempfile + +from ldap3 import Server, Connection, Tls from ldap3.utils.conv import escape_filter_chars from sekoia_automation.action import Action @@ -12,17 +16,35 @@ class MicrosoftADAction(Action): @cached_property def client(self): - server = Server( - host=self.module.configuration.servername, - port=636, - use_ssl=True, - ) - conn = Connection( - server, - auto_bind=True, - user=self.module.configuration.admin_username, - password=self.module.configuration.admin_password, - ) + tls_config = None + ca_file = None + ca_cert = self.module.configuration.ca_certificate + skip_tls_verify = self.module.configuration.skip_tls_verify + + try: + if ca_cert: + with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f: + f.write(ca_cert) + ca_file = f.name + tls_config = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=ca_file) + elif skip_tls_verify: + tls_config = Tls(validate=ssl.CERT_NONE) + + server = Server( + host=self.module.configuration.servername, + port=636, + use_ssl=True, + tls=tls_config, + ) + conn = Connection( + server, + auto_bind=True, + user=self.module.configuration.admin_username, + password=self.module.configuration.admin_password, + ) + finally: + if ca_file and os.path.exists(ca_file): + os.unlink(ca_file) return conn diff --git a/MicrosoftActiveDirectory/microsoft_ad/models/common_models.py b/MicrosoftActiveDirectory/microsoft_ad/models/common_models.py index c6bb85993..64d9a8db8 100644 --- a/MicrosoftActiveDirectory/microsoft_ad/models/common_models.py +++ b/MicrosoftActiveDirectory/microsoft_ad/models/common_models.py @@ -8,6 +8,10 @@ class MicrosoftADConfiguration(BaseModel): servername: str = Field(..., description="Remote machine IP or Name") admin_username: str = Field(..., description="Admin username") admin_password: str = Field(..., secret=True, description="Admin password") # type: ignore + ca_certificate: str | None = Field(None, secret=True, description="PEM-encoded CA certificate for TLS verification") # type: ignore + skip_tls_verify: bool = Field( + False, description="Skip TLS certificate verification (insecure, use only for testing)" + ) class MicrosoftADModule(Module): diff --git a/MicrosoftActiveDirectory/tests/test_base.py b/MicrosoftActiveDirectory/tests/test_base.py index 196bd692d..e532abddc 100644 --- a/MicrosoftActiveDirectory/tests/test_base.py +++ b/MicrosoftActiveDirectory/tests/test_base.py @@ -1,6 +1,9 @@ +import os import pytest -from unittest.mock import Mock +import ssl +from unittest.mock import Mock, patch from microsoft_ad.actions_base import MicrosoftADAction +from microsoft_ad.models.common_models import MicrosoftADConfiguration class ConcreteMicrosoftADAction(MicrosoftADAction): @@ -8,6 +11,168 @@ def run(self, arguments): pass +class TestClientTlsConfiguration: + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_without_ca_certificate_and_skip_tls_uses_default(self, mock_tls, mock_server, mock_connection): + """Without ca_certificate and skip_tls_verify=False, no Tls config is created (uses library default).""" + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + ca_certificate=None, + skip_tls_verify=False, + ) + action.module = mock_module + + _ = action.client + + mock_tls.assert_not_called() + mock_server.assert_called_once_with( + host="ldap.example.com", + port=636, + use_ssl=True, + tls=None, + ) + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_with_skip_tls_verify_uses_cert_none(self, mock_tls, mock_server, mock_connection): + """With skip_tls_verify=True, CERT_NONE is used.""" + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + ca_certificate=None, + skip_tls_verify=True, + ) + action.module = mock_module + + _ = action.client + + mock_tls.assert_called_once_with(validate=ssl.CERT_NONE) + assert mock_server.call_args[1]["tls"] == mock_tls.return_value + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_with_ca_certificate_uses_cert_required(self, mock_tls, mock_server, mock_connection): + """With ca_certificate provided, CERT_REQUIRED is used with the CA file.""" + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + ca_cert_content = "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJALa...\n-----END CERTIFICATE-----" + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + ca_certificate=ca_cert_content, + ) + action.module = mock_module + + _ = action.client + + mock_tls.assert_called_once() + call_kwargs = mock_tls.call_args[1] + assert call_kwargs["validate"] == ssl.CERT_REQUIRED + assert "ca_certs_file" in call_kwargs + assert call_kwargs["ca_certs_file"].endswith(".pem") + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_with_ca_certificate_cleans_up_temp_file(self, mock_tls, mock_server, mock_connection): + """The temporary CA file is deleted after the connection is established.""" + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + ca_cert_content = "-----BEGIN CERTIFICATE-----\nTEST_CERT_CONTENT\n-----END CERTIFICATE-----" + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + ca_certificate=ca_cert_content, + ) + action.module = mock_module + + _ = action.client + + call_kwargs = mock_tls.call_args[1] + ca_file_path = call_kwargs["ca_certs_file"] + assert not os.path.exists(ca_file_path), "Temporary CA file should be deleted after connection" + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_ca_certificate_takes_precedence_over_skip_tls(self, mock_tls, mock_server, mock_connection): + """If both ca_certificate and skip_tls_verify are set, ca_certificate takes precedence.""" + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + ca_cert_content = "-----BEGIN CERTIFICATE-----\nTEST\n-----END CERTIFICATE-----" + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + ca_certificate=ca_cert_content, + skip_tls_verify=True, + ) + action.module = mock_module + + _ = action.client + + call_kwargs = mock_tls.call_args[1] + assert call_kwargs["validate"] == ssl.CERT_REQUIRED + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_server_configured_with_ssl_on_port_636(self, mock_tls, mock_server, mock_connection): + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="password", + skip_tls_verify=True, + ) + action.module = mock_module + + _ = action.client + + mock_server.assert_called_once_with( + host="ldap.example.com", + port=636, + use_ssl=True, + tls=mock_tls.return_value, + ) + + @patch("microsoft_ad.actions_base.Connection") + @patch("microsoft_ad.actions_base.Server") + @patch("microsoft_ad.actions_base.Tls") + def test_client_connection_uses_credentials(self, mock_tls, mock_server, mock_connection): + action = object.__new__(ConcreteMicrosoftADAction) + mock_module = Mock() + mock_module.configuration = MicrosoftADConfiguration( + servername="ldap.example.com", + admin_username="admin@example.com", + admin_password="secret_password", + ) + action.module = mock_module + + _ = action.client + + mock_connection.assert_called_once_with( + mock_server.return_value, + auto_bind=True, + user="admin@example.com", + password="secret_password", + ) + + class TestSearchUserdnQuery: def test_search_returns_empty_list_when_no_users_found(self): action = object.__new__(ConcreteMicrosoftADAction) diff --git a/MicrosoftActiveDirectory/tests/test_search.py b/MicrosoftActiveDirectory/tests/test_search.py index 334b3bb4f..4672a72a8 100644 --- a/MicrosoftActiveDirectory/tests/test_search.py +++ b/MicrosoftActiveDirectory/tests/test_search.py @@ -70,14 +70,16 @@ def test_search_with_attributes(): def test_search_in_base_exception(): - username = "Mick Lennon" search = f"(|(samaccountname={username})(userPrincipalName={username})(mail={username})(givenName={username}))" basedn = "dc=example,dc=com" attributes = ["name"] action = configured_action(SearchAction) - with patch("microsoft_ad.search_actions.SearchAction.run", side_effect=Exception("mocked error")): + # Patch the client property to avoid real LDAP connection + with patch("microsoft_ad.actions_base.MicrosoftADAction.client") as mock_client: + mock_client.search.side_effect = Exception("LDAP connection failed") + with pytest.raises(Exception) as exc_info: action.run({"search_filter": search, "basedn": basedn, "attributes": attributes})