Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions MicrosoftActiveDirectory/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2026-01-23 - 1.5.0

### Added

- Add optional `ca_certificate` configuration parameter for custom PKI/internal CA support in TLS connections

## 2026-01-22 - 1.4.3

### Changed
Expand Down
7 changes: 6 additions & 1 deletion MicrosoftActiveDirectory/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
"title": "Password account",
"description": "The dedicated password of the account",
"type": "string"
},
"ca_certificate": {
"title": "CA Certificate",
"description": "PEM-encoded CA certificate for TLS verification (optional, for internal PKI)",
"type": "string"
}
},
"required": [
Expand All @@ -32,7 +37,7 @@
"name": "Microsoft Active Directory",
"uuid": "b2d96259-af89-4f7a-ae6e-a0af2d2400f3",
"slug": "microsoft-ad",
"version": "1.4.3",
"version": "1.5.0",
"categories": [
"IAM"
],
Expand Down
16 changes: 15 additions & 1 deletion MicrosoftActiveDirectory/microsoft_ad/actions_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from functools import cached_property
from ldap3 import Server, Connection
import ssl
import tempfile

from ldap3 import Server, Connection, Tls
from ldap3.utils.conv import escape_filter_chars

from sekoia_automation.action import Action
Expand All @@ -12,10 +15,21 @@ class MicrosoftADAction(Action):

@cached_property
def client(self):
tls_config = None
ca_cert = self.module.configuration.ca_certificate
if ca_cert:
with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as f:
f.write(ca_cert)
ca_file = f.name
tls_config = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=ca_file)
else:
tls_config = Tls(validate=ssl.CERT_NONE)

server = Server(
host=self.module.configuration.servername,
port=636,
use_ssl=True,
tls=tls_config,
)
conn = Connection(
server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class MicrosoftADConfiguration(BaseModel):
servername: str = Field(..., description="Remote machine IP or Name")
admin_username: str = Field(..., description="Admin username")
admin_password: str = Field(..., secret=True, description="Admin password") # type: ignore
ca_certificate: str | None = Field(None, description="PEM-encoded CA certificate for TLS verification")


class MicrosoftADModule(Module):
Expand Down
117 changes: 116 additions & 1 deletion MicrosoftActiveDirectory/tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,128 @@
import pytest
from unittest.mock import Mock
import ssl
import tempfile
from unittest.mock import Mock, patch, MagicMock
from microsoft_ad.actions_base import MicrosoftADAction
from microsoft_ad.models.common_models import MicrosoftADConfiguration, MicrosoftADModule


class ConcreteMicrosoftADAction(MicrosoftADAction):
def run(self, arguments):
pass


class TestClientTlsConfiguration:
@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_without_ca_certificate_uses_cert_none(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=None,
)
action.module = mock_module

_ = action.client

mock_tls.assert_called_once_with(validate=ssl.CERT_NONE)
mock_server.assert_called_once()
assert mock_server.call_args[1]["tls"] == mock_tls.return_value

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_with_ca_certificate_uses_cert_required(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
ca_cert_content = "-----BEGIN CERTIFICATE-----\nMIIDXTCCAkWgAwIBAgIJALa...\n-----END CERTIFICATE-----"
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=ca_cert_content,
)
action.module = mock_module

_ = action.client

mock_tls.assert_called_once()
call_kwargs = mock_tls.call_args[1]
assert call_kwargs["validate"] == ssl.CERT_REQUIRED
assert "ca_certs_file" in call_kwargs
assert call_kwargs["ca_certs_file"].endswith(".pem")

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_with_ca_certificate_writes_cert_to_temp_file(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
ca_cert_content = "-----BEGIN CERTIFICATE-----\nTEST_CERT_CONTENT\n-----END CERTIFICATE-----"
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
ca_certificate=ca_cert_content,
)
action.module = mock_module

_ = action.client

call_kwargs = mock_tls.call_args[1]
ca_file_path = call_kwargs["ca_certs_file"]
with open(ca_file_path, "r") as f:
content = f.read()
assert content == ca_cert_content

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_server_configured_with_ssl_on_port_636(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="password",
)
action.module = mock_module

_ = action.client

mock_server.assert_called_once_with(
host="ldap.example.com",
port=636,
use_ssl=True,
tls=mock_tls.return_value,
)

@patch("microsoft_ad.actions_base.Connection")
@patch("microsoft_ad.actions_base.Server")
@patch("microsoft_ad.actions_base.Tls")
def test_client_connection_uses_credentials(self, mock_tls, mock_server, mock_connection):
action = object.__new__(ConcreteMicrosoftADAction)
mock_module = Mock()
mock_module.configuration = MicrosoftADConfiguration(
servername="ldap.example.com",
admin_username="admin@example.com",
admin_password="secret_password",
)
action.module = mock_module

_ = action.client

mock_connection.assert_called_once_with(
mock_server.return_value,
auto_bind=True,
user="admin@example.com",
password="secret_password",
)


class TestSearchUserdnQuery:
def test_search_returns_empty_list_when_no_users_found(self):
action = object.__new__(ConcreteMicrosoftADAction)
Expand Down
Loading