Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Sekoia.io/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2026-02-10 - 2.69.1

### Added

- Add optional `indicator` field in `AddIOCtoIOCCollectionAction`
## 2026-02-25 - 2.70.0

### Added
Expand Down
9 changes: 9 additions & 0 deletions Sekoia.io/action_add_ioc_to_ioc_collection.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
"arguments": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"indicator": {
"description": "Single indicator to add to an IOC collection.",
"type": "string"
},
"indicators": {
"description": "List of indicators to add to an IOC collection",
"type": "array"
Expand Down Expand Up @@ -33,6 +37,11 @@
}
},
"oneOf": [
{
"required": [
"indicator"
]
},
{
"required": [
"indicators"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import requests
import ipaddress
from datetime import datetime, timedelta

from .base import InThreatBaseAction
import requests

from sekoiaio.utils import datetime_to_str

from .base import InThreatBaseAction


class AddIOCtoIOCCollectionAction(InThreatBaseAction):
def perform_request(self, indicators, ioc_collection_id, indicator_type, valid_for):
Expand Down Expand Up @@ -50,18 +52,21 @@ def run(self, arguments: dict):
"hash": "file.hashes",
}

indicators = self.json_argument("indicators", arguments)
indicators = self.json_argument("indicators", arguments, required=False)
single_indicator = arguments.get("indicator")
ioc_collection_id = arguments.get("ioc_collection_id")
indicator_type = arguments.get("indicator_type")
valid_for = int(arguments.get("valid_for", 0))

result_indicators = indicators or [single_indicator]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Guard against both indicators and single_indicator being missing to avoid passing [None] downstream.

When both are absent, result_indicators becomes [None] and is passed to add_IP_action/perform_request, likely causing confusing downstream errors. Add an explicit guard, for example:

if not indicators and not single_indicator:
    raise ValueError("You must provide either 'indicators' or 'indicator'")

result_indicators = indicators or [single_indicator]


if str(indicator_type) == "IP address":
if not isinstance(indicators, list):
raise ValueError("Indicators should be list type")
if not isinstance(indicators, list) and not single_indicator:
raise ValueError("Indicators should be list type, or you should provide a single indicator value")

self.add_IP_action(indicators, ioc_collection_id, valid_for)
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)
Comment on lines 63 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Align validation error message with the actual accepted shapes and ensure list type is also enforced when a single indicator is supplied as a list.

The current condition (not isinstance(indicators, list) and not single_indicator) permits non-list indicators whenever single_indicator is set, which conflicts with the message and weakens type guarantees. If indicators should always be a list when present, you could instead enforce:

if indicators is not None and not isinstance(indicators, list):
    raise ValueError("'indicators' must be a list; alternatively, use 'indicator' for a single value")

Then rely on the normalization of result_indicators to handle cases where both fields are provided.

Suggested change
if str(indicator_type) == "IP address":
if not isinstance(indicators, list):
raise ValueError("Indicators should be list type")
if not isinstance(indicators, list) and not single_indicator:
raise ValueError("Indicators should be list type, or you should provide a single indicator value")
self.add_IP_action(indicators, ioc_collection_id, valid_for)
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)
if str(indicator_type) == "IP address":
if indicators is not None and not isinstance(indicators, list):
raise ValueError("'indicators' must be a list; alternatively, use 'indicator' for a single value")
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)

else:
if _type := indicator_type_mapping.get(str(indicator_type)):
self.perform_request(indicators, ioc_collection_id, _type, valid_for)
self.perform_request(result_indicators, ioc_collection_id, _type, valid_for)
else:
self.error(f"Improper indicator type {indicator_type}")
126 changes: 126 additions & 0 deletions Sekoia.io/tests/test_add_ioc_2_ioc_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,132 @@ def test_add_ioc_incorrect_type(arguments_invalid_type):
assert action._error is not None


@pytest.fixture
def arguments_single_indicator_ipv4():
return {
"indicator": "8.8.8.8",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


@pytest.fixture
def arguments_single_indicator_ipv6():
return {
"indicator": "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


@pytest.fixture
def arguments_single_indicator_domain():
return {
"indicator": "www.sekoia.io",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "domain",
}


@pytest.fixture
def arguments_single_indicator_with_valid_for():
return {
"indicator": "8.8.8.8",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
"valid_for": "30",
}


@pytest.fixture
def arguments_no_indicators():
return {
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


def test_add_ioc_single_indicator_ipv4(arguments_single_indicator_ipv4):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_ipv4)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "8.8.8.8" in history[0].text


def test_add_ioc_single_indicator_ipv6(arguments_single_indicator_ipv6):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_ipv6)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "2001:0db8:85a3:0000:0000:8a2e:0370:7334" in history[0].text


def test_add_ioc_single_indicator_domain(arguments_single_indicator_domain):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_domain)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"


def test_add_ioc_single_indicator_with_valid_for(arguments_single_indicator_with_valid_for):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_with_valid_for)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "valid_until" in history[0].text


def test_add_ioc_no_indicators_raises_error(arguments_no_indicators):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

with pytest.raises(
ValueError, match="Indicators should be list type, or you should provide a single indicator value"
):
action.run(arguments_no_indicators)


def test_add_ioc_failed(arguments_http_error):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}
Expand Down