Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sekoia.io/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2026-02-10 - 2.69.1

### Added

- Add optional `indicator` field in `AddIOCtoIOCCollectionAction`

## 2026-02-10 - 2.69.0

### Fixed
Expand Down
9 changes: 9 additions & 0 deletions Sekoia.io/action_add_ioc_to_ioc_collection.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
"arguments": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"indicator": {
"description": "Single indicator to add to an IOC collection.",
"type": "string"
},
"indicators": {
"description": "List of indicators to add to an IOC collection",
"type": "array"
Expand Down Expand Up @@ -33,6 +37,11 @@
}
},
"oneOf": [
{
"required": [
"indicator"
]
},
{
"required": [
"indicators"
Expand Down
2 changes: 1 addition & 1 deletion Sekoia.io/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "Sekoia.io",
"uuid": "92d8bb47-7c51-445d-81de-ae04edbb6f0a",
"slug": "sekoia.io",
"version": "2.69.0",
"version": "2.69.1",
"categories": [
"Generic"
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import requests
import ipaddress
from datetime import datetime, timedelta

from .base import InThreatBaseAction
import requests

from sekoiaio.utils import datetime_to_str

from .base import InThreatBaseAction


class AddIOCtoIOCCollectionAction(InThreatBaseAction):
def perform_request(self, indicators, ioc_collection_id, indicator_type, valid_for):
Expand Down Expand Up @@ -50,18 +52,21 @@ def run(self, arguments: dict):
"hash": "file.hashes",
}

indicators = self.json_argument("indicators", arguments)
indicators = self.json_argument("indicators", arguments, required=False)
single_indicator = arguments.get("indicator")
ioc_collection_id = arguments.get("ioc_collection_id")
indicator_type = arguments.get("indicator_type")
valid_for = int(arguments.get("valid_for", 0))

result_indicators = indicators or [single_indicator]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Guard against both indicators and single_indicator being missing to avoid passing [None] downstream.

When both are absent, result_indicators becomes [None] and is passed to add_IP_action/perform_request, likely causing confusing downstream errors. Add an explicit guard, for example:

if not indicators and not single_indicator:
    raise ValueError("You must provide either 'indicators' or 'indicator'")

result_indicators = indicators or [single_indicator]


if str(indicator_type) == "IP address":
if not isinstance(indicators, list):
raise ValueError("Indicators should be list type")
if not isinstance(indicators, list) and not single_indicator:
raise ValueError("Indicators should be list type, or you should provide a single indicator value")

self.add_IP_action(indicators, ioc_collection_id, valid_for)
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)
Comment on lines 63 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Align validation error message with the actual accepted shapes and ensure list type is also enforced when a single indicator is supplied as a list.

The current condition (not isinstance(indicators, list) and not single_indicator) permits non-list indicators whenever single_indicator is set, which conflicts with the message and weakens type guarantees. If indicators should always be a list when present, you could instead enforce:

if indicators is not None and not isinstance(indicators, list):
    raise ValueError("'indicators' must be a list; alternatively, use 'indicator' for a single value")

Then rely on the normalization of result_indicators to handle cases where both fields are provided.

Suggested change
if str(indicator_type) == "IP address":
if not isinstance(indicators, list):
raise ValueError("Indicators should be list type")
if not isinstance(indicators, list) and not single_indicator:
raise ValueError("Indicators should be list type, or you should provide a single indicator value")
self.add_IP_action(indicators, ioc_collection_id, valid_for)
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)
if str(indicator_type) == "IP address":
if indicators is not None and not isinstance(indicators, list):
raise ValueError("'indicators' must be a list; alternatively, use 'indicator' for a single value")
self.add_IP_action(result_indicators, ioc_collection_id, valid_for)

else:
if _type := indicator_type_mapping.get(str(indicator_type)):
self.perform_request(indicators, ioc_collection_id, _type, valid_for)
self.perform_request(result_indicators, ioc_collection_id, _type, valid_for)
else:
self.error(f"Improper indicator type {indicator_type}")
126 changes: 126 additions & 0 deletions Sekoia.io/tests/test_add_ioc_2_ioc_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,132 @@ def test_add_ioc_incorrect_type(arguments_invalid_type):
assert action._error is not None


@pytest.fixture
def arguments_single_indicator_ipv4():
return {
"indicator": "8.8.8.8",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


@pytest.fixture
def arguments_single_indicator_ipv6():
return {
"indicator": "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


@pytest.fixture
def arguments_single_indicator_domain():
return {
"indicator": "www.sekoia.io",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "domain",
}


@pytest.fixture
def arguments_single_indicator_with_valid_for():
return {
"indicator": "8.8.8.8",
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
"valid_for": "30",
}


@pytest.fixture
def arguments_no_indicators():
return {
"ioc_collection_id": "ioc-collection--00000000-0000-0000-0000-000000000000",
"indicator_type": "IP address",
}


def test_add_ioc_single_indicator_ipv4(arguments_single_indicator_ipv4):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_ipv4)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "8.8.8.8" in history[0].text


def test_add_ioc_single_indicator_ipv6(arguments_single_indicator_ipv6):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_ipv6)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "2001:0db8:85a3:0000:0000:8a2e:0370:7334" in history[0].text


def test_add_ioc_single_indicator_domain(arguments_single_indicator_domain):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_domain)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"


def test_add_ioc_single_indicator_with_valid_for(arguments_single_indicator_with_valid_for):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

response = {"task_id": "00000000-0000-0000-0000-000000000000"}
with requests_mock.Mocker() as mock:
mock.post(
"http://fake.url/api/v2/inthreat/ioc-collections/ioc-collection--00000000-0000-0000-0000-000000000000/indicators/text",
json=response,
)
action.run(arguments_single_indicator_with_valid_for)

history = mock.request_history
assert mock.call_count == 1
assert history[0].method == "POST"
assert "valid_until" in history[0].text


def test_add_ioc_no_indicators_raises_error(arguments_no_indicators):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}

with pytest.raises(
ValueError, match="Indicators should be list type, or you should provide a single indicator value"
):
action.run(arguments_no_indicators)


def test_add_ioc_failed(arguments_http_error):
action: AddIOCtoIOCCollectionAction = AddIOCtoIOCCollectionAction()
action.module.configuration = {"base_url": "http://fake.url/", "api_key": "fake_api_key"}
Expand Down