diff --git a/Sekoia.io/CHANGELOG.md b/Sekoia.io/CHANGELOG.md index 82ba97cc3..2170af7fe 100644 --- a/Sekoia.io/CHANGELOG.md +++ b/Sekoia.io/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 2026-02-10 - 2.69.1 + +### Added + +- Add optional `indicator` field in `AddIOCtoIOCCollectionAction` ## 2026-02-25 - 2.70.0 ### Added diff --git a/Sekoia.io/action_add_ioc_to_ioc_collection.json b/Sekoia.io/action_add_ioc_to_ioc_collection.json index 7b040e5d8..15af7b041 100644 --- a/Sekoia.io/action_add_ioc_to_ioc_collection.json +++ b/Sekoia.io/action_add_ioc_to_ioc_collection.json @@ -2,6 +2,10 @@ "arguments": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { + "indicator": { + "description": "Single indicator to add to an IOC collection.", + "type": "string" + }, "indicators": { "description": "List of indicators to add to an IOC collection", "type": "array" @@ -33,6 +37,11 @@ } }, "oneOf": [ + { + "required": [ + "indicator" + ] + }, { "required": [ "indicators" diff --git a/Sekoia.io/sekoiaio/intelligence_center/add_ioc_to_ioc_collection.py b/Sekoia.io/sekoiaio/intelligence_center/add_ioc_to_ioc_collection.py index a5a39ee2b..411732dff 100644 --- a/Sekoia.io/sekoiaio/intelligence_center/add_ioc_to_ioc_collection.py +++ b/Sekoia.io/sekoiaio/intelligence_center/add_ioc_to_ioc_collection.py @@ -1,10 +1,12 @@ -import requests import ipaddress from datetime import datetime, timedelta -from .base import InThreatBaseAction +import requests + from sekoiaio.utils import datetime_to_str +from .base import InThreatBaseAction + class AddIOCtoIOCCollectionAction(InThreatBaseAction): def perform_request(self, indicators, ioc_collection_id, indicator_type, valid_for): @@ -50,18 +52,21 @@ def run(self, arguments: dict): "hash": "file.hashes", } - indicators = self.json_argument("indicators", arguments) + indicators = self.json_argument("indicators", arguments, required=False) + single_indicator = arguments.get("indicator") ioc_collection_id = arguments.get("ioc_collection_id") indicator_type = arguments.get("indicator_type") valid_for = int(arguments.get("valid_for", 0)) + result_indicators = indicators or [single_indicator] + if str(indicator_type) == "IP address": - if not isinstance(indicators, list): - raise ValueError("Indicators should be list type") + if not isinstance(indicators, list) and not single_indicator: + raise ValueError("Indicators should be list type, or you should provide a single indicator value") - self.add_IP_action(indicators, ioc_collection_id, valid_for) + self.add_IP_action(result_indicators, ioc_collection_id, valid_for) else: if _type := indicator_type_mapping.get(str(indicator_type)): - self.perform_request(indicators, ioc_collection_id, _type, valid_for) + self.perform_request(result_indicators, ioc_collection_id, _type, valid_for) else: self.error(f"Improper indicator type {indicator_type}") diff --git a/Sekoia.io/tests/test_add_ioc_2_ioc_collection.py b/Sekoia.io/tests/test_add_ioc_2_ioc_collection.py index 7f631b423..0d549ac55 100644 --- a/Sekoia.io/tests/test_add_ioc_2_ioc_collection.py +++ b/Sekoia.io/tests/test_add_ioc_2_ioc_collection.py @@ -128,6 +128,132 @@ def test_add_ioc_incorrect_type(arguments_invalid_type): assert action._error is not None +@pytest.fixture +def arguments_single_indicator_ipv4(): + return { + "indicator": "8.8.8.8", + "ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000", + "indicator_type": "IP address", + } + + +@pytest.fixture +def arguments_single_indicator_ipv6(): + return { + "indicator": "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + "ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000", + "indicator_type": "IP address", + } + + +@pytest.fixture +def arguments_single_indicator_domain(): + return { + "indicator": "www.sekoia.io", + "ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000", + "indicator_type": "domain", + } + + +@pytest.fixture +def arguments_single_indicator_with_valid_for(): + return { + "indicator": "8.8.8.8", + "ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000", + "indicator_type": "IP address", + "valid_for": "30", + } + + +@pytest.fixture +def arguments_no_indicators(): + return { + "ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000", + "indicator_type": "IP address", + } + + +def test_add_ioc_single_indicator_ipv4(arguments_single_indicator_ipv4): + action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() + action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"} + + response = {"task_id": "00000000-0000-0000-0000-000000000000"} + with requests_mock.Mocker() as mock: + mock.post( + "http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text", + json=response, + ) + action.run(arguments_single_indicator_ipv4) + + history = mock.request_history + assert mock.call_count == 1 + assert history[0].method == "POST" + assert "8.8.8.8" in history[0].text + + +def test_add_ioc_single_indicator_ipv6(arguments_single_indicator_ipv6): + action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() + action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"} + + response = {"task_id": "00000000-0000-0000-0000-000000000000"} + with requests_mock.Mocker() as mock: + mock.post( + "http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text", + json=response, + ) + action.run(arguments_single_indicator_ipv6) + + history = mock.request_history + assert mock.call_count == 1 + assert history[0].method == "POST" + assert "2001:0db8:85a3:0000:0000:8a2e:0370:7334" in history[0].text + + +def test_add_ioc_single_indicator_domain(arguments_single_indicator_domain): + action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() + action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"} + + response = {"task_id": "00000000-0000-0000-0000-000000000000"} + with requests_mock.Mocker() as mock: + mock.post( + "http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text", + json=response, + ) + action.run(arguments_single_indicator_domain) + + history = mock.request_history + assert mock.call_count == 1 + assert history[0].method == "POST" + + +def test_add_ioc_single_indicator_with_valid_for(arguments_single_indicator_with_valid_for): + action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() + action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"} + + response = {"task_id": "00000000-0000-0000-0000-000000000000"} + with requests_mock.Mocker() as mock: + mock.post( + "http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text", + json=response, + ) + action.run(arguments_single_indicator_with_valid_for) + + history = mock.request_history + assert mock.call_count == 1 + assert history[0].method == "POST" + assert "valid_until" in history[0].text + + +def test_add_ioc_no_indicators_raises_error(arguments_no_indicators): + action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() + action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"} + + with pytest.raises( + ValueError, match="Indicators should be list type, or you should provide a single indicator value" + ): + action.run(arguments_no_indicators) + + def test_add_ioc_failed(arguments_http_error): action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction() action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}