Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion MicrosoftActiveDirectory/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## 2026-01-23 - 1.5.0

### Added

- Add optional `ca_certificate` secret configuration parameter for custom PKI/internal CA support in TLS connections
- Add optional `skip_tls_verify` configuration parameter to explicitly disable TLS verification (for testing only)

### Fixed

- Fix temporary CA certificate file leak by cleaning up after connection establishment

## 2026-01-22 - 1.4.3

Expand Down
16 changes: 14 additions & 2 deletions MicrosoftActiveDirectory/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
"title": "Password account",
"description": "The dedicated password of the account",
"type": "string"
},
"ca_certificate": {
"title": "CA Certificate",
"description": "PEM-encoded CA certificate for TLS verification (optional, for internal PKI)",
"type": "string"
},
"skip_tls_verify": {
"title": "Skip TLS Verification",
"description": "Skip TLS certificate verification (insecure, use only for testing)",
"type": "boolean",
"default": false
}
},
"required": [
Expand All @@ -25,14 +36,15 @@
"admin_password"
],
"secrets": [
"admin_password"
"admin_password",
"ca_certificate"
]
},
"description": "Microsoft Active Directory (Microsoft AD), is a directory service developed by Microsoft for Windows domain networks. It is a centralized system that stores information about networked resources and makes these resources easily accessible to users and administrators. Active Directory provides services for authentication and authorization, organizing and managing resources, such as users, computers, and devices, in a networked environment.",
"name": "Microsoft Active Directory",
"uuid": "b2d96259-af89-4f7a-ae6e-a0af2d2400f3",
"slug": "microsoft-ad",
"version": "1.4.3",
"version": "1.5.0",
"categories": [
"IAM"
],
Expand Down
46 changes: 34 additions & 12 deletions MicrosoftActiveDirectory/microsoft_ad/actions_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from functools import cached_property
from ldap3 import Server, Connection
import os
import ssl
import tempfile

from ldap3 import Server, Connection, Tls
from ldap3.utils.conv import escape_filter_chars

from sekoia_automation.action import Action
Expand All @@ -12,17 +16,35 @@ class MicrosoftADAction(Action):

@cached_property
def client(self):
server = Server(
host=self.module.configuration.servername,
port=636,
use_ssl=True,
)
conn = Connection(
server,
auto_bind=True,
user=self.module.configuration.admin_username,
password=self.module.configuration.admin_password,
)
tls_config = None
ca_file = None
ca_cert = self.module.configuration.ca_certificate
skip_tls_verify = self.module.configuration.skip_tls_verify

try:
if ca_cert:
with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f:
f.write(ca_cert)
ca_file = f.name
tls_config = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=ca_file)
elif skip_tls_verify:
tls_config = Tls(validate=ssl.CERT_NONE)

server = Server(
host=self.module.configuration.servername,
port=636,
use_ssl=True,
tls=tls_config,
)
conn = Connection(
server,
auto_bind=True,
user=self.module.configuration.admin_username,
password=self.module.configuration.admin_password,
)
finally:
if ca_file and os.path.exists(ca_file):
os.unlink(ca_file)

return conn

Expand Down
4 changes: 4 additions & 0 deletions MicrosoftActiveDirectory/microsoft_ad/models/common_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ class MicrosoftADConfiguration(BaseModel):
servername: str = Field(..., description="Remote machine IP or Name")
admin_username: str = Field(..., description="Admin username")
admin_password: str = Field(..., secret=True, description="Admin password") # type: ignore
ca_certificate: str | None = Field(None, secret=True, description="PEM-encoded CA certificate for TLS verification") # type: ignore
skip_tls_verify: bool = Field(
False, description="Skip TLS certificate verification (insecure, use only for testing)"
)


class MicrosoftADModule(Module):
Expand Down
167 changes: 166 additions & 1 deletion MicrosoftActiveDirectory/tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,178 @@
import os
import pytest
from unittest.mock import Mock
import ssl
from unittest.mock import Mock, patch
from microsoft_ad.actions_base import MicrosoftADAction
from microsoft_ad.models.common_models import MicrosoftADConfiguration


class ConcreteMicrosoftADAction(MicrosoftADAction):
def run(self, arguments):
pass


class TestClientTlsConfiguration:
@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_without_ca_certificate_and_skip_tls_uses_default(self, mock_tls, mock_server, mock_connection):
"""Without ca_certificate and skip_tls_verify=False, no Tls config is created (uses library default)."""
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=None,
skip_tls_verify=False,
)
action.module = mock_module

_ = action.client

mock_tls.assert_not_called()
mock_server.assert_called_once_with(
host="ldap.example.com",
port=636,
use_ssl=True,
tls=None,
)

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_with_skip_tls_verify_uses_cert_none(self, mock_tls, mock_server, mock_connection):
"""With skip_tls_verify=True, CERT_NONE is used."""
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=None,
skip_tls_verify=True,
)
action.module = mock_module

_ = action.client

mock_tls.assert_called_once_with(validate=ssl.CERT_NONE)
assert mock_server.call_args[1]["tls"] == mock_tls.return_value

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_with_ca_certificate_uses_cert_required(self, mock_tls, mock_server, mock_connection):
"""With ca_certificate provided, CERT_REQUIRED is used with the CA file."""
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
ca_cert_content = "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJALa...\n-----END CERTIFICATE-----"
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=ca_cert_content,
)
action.module = mock_module

_ = action.client

mock_tls.assert_called_once()
call_kwargs = mock_tls.call_args[1]
assert call_kwargs["validate"] == ssl.CERT_REQUIRED
assert "ca_certs_file" in call_kwargs
assert call_kwargs["ca_certs_file"].endswith(".pem")

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_with_ca_certificate_cleans_up_temp_file(self, mock_tls, mock_server, mock_connection):
"""The temporary CA file is deleted after the connection is established."""
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
ca_cert_content = "-----BEGIN CERTIFICATE-----\nTEST_CERT_CONTENT\n-----END CERTIFICATE-----"
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=ca_cert_content,
)
action.module = mock_module

_ = action.client

call_kwargs = mock_tls.call_args[1]
ca_file_path = call_kwargs["ca_certs_file"]
assert not os.path.exists(ca_file_path), "Temporary CA file should be deleted after connection"

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_ca_certificate_takes_precedence_over_skip_tls(self, mock_tls, mock_server, mock_connection):
"""If both ca_certificate and skip_tls_verify are set, ca_certificate takes precedence."""
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
ca_cert_content = "-----BEGIN CERTIFICATE-----\nTEST\n-----END CERTIFICATE-----"
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=ca_cert_content,
skip_tls_verify=True,
)
action.module = mock_module

_ = action.client

call_kwargs = mock_tls.call_args[1]
assert call_kwargs["validate"] == ssl.CERT_REQUIRED

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_server_configured_with_ssl_on_port_636(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
skip_tls_verify=True,
)
action.module = mock_module

_ = action.client

mock_server.assert_called_once_with(
host="ldap.example.com",
port=636,
use_ssl=True,
tls=mock_tls.return_value,
)

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_connection_uses_credentials(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="secret_password",
)
action.module = mock_module

_ = action.client

mock_connection.assert_called_once_with(
mock_server.return_value,
auto_bind=True,
user="admin@example.com",
password="secret_password",
)


class TestSearchUserdnQuery:
def test_search_returns_empty_list_when_no_users_found(self):
action = object.__new__(ConcreteMicrosoftADAction)
Expand Down
6 changes: 4 additions & 2 deletions MicrosoftActiveDirectory/tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ def test_search_with_attributes():


def test_search_in_base_exception():

username = "Mick Lennon"
search = f"(|(samaccountname={username})(userPrincipalName={username})(mail={username})(givenName={username}))"
basedn = "dc=example,dc=com"
attributes = ["name"]
action = configured_action(SearchAction)

with patch("microsoft_ad.search_actions.SearchAction.run", side_effect=Exception("mocked error")):
# Patch the client property to avoid real LDAP connection
with patch("microsoft_ad.actions_base.MicrosoftADAction.client") as mock_client:
mock_client.search.side_effect = Exception("LDAP connection failed")

with pytest.raises(Exception) as exc_info:
action.run({"search_filter": search, "basedn": basedn, "attributes": attributes})

Expand Down