Skip to content

Created new trigger with IDS-flagged events for MISP#1869

Open
Imothep-Akonis wants to merge 25 commits intoSEKOIA-IO:developfrom
Akonis-cybersecurity:main
Open

Created new trigger with IDS-flagged events for MISP#1869
Imothep-Akonis wants to merge 25 commits intoSEKOIA-IO:developfrom
Akonis-cybersecurity:main

Conversation

@Imothep-Akonis
Copy link

@Imothep-Akonis Imothep-Akonis commented Jan 13, 2026

Summary by Sourcery

Add a new trigger to export IDS-flagged MISP attributes into a Sekoia.io IOC collection and update the MISP integration packaging and tooling.

New Features:

  • Introduce a MISP trigger that periodically retrieves IDS-flagged attributes and pushes them to a configured Sekoia.io IOC Collection, with type filtering, deduplication, batching, and retry logic.

Bug Fixes:

  • Correct a log message typo in the existing MISP events trigger.

Enhancements:

  • Modernize the MISP automation module packaging metadata, dependencies, and code-quality tooling configuration, including Python 3.11 targeting and expanded dev tools.
  • Expose the new trigger via the module entrypoint and manifest, including support for a Sekoia.io API key secret.
  • Add documentation and a README section describing the new MISP IDS attributes to IOC Collection trigger usage and configuration.

Tests:

  • Add comprehensive unit tests covering the new IDS attributes to IOC Collection trigger behavior, including MISP interaction, Sekoia API calls, error handling, and the main run loop.

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Jan 13, 2026

Reviewer's Guide

Adds a new production-ready trigger that periodically pulls IDS-flagged attributes from MISP and pushes them into a Sekoia.io IOC Collection, alongside a full modernization of the MISP module’s packaging, tooling, and tests.

Sequence diagram for a polling cycle of the new IDS-to-IOC trigger

sequenceDiagram
    actor SekoiaOperator
    participant PlaybookEngine
    participant MISPIDSTrigger as MISPIDSAttributesToIOCCollectionTrigger
    participant MISPAPI as MISP_API
    participant TTLCache as ProcessedAttributesCache
    participant IOCAPI as Sekoia_IOC_Collections_API

    SekoiaOperator->>PlaybookEngine: Activate_playbook_with_trigger
    PlaybookEngine->>MISPIDSTrigger: start()

    MISPIDSTrigger->>MISPIDSTrigger: initialize_misp_client()
    MISPIDSTrigger->>MISPIDSTrigger: initialize_cache()

    loop While_trigger_running
        MISPIDSTrigger->>MISPAPI: search(to_ids=1, publish_timestamp)
        MISPAPI-->>MISPIDSTrigger: attributes

        MISPIDSTrigger->>MISPIDSTrigger: filter_supported_types(attributes)
        MISPIDSTrigger->>TTLCache: check_already_processed(uuids)
        TTLCache-->>MISPIDSTrigger: unprocessed_uuids
        MISPIDSTrigger->>MISPIDSTrigger: extract_ioc_value(new_attributes)

        alt Has_new_IOCs
            MISPIDSTrigger->>IOCAPI: POST /ioc-collections/{uuid}/indicators/text
            IOCAPI-->>MISPIDSTrigger: 200_created_updated_ignored
            MISPIDSTrigger->>TTLCache: mark_as_processed(uuids)
        else No_new_IOCs
            MISPIDSTrigger->>MISPIDSTrigger: log("No new IOCs to process")
        end

        MISPIDSTrigger->>MISPIDSTrigger: sleep(sleep_time)
    end
Loading

Class diagram for the new MISPIDSAttributesToIOCCollectionTrigger

classDiagram
    class Trigger

    class MISPIDSAttributesToIOCCollectionTrigger {
        - misp_client
        - processed_attributes
        + sleep_time
        + publish_timestamp
        + ioc_collection_server
        + ioc_collection_uuid
        + sekoia_api_key
        + __init__(args, kwargs)
        + initialize_misp_client()
        + initialize_cache()
        + fetch_attributes(publish_timestamp)
        + filter_supported_types(attributes)
        + extract_ioc_value(attribute)
        + push_to_sekoia(ioc_values)
        + run()
    }

    MISPIDSAttributesToIOCCollectionTrigger --|> Trigger
Loading

File-Level Changes

Change Details Files
Introduce new trigger that syncs IDS-flagged MISP attributes into a Sekoia.io IOC Collection and register it in the module entrypoint.
  • Implement MISPIDSAttributesToIOCCollectionTrigger with polling loop, configuration properties, MISP client initialization, in-memory TTL cache for processed attributes, and robust error logging.
  • Implement attribute fetching from MISP using PyMISP with to_ids filter and publish_timestamp window plus filtering for a specific set of supported attribute types and composite-value handling.
  • Implement push_to_sekoia with batching, retries, rate-limit handling, and authentication/404 error handling against the IOC Collection text ingestion API.
  • Wire the new trigger into the main module by registering it under the name trigger-misp-ids-to-ioc-collection.
MISP/misp/trigger_misp_ids_attributes_to_ioc_collection.py
MISP/main.py
Add a comprehensive unit test suite for the new trigger, including network/logging isolation and loop behavior, and accompanying user-facing documentation.
  • Create tests that cover trigger configuration properties, MISP client initialization, cache setup, attribute filtering and IOC extraction, MISP search error paths, Sekoia push retry logic, and the main run loop including error recovery and KeyboardInterrupt.
  • Mock Sekoia Trigger logging and HTTP calls to avoid side effects during tests.
  • Add user documentation describing prerequisites, configuration parameters, supported IOC types, behavior, and troubleshooting for the new trigger.
  • Extend README with a brief overview and configuration of the new trigger and reference to detailed docs.
MISP/tests/test_trigger_misp_ids_to_ioc_collection.py
MISP/docs/trigger_misp_ids_to_ioc_collection.md
MISP/README.md
Modernize the MISP module packaging and tooling configuration and expose a CLI entrypoint for the new trigger.
  • Update pyproject metadata (name, description, version, authors, classifiers, keywords) and bump poetry-core and runtime dependencies (pymisp, sekoia-automation-sdk, stix2, cachetools, orjson, requests) plus add aiohttp and tenacity.
  • Define explicit dev dependencies and tooling configuration for black, isort, pytest (with coverage and markers), mypy, pylint, bandit, ruff, and Sphinx, including coverage and mypy configuration tuned to the misp package.
  • Change Black and other tools’ target Python version to 3.11 and adjust coverage includes/omits to focus on misp while excluding unrelated modules and test helpers.
  • Add a poetry script entrypoint misp-trigger pointing to misp.trigger_misp_ids_attributes_to_ioc_collection:main (though main is not yet implemented in that module).
MISP/pyproject.toml
MISP/poetry.lock
Update the module manifest to support the new Sekoia API key secret and bump the integration version.
  • Add sekoia_api_key as a secret configuration field required for the connector, and include it in the secrets list alongside misp_api_key.
  • Bump manifest version from 2.8.0 to 2.8.1 to reflect the new functionality.
MISP/manifest.json
Minor maintenance fixes in existing code.
  • Fix a typo in an informational log message in the existing MISPTrigger process_new_events method from 'Skippink' to 'Skipping'.
  • Add a trigger-specific JSON file stub for the new trigger (contents not shown in diff).
MISP/misp/trigger.py
MISP/trigger_misp_ids_attributes_to_ioc_collection.json

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 8 issues, and left some high level feedback:

  • The MISPIDSAttributesToIOCCollectionTrigger mixes use of module.configuration and configuration for settings (and even keeps a commented-out sekoia_api_key source); it would be clearer and less error-prone to consistently read all parameters from a single configuration source and remove dead code.
  • In trigger_misp_ids_attributes_to_ioc_collection.py, the os import and the FORMAT constant are never used; consider removing them to keep the module lean.
  • You added tenacity as a dependency but implemented custom retry logic with while retry_count < max_retries; either refactor to use tenacity or drop the unused dependency to avoid maintenance overhead.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `MISPIDSAttributesToIOCCollectionTrigger` mixes use of `module.configuration` and `configuration` for settings (and even keeps a commented-out `sekoia_api_key` source); it would be clearer and less error-prone to consistently read all parameters from a single configuration source and remove dead code.
- In `trigger_misp_ids_attributes_to_ioc_collection.py`, the `os` import and the `FORMAT` constant are never used; consider removing them to keep the module lean.
- You added `tenacity` as a dependency but implemented custom retry logic with `while retry_count < max_retries`; either refactor to use `tenacity` or drop the unused dependency to avoid maintenance overhead.

## Individual Comments

### Comment 1
<location> `MISP/misp/trigger_misp_ids_attributes_to_ioc_collection.py:249-258` </location>
<code_context>
+                try:
+                    response = requests.post(url, json=payload, headers=headers, timeout=30)
+
+                    if response.status_code == 200:
+                        result = response.json()
+                        self.log(
+                            message=f"Batch {batch_num}/{total_batches} pushed successfully: "
+                            f"{result.get('created', 0)} created, "
+                            f"{result.get('updated', 0)} updated, "
+                            f"{result.get('ignored', 0)} ignored",
+                            level="info",
+                        )
+                        success = True
+                        break
+                    elif response.status_code == 429:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Handle a broader range of successful HTTP status codes, not just 200, and be defensive around JSON parsing.

Other 2xx responses (e.g. 201/202) may be valid success cases for this API. Limiting success to `status_code == 200` will incorrectly treat them as errors and trigger retries. Prefer `response.ok` or `200 <= response.status_code < 300`, and wrap `response.json()` in try/except so a non‑JSON body doesn’t cause the whole batch to be marked as failed.
</issue_to_address>

### Comment 2
<location> `MISP/misp/trigger_misp_ids_attributes_to_ioc_collection.py:265-273` </location>
<code_context>
+                        retry_after = response.headers.get("Retry-After", None)
+                        if retry_after:
+                            wait_time = int(retry_after)
+                        else:
+                            wait_time = 2**retry_count * 10
+
+                        self.log(
+                            message=f"Rate limited. Waiting {wait_time} seconds...",
+                            level="info",
+                        )
+                        time.sleep(wait_time)
+                        retry_count += 1
+                    elif response.status_code in [401, 403]:
+                        # Authentication/Authorization errors - fatal
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Avoid retrying on non-retriable 4xx errors (other than 429) to prevent unnecessary load and delays.

In the generic `else` branch, any status not in [200, 401, 403, 404, 429] (including 400/422 and other 4xx) is treated as transient and retried. These are typically client errors and won’t succeed on retry, adding unnecessary attempts and delay. Consider restricting retries to 5xx responses (plus 429, which you already special-case) and treating other 4xx as fatal for that batch (log and stop retrying).

Suggested implementation:

```python
                    elif response.status_code == 404:
                        # Not found - fatal
                        self.log(
                            message=f"IOC Collection not found: {response.status_code} - {response.text}",
                            level="error",
                        )
                        raise Exception(f"Sekoia API resource not found: {response.status_code}")
                    elif 400 <= response.status_code < 500:
                        # Other client errors (non-retriable) - fatal
                        self.log(
                            message=f"Client error when pushing IOCs: {response.status_code} - {response.text}",
                            level="error",
                        )
                        raise Exception(f"Sekoia API client error: {response.status_code}")

```

If there is a generic `else:` branch after this block that performs retries for all remaining status codes, it will now only affect 5xx and any non-HTTP/unexpected codes, because all 4xx responses are handled explicitly. You do not need to change that `else:` branch to meet the recommendation; retries are now effectively restricted to 5xx (plus the existing 429 special case).
</issue_to_address>

### Comment 3
<location> `MISP/misp/trigger_misp_ids_attributes_to_ioc_collection.py:60-69` </location>
<code_context>
+                    for attr in new_attributes:
+                        self.processed_attributes[attr.uuid] = True
+
+                    self.log(
+                        message=f"Successfully processed {len(new_attributes)} new IOCs",
+                        level="info",
+                    )
+                else:
+                    self.log(
+                        message="No new IOCs to process",
+                        level="info",
+                    )
+
+                # Sleep until next poll
+                self.log(
+                    message=f"Sleeping for {self.sleep_time} seconds",
+                    level="info",
+                )
+                time.sleep(self.sleep_time)
+
+            except KeyboardInterrupt:
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Long blocking sleep in the main loop can delay trigger shutdown responsiveness.

Because the loop uses a single `time.sleep(self.sleep_time)`, setting `self.running = False` during the sleep won’t take effect until the full interval passes. To make shutdown more responsive, either break the sleep into shorter intervals that re-check `self.running`, or use an interruptible wait primitive (e.g., an Event) that can be signaled when the trigger should stop.
</issue_to_address>

### Comment 4
<location> `MISP/tests/test_trigger_misp_ids_to_ioc_collection.py:201-204` </location>
<code_context>
+    # Sekoia push
+    # ------------------------------------------------------------------ #
+
+    def test_push_to_sekoia_empty(self, trigger):
+        """Test that empty list doesn't make any requests."""
+        trigger.push_to_sekoia([])
+        trigger.log.assert_called()
+
+    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for `push_to_sekoia` when `sekoia_api_key` or `ioc_collection_uuid` are missing to cover the early-return branches.

`push_to_sekoia` has explicit guards for missing `sekoia_api_key` and `ioc_collection_uuid` that log an error and return without calling the API, but these branches aren’t tested. Please add tests that: (1) remove/override `trigger.module.configuration["sekoia_api_key"]` and assert no `requests.post` call and an error log, and (2) remove `trigger.configuration["ioc_collection_uuid"]` and assert the same. This will cover the configuration validation paths.

Suggested implementation:

```python
        trigger.initialize_misp_client()

        with pytest.raises(Exception, match="Network error"):
            trigger.fetch_attributes("1")

    # ------------------------------------------------------------------ #
    # Sekoia push
    # ------------------------------------------------------------------ #

    def test_push_to_sekoia_empty(self, trigger):
        """Test that empty list doesn't make any requests."""
        trigger.push_to_sekoia([])
        trigger.log.assert_called()

    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
    def test_push_to_sekoia_missing_api_key(self, mock_post, trigger):
        """Test that missing sekoia_api_key prevents any API call."""
        # Remove/override the API key to trigger the guard branch
        trigger.module.configuration.pop("sekoia_api_key", None)

        trigger.push_to_sekoia(["1.1.1.1"])

        mock_post.assert_not_called()
        # Ensure an error was logged about the missing configuration
        trigger.log.error.assert_called()

    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
    def test_push_to_sekoia_missing_ioc_collection_uuid(self, mock_post, trigger):
        """Test that missing ioc_collection_uuid prevents any API call."""
        # Remove/override the ioc collection UUID to trigger the guard branch
        trigger.configuration.pop("ioc_collection_uuid", None)

        trigger.push_to_sekoia(["1.1.1.1"])

        mock_post.assert_not_called()
        # Ensure an error was logged about the missing configuration
        trigger.log.error.assert_called()

    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
    def test_push_to_sekoia_success(self, mock_post, trigger):
        resp = Mock(status_code=200)
        resp.json.return_value = {"created": 1, "updated": 0, "ignored": 0}
        mock_post.return_value = resp

        trigger.push_to_sekoia(["1.1.1.1", "evil.com"])

        payload = mock_post.call_args.kwargs["json"]
        assert payload["format"] == "one_per_line"
        assert "1.1.1.1" in payload["indicators"]

```

If `trigger.log` is not a `Mock` with an `.error` attribute in your current tests, you may need to ensure the `trigger` fixture configures `trigger.log` as a mock or spy that exposes `.error`. Adjust the assertion (e.g., `trigger.log.assert_called()` with argument inspection) if your logging abstraction differs.
</issue_to_address>

### Comment 5
<location> `MISP/tests/test_trigger_misp_ids_to_ioc_collection.py:206-215` </location>
<code_context>
+        trigger.push_to_sekoia([])
+        trigger.log.assert_called()
+
+    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
+    def test_push_to_sekoia_success(self, mock_post, trigger):
+        resp = Mock(status_code=200)
+        resp.json.return_value = {"created": 1, "updated": 0, "ignored": 0}
+        mock_post.return_value = resp
+
+        trigger.push_to_sekoia(["1.1.1.1", "evil.com"])
+
+        payload = mock_post.call_args.kwargs["json"]
+        assert payload["format"] == "one_per_line"
+        assert "1.1.1.1" in payload["indicators"]
+
+    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test that validates batching behavior when pushing more than 1000 IOCs.

Currently tests only cover small inputs, so the batching branch (chunks of 1000) isn’t exercised. Please add a test (e.g. `test_push_to_sekoia_batches_large_input`) that sends 1500 mock IOCs and asserts `requests.post` is called twice, with payloads of 1000 and 500 indicators respectively, to ensure batching isn’t accidentally broken later.

Suggested implementation:

```python
        payload = mock_post.call_args.kwargs["json"]
        assert payload["format"] == "one_per_line"
        assert "1.1.1.1" in payload["indicators"]

    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
    def test_push_to_sekoia_batches_large_input(self, mock_post, trigger):
        """Test that IOCs are sent in batches of 1000 indicators."""
        resp = Mock(status_code=200)
        resp.json.return_value = {"created": 1000, "updated": 0, "ignored": 0}
        mock_post.return_value = resp

        # Create 1500 mock indicators
        indicators = [f"1.1.1.{i}" for i in range(1500)]

        trigger.push_to_sekoia(indicators)

        # Expect two POST calls: one with 1000 indicators, one with 500
        assert mock_post.call_count == 2

        first_payload = mock_post.call_args_list[0].kwargs["json"]
        second_payload = mock_post.call_args_list[1].kwargs["json"]

        assert first_payload["format"] == "one_per_line"
        assert second_payload["format"] == "one_per_line"

        assert len(first_payload["indicators"]) == 1000
        assert len(second_payload["indicators"]) == 500

        # Sanity check that all indicators are present across batches
        combined = first_payload["indicators"] + second_payload["indicators"]
        assert set(combined) == set(indicators)

```

This assumes that `push_to_sekoia` batches the provided list in contiguous chunks of 1000, using `requests.post(..., json=payload)` and preserving the `"format": "one_per_line"` field for each batch. If the batching logic uses a different chunk size or a different field name for the indicators array, adjust the expected lengths and `"indicators"` key accordingly.
</issue_to_address>

### Comment 6
<location> `MISP/tests/test_trigger_misp_ids_to_ioc_collection.py:12-21` </location>
<code_context>
+        assert mock_post.call_count == 3  # max_retries
+        trigger.log.assert_called()
+
+    # ------------------------------------------------------------------ #
+    # Run loop
+    # ------------------------------------------------------------------ #
+
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for `run` configuration validation branches (missing sekoia_api_key, ioc_collection_uuid, misp_url, misp_api_key).

The early-return branches in `run` for missing `sekoia_api_key`, `ioc_collection_uuid`, `misp_url`, and `misp_api_key` aren’t covered yet because all current tests use a valid configuration. Please add tests that, for each parameter: clear it, invoke `trigger.run()`, and assert the expected error log is produced and that neither `initialize_misp_client` nor `requests.post` are called. This will lock in the validation behavior and prevent regressions.
</issue_to_address>

### Comment 7
<location> `MISP/tests/test_trigger_misp_ids_to_ioc_collection.py:79-88` </location>
<code_context>
+    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.PyMISP")
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test case to verify that the deduplication cache (`processed_attributes`) prevents re-sending the same attribute UUID across runs.

Current tests only cover new attributes. Please add a test where `misp.search` returns the same attribute UUID in two consecutive loop iterations (using the `running` PropertyMock to control the loop). Assert that `push_to_sekoia` (or `requests.post`) is called only on the first iteration, confirming that `processed_attributes` correctly skips already-seen UUIDs.

Suggested implementation:

```python
    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.PyMISP")
    def test_initialize_misp_client_success(self, mock_pymisp, trigger):
        misp = Mock()
        mock_pymisp.return_value = misp

        trigger.initialize_misp_client()

        assert trigger.misp_client == misp
        mock_pymisp.assert_called_once_with(
            url="https://misp.example.com",
            key="test_misp_api_key",
        )

    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.requests.post")
    @patch("misp.trigger_misp_ids_attributes_to_ioc_collection.PyMISP")
    def test_deduplication_skips_already_processed_attribute_uuids(
        self, mock_pymisp, mock_post, trigger
    ):
        """
        Ensure that the deduplication cache (processed_attributes) prevents
        re-sending the same attribute UUID across loop iterations.
        """
        misp_client = Mock()
        mock_pymisp.return_value = misp_client

        # Same attribute returned on two consecutive loop iterations
        attribute = {
            "uuid": "attribute-uuid-1",
            "type": "ip-src",
            "value": "1.2.3.4",
        }
        misp_client.search.side_effect = [
            {"Attribute": [attribute]},  # first loop iteration
            {"Attribute": [attribute]},  # second loop iteration, same UUID
        ]

        # First two iterations run, third stops the loop
        type(trigger).running = PropertyMock(side_effect=[True, True, False])

        # Run the trigger loop
        trigger.run()

        # Ensure we only send the attribute once despite it appearing twice
        assert mock_post.call_count == 1

```

1. Ensure `PropertyMock` is imported at the top of `MISP/tests/test_trigger_misp_ids_to_ioc_collection.py`:
   - `from unittest.mock import Mock, patch, PropertyMock` (or equivalent).
2. This test assumes:
   - The trigger loop is started with `trigger.run()`.
   - The loop condition reads a `running` property on the trigger instance/class.
   - The production code uses `requests.post` in `misp.trigger_misp_ids_attributes_to_ioc_collection` to push IOCs.
   If any of these differ (e.g. a different loop entry point or a `push_to_sekoia` method wrapper), adjust the patched path and the method called in the test accordingly (e.g. patch `...push_to_sekoia` and assert `mock_push.call_count == 1` instead of `requests.post`).
</issue_to_address>

### Comment 8
<location> `MISP/misp/trigger_misp_ids_attributes_to_ioc_collection.py:11` </location>
<code_context>
+from sekoia_automation.trigger import Trigger
+
+
+class MISPIDSAttributesToIOCCollectionTrigger(Trigger):
+    """
+    Trigger to retrieve IDS-flagged attributes from MISP and push them
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring this trigger by extracting small helper methods and shared constants to centralize configuration checks, deduplication, batching/retry logic, and supported type handling while removing unused code.

You can reduce complexity and duplication without changing behavior by extracting a few focused helpers and constants.

### 1. Centralize configuration validation & initialization

Move the config checks out of `run` and reuse them instead of re‑checking in `push_to_sekoia`:

```python
class MISPIDSAttributesToIOCCollectionTrigger(Trigger):
    # at class level
    REQUIRED_TRIGGER_CONFIG = ("ioc_collection_uuid",)
    REQUIRED_MODULE_CONFIG = ("misp_url", "misp_api_key", "sekoia_api_key")

    def _validate_configuration(self) -> bool:
        missing = []
        if not self.ioc_collection_uuid:
            missing.append("ioc_collection_uuid")
        if not self.module.configuration.get("misp_url"):
            missing.append("misp_url")
        if not self.module.configuration.get("misp_api_key"):
            missing.append("misp_api_key")
        if not self.sekoia_api_key:
            missing.append("sekoia_api_key")

        if missing:
            for field in missing:
                self.log(message=f"Missing required parameter: {field}", level="error")
            return False
        return True

    def _initialize_components(self) -> bool:
        try:
            self.initialize_misp_client()
            self.initialize_cache()
            return True
        except Exception as error:
            self.log(message=f"Failed to initialize trigger: {error}", level="error")
            self.log(message=format_exc(), level="error")
            return False
```

Then simplify `run`:

```python
def run(self):
    self.log(
        message="Starting MISP IDS Attributes to IOC Collection trigger",
        level="info",
    )

    if not self._validate_configuration():
        return
    if not self._initialize_components():
        return

    while self.running:
        try:
            self._process_cycle()
        except KeyboardInterrupt:
            self.log(message="Trigger stopped by user", level="info")
            break
        except Exception as error:
            self.log(message=f"Error in trigger loop: {error}", level="error")
            self.log(message=format_exc(), level="error")
            time.sleep(60)

    self.log(
        message="MISP IDS Attributes to IOC Collection trigger stopped",
        level="info",
    )
```

And move the body of one iteration into `_process_cycle`:

```python
def _process_cycle(self):
    attributes = self.fetch_attributes(self.publish_timestamp)
    supported_attributes = self.filter_supported_types(attributes)
    new_attributes = self._get_new_attributes(supported_attributes)

    if new_attributes:
        self.log(
            message=f"Found {len(new_attributes)} new IOCs to process",
            level="info",
        )
        ioc_values = [self.extract_ioc_value(attr) for attr in new_attributes]
        self.push_to_sekoia(ioc_values)
        self._mark_processed(new_attributes)
        self.log(
            message=f"Successfully processed {len(new_attributes)} new IOCs",
            level="info",
        )
    else:
        self.log(message="No new IOCs to process", level="info")

    self.log(message=f"Sleeping for {self.sleep_time} seconds", level="info")
    time.sleep(self.sleep_time)
```

### 2. Extract dedup helpers

Move the dedup logic out of the loop:

```python
def _get_new_attributes(self, attributes):
    return [attr for attr in attributes if attr.uuid not in self.processed_attributes]

def _mark_processed(self, attributes):
    for attr in attributes:
        self.processed_attributes[attr.uuid] = True
```

### 3. Simplify type handling

Avoid rebuilding the supported types list every call and reduce branching in `extract_ioc_value`:

```python
# at module or class level
SUPPORTED_TYPES = {
    "ip-dst",
    "domain",
    "url",
    "sha256",
    "md5",
    "sha1",
    "ip-dst|port",
    "domain|ip",
    "filename|sha256",
    "filename|md5",
    "filename|sha1",
}

COMPOSITE_SPLIT_INDEX = {
    "filename|sha256": 1,
    "filename|md5": 1,
    "filename|sha1": 1,
    "ip-dst|port": 0,
    "domain|ip": 0,
}

def filter_supported_types(self, attributes):
    filtered = [attr for attr in attributes if attr.type in SUPPORTED_TYPES]
    self.log(
        message=f"Filtered to {len(filtered)} supported attributes (from {len(attributes)} total)",
        level="info",
    )
    return filtered

def extract_ioc_value(self, attribute):
    value = attribute.value
    if "|" in value and attribute.type in COMPOSITE_SPLIT_INDEX:
        idx = COMPOSITE_SPLIT_INDEX[attribute.type]
        return value.split("|", 1)[idx]
    return value
```

### 4. Split push logic into batch + retry helpers

Keep `push_to_sekoia` as orchestration and move retry logic out:

```python
def push_to_sekoia(self, ioc_values):
    if not ioc_values:
        self.log(message="No IOC values to push", level="info")
        return

    # assume configuration already validated
    batch_size = 1000
    total_batches = (len(ioc_values) + batch_size - 1) // batch_size
    self.log(
        message=f"Pushing {len(ioc_values)} IOCs in {total_batches} batch(es)",
        level="info",
    )

    for batch_num, i in enumerate(range(0, len(ioc_values), batch_size), 1):
        batch = ioc_values[i : i + batch_size]
        self._push_batch(batch, batch_num, total_batches)

def _push_batch(self, batch, batch_num, total_batches):
    indicators_text = "\n".join(batch)
    url = (
        f"{self.ioc_collection_server}/v2/inthreat/ioc-collections/"
        f"{self.ioc_collection_uuid}/indicators/text"
    )
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {self.sekoia_api_key}",
    }
    payload = {"indicators": indicators_text, "format": "one_per_line"}

    result = self._post_with_retry(url, payload, headers)
    if result is not None:
        self.log(
            message=(
                f"Batch {batch_num}/{total_batches} pushed successfully: "
                f"{result.get('created', 0)} created, "
                f"{result.get('updated', 0)} updated, "
                f"{result.get('ignored', 0)} ignored"
            ),
            level="info",
        )
```

And encapsulate the existing retry/status handling in `_post_with_retry` (reusing your current branching):

```python
def _post_with_retry(self, url, payload, headers):
    retry_count = 0
    max_retries = 3

    while retry_count < max_retries:
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=30)

            if response.status_code == 200:
                return response.json()

            if response.status_code == 429:
                retry_after = response.headers.get("Retry-After")
                wait_time = int(retry_after) if retry_after else 2**retry_count * 10
                self.log(
                    message=f"Rate limited. Waiting {wait_time} seconds...",
                    level="info",
                )
                time.sleep(wait_time)
                retry_count += 1
                continue

            if response.status_code in (401, 403):
                self.log(
                    message=f"Authentication error: {response.status_code} - {response.text}",
                    level="error",
                )
                raise Exception(f"Sekoia API authentication error: {response.status_code}")

            if response.status_code == 404:
                self.log(
                    message=f"IOC Collection not found: {response.status_code} - {response.text}",
                    level="error",
                )
                raise Exception(f"IOC Collection not found: {self.ioc_collection_uuid}")

            self.log(
                message=f"Error {response.status_code}: {response.text}",
                level="error",
            )
            retry_count += 1
            time.sleep(5)

        except requests.exceptions.Timeout:
            self.log(message="Request timeout", level="error")
            retry_count += 1
            time.sleep(5)
        except requests.exceptions.RequestException as error:
            self.log(message=f"Request error: {error}", level="error")
            retry_count += 1
            time.sleep(5)

    self.log(
        message=f"Failed to push IOCs after {max_retries} retries",
        level="error",
    )
    return None
```

### 5. Remove unused `FORMAT`

`FORMAT` in `__init__` is unused and can be dropped, or reused to configure logging globally elsewhere. Removing it in this class reduces noise:

```python
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.misp_client = None
    self.processed_attributes = None
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@socket-security
Copy link

socket-security bot commented Jan 13, 2026

All alerts resolved. Learn more about Socket for GitHub.

This PR previously contained dependency changes with security issues that have been resolved, removed, or ignored.

View full report

@Imothep-Akonis
Copy link
Author

  • The MISPIDSAttributesToIOCCollectionTrigger mixes use of module.configuration and configuration for settings (and even keeps a commented-out sekoia_api_key source); it would be clearer and less error-prone to consistently read all parameters from a single configuration source and remove dead code.

No, it's clearly a relevant but out of context suggestion, since that mechanism is inherited from sekoia_automation framework.
I have removed a comment though:
#return self.configuration.get("sekoia_api_key", "")

@Imothep-Akonis
Copy link
Author

  • In trigger_misp_ids_attributes_to_ioc_collection.py, the os import and the FORMAT constant are never used; consider removing them to keep the module lean.
  • You added tenacity as a dependency but implemented custom retry logic with while retry_count < max_retries; either refactor to use tenacity or drop the unused dependency to avoid maintenance overhead.

I've successfully cleaned up the module:

Changes made:

  1. Removed unused os import from [trigger_misp_ids_attributes_to_ioc_collection.py]
  2. Removed unused FORMAT constant from the init method (was at line 20)
  3. Removed tenacity dependency from [pyproject.toml] since custom retry logic is implemented using while retry_count < max_retries pattern

The module is now leaner with no unused imports or dependencies. The custom retry logic in the push_to_sekoia method handles rate limiting and transient errors without needing the tenacity library.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant