
Created GoogleThreatList Trigger #1945

Open
Imothep-Akonis wants to merge 14 commits into SEKOIA-IO:develop from Akonis-cybersecurity:main

Conversation

@Imothep-Akonis commented Jan 21, 2026

Summary by Sourcery

Add a new Google Threat Intelligence Threat List integration that pulls IoCs from VirusTotal and pushes them to a SEKOIA.IO IOC collection, with checkpointing, metrics, and structured logging.

New Features:

  • Introduce GoogleThreatIntelligenceThreatListToIOCCollectionTrigger to retrieve IoCs from Google Threat Intelligence (via VirusTotal) and send them to a SEKOIA.IO IOC collection.
  • Support configurable threat list selection, IOC types, pagination, rate limiting handling, and advanced GTI query filtering.
  • Add checkpointing, cursor management, and basic run-loop metrics for continuous polling of threat lists.
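
The polling behavior described above — a page limit, a continuation cursor, and a stop once no cursor remains — can be sketched as a generator. This is a minimal illustration rather than the trigger's actual code: the endpoint path follows the reviewer's guide sequence diagram, and `session` is any object exposing a requests-style `get`.

```python
BASE_URL = "https://www.virustotal.com/api/v3"

def iter_threat_list_pages(session, threat_list_id: str, limit: int = 1000):
    """Yield batches of IoCs, following the continuation cursor until exhausted."""
    cursor = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        resp = session.get(
            f"{BASE_URL}/threat_lists/{threat_list_id}/latest",
            params=params,
            timeout=30,
        )
        resp.raise_for_status()
        body = resp.json()
        yield body.get("data", [])
        cursor = body.get("meta", {}).get("cursor")
        if not cursor:
            break  # no continuation cursor: this polling cycle is done
```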

Enhancements:

  • Implement IOC normalization, hashing-based deduplication, and batched pushes to the IOC collection with retry logic and backoff.
  • Add structured logging helpers for better observability of trigger lifecycle and error conditions.
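
The hashing-based deduplication might look like the sketch below. It is assumption-laden: the real trigger keeps hashes in a `cachetools.TTLCache`, whereas this stdlib-only version uses a dict of expiry times, and the treatment of events lacking an `ioc_hash` (skipped here) is a guess.

```python
import hashlib
import time

TTL_SECONDS = 7 * 24 * 3600  # 7-day dedup window, as described above

# The integration uses cachetools.TTLCache; a plain dict of expiry
# timestamps is enough to sketch the idea with the standard library.
_seen: dict[str, float] = {}

def compute_ioc_hash(ioc_type: str, value: str) -> str:
    """Deterministic identity for an IoC: sha256 over normalized type and value."""
    return hashlib.sha256(f"{ioc_type}:{value.strip().lower()}".encode()).hexdigest()

def filter_new_events(events: list[dict]) -> list[dict]:
    """Keep only events whose ioc_hash has not been seen within the TTL window."""
    now = time.monotonic()
    fresh = []
    for event in events:
        h = event.get("ioc_hash")
        if h is None:
            continue  # assumption: events without a hash are dropped
        expiry = _seen.get(h)
        if expiry is not None and expiry > now:
            continue  # already pushed recently
        _seen[h] = now + TTL_SECONDS
        fresh.append(event)
    return fresh
```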

Build:

  • Introduce a dedicated Google Threat List integration package with pyproject, Dockerfile, and dependencies for requests, cachetools, and tenacity.

Documentation:

  • Add manifest and changelog entries describing the new Google Threat List integration.

Tests:

  • Add an extensive pytest suite with fixtures and unit tests covering configuration handling, query validation, HTTP error handling, IOC transformation/deduplication, Sekoia push logic, checkpointing, and the trigger run loop.

sourcery-ai bot (Contributor) commented Jan 21, 2026

Reviewer's Guide

Introduce a new Google Threat Intelligence Threat List to IOC Collection integration, including the trigger implementation, its registration module, Docker/Poetry packaging, manifest, and an extensive pytest suite covering configuration, VirusTotal querying, deduplication, checkpointing, pushing IOCs to SEKOIA, and the trigger run loop.

Sequence diagram for VirusTotal to SEKOIA IOC Collection trigger run loop

sequenceDiagram
    actor SekoiaPlatform
    participant Module
    participant Trigger as GoogleThreatIntelligenceThreatListToIOCCollectionTrigger
    participant VirusTotalAPI
    participant SekoiaIOCApi

    SekoiaPlatform->>Module: start()
    Module->>Module: register trigger class
    Module->>Trigger: instantiate
    Module->>Trigger: run()

    Trigger->>Trigger: initialize_client()
    Trigger->>Trigger: load_metrics()
    Trigger->>Trigger: load_cursor()

    loop polling loop while running
        Trigger->>VirusTotalAPI: GET /threat_lists/{threat_list_id}/latest
        VirusTotalAPI-->>Trigger: IoCs batch (data, meta.cursor)

        Trigger->>Trigger: transform_ioc() for each IoC
        Trigger->>Trigger: filter_new_events()

        alt new IoCs to push
            Trigger->>SekoiaIOCApi: POST /ioc-collections/{uuid}/indicators/text
            SekoiaIOCApi-->>Trigger: result created/updated/ignored
        else no new IoCs
            Trigger->>Trigger: skip push
        end

        Trigger->>Trigger: save_cursor(next_cursor)
        alt no continuation cursor
            Trigger->>Trigger: sleep(sleep_time)
        else more pages
            Trigger->>Trigger: continue without sleeping
        end
    end

    Trigger-->>Module: stop
    Module-->>SekoiaPlatform: trigger stopped

Class diagram for GoogleThreatIntelligenceThreatListToIOCCollectionTrigger and related exceptions

classDiagram
    class Trigger

    class VirusTotalAPIError {
        <<exception>>
    }

    class QuotaExceededError {
        <<exception>>
        +retryable : bool
    }

    class InvalidAPIKeyError {
        <<exception>>
        +retryable : bool
    }

    class ThreatListNotFoundError {
        <<exception>>
        +retryable : bool
    }

    class QueryValidationError {
        <<exception>>
    }

    class GoogleThreatIntelligenceThreatListToIOCCollectionTrigger {
        +BASE_URL : str
        -session
        -processed_events
        +polling_frequency_hours : int
        +sleep_time : int
        +api_key : str
        +threat_list_id : str
        +ioc_types : list~str~
        +max_iocs : int
        +extra_query_params : str
        +ioc_collection_server : str
        +ioc_collection_uuid : str
        +sekoia_api_key : str
        +initialize_client() void
        +validate_threat_list_id(threat_list_id : str) bool
        +validate_query(query : str) bool
        +build_query_url(threat_list_id : str, cursor : str) str
        +fetch_events(cursor : str) dict
        +transform_ioc(ioc : dict) dict
        +filter_new_events(events : list~dict~) list~dict~
        +push_to_sekoia(ioc_values : list~str~) void
        +load_checkpoint() dict
        +save_checkpoint(checkpoint : dict) void
        +load_cursor() str
        +save_cursor(cursor : str) void
        +load_metrics() dict
        +inc_metric(name : str, value : int) void
        +log_kv(level : str, event : str, **fields) void
        +run() void
        -_validate_numeric_filter(value : str) bool
        -_make_request(url : str) dict
        -_compute_ioc_hash(ioc_type : str, value : str) str
        -_get_context_store() dict
    }

    GoogleThreatIntelligenceThreatListToIOCCollectionTrigger --|> Trigger

    QuotaExceededError --|> VirusTotalAPIError
    InvalidAPIKeyError --|> VirusTotalAPIError
    ThreatListNotFoundError --|> VirusTotalAPIError
    QueryValidationError --|> Exception

File-Level Changes

Implement GoogleThreatIntelligenceThreatListToIOCCollectionTrigger to pull IoCs from VirusTotal threat lists and push them into a SEKOIA IOC Collection with validation, deduplication, checkpointing, metrics, and robust error handling.
  • Define constants for valid threat list IDs, IOC types, and query has: values used for validation.
  • Implement configuration-driven properties for polling frequency, sleep time, threat list selection, IOC types, IOC limits, extra query params, IOC collection target, and both VirusTotal and SEKOIA API keys.
  • Initialize an authenticated requests.Session client for VirusTotal with basic API key validation.
  • Add query/threat list validation helpers, including numeric filter validation and support for gti_score, positives, and has: filters combined with AND.
  • Build VirusTotal API URLs with limit, IOC type filters, extra query, and continuation cursor support.
  • Wrap HTTP GETs to VirusTotal in a tenacity retry decorator that retries on quota (429) and maps other statuses to custom exceptions.
  • Implement IOC transformation from VirusTotal schema into a normalized dict including a deterministic ioc_hash used for deduplication.
  • Maintain a TTLCache of processed IOC hashes for 7‑day deduplication and provide filtering of new events.
  • Implement push_to_sekoia that batches indicators, handles auth/4xx/5xx/429 with retries and backoff, and logs detailed outcomes.
  • Add context-based checkpointing utilities (with in-memory fallback) for cursor and metrics, plus helpers for incrementing metrics and structured key/value logging.
  • Implement the main run loop to initialize the client, resume from cursor, fetch and transform pages, dedupe and push IOCs, update cursor/metrics, handle quota/fatal/other errors, and sleep between polling cycles.
GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py
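
The status-to-exception mapping with `retryable` flags (per the class diagram above) might look like this sketch. The exact status codes handled are assumptions; in the real module, a tenacity `@retry` decorator around `_make_request` would retry only on `QuotaExceededError`.

```python
class VirusTotalAPIError(Exception):
    """Base class for VirusTotal API failures."""

class QuotaExceededError(VirusTotalAPIError):
    retryable = True  # 429: back off and retry

class InvalidAPIKeyError(VirusTotalAPIError):
    retryable = False  # assumed 401/403: fatal configuration problem

class ThreatListNotFoundError(VirusTotalAPIError):
    retryable = False  # assumed 404: fatal configuration problem

def raise_for_vt_status(status_code: int, body: str = "") -> None:
    """Sketch of _make_request's error path: map HTTP status to an exception."""
    if status_code == 429:
        raise QuotaExceededError(f"quota exceeded: {body}")
    if status_code in (401, 403):
        raise InvalidAPIKeyError(f"invalid API key: {body}")
    if status_code == 404:
        raise ThreatListNotFoundError(f"threat list not found: {body}")
    if status_code >= 400:
        raise VirusTotalAPIError(f"HTTP {status_code}: {body}")
```
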
Register the new trigger as a SEKOIA module entry point and provide integration packaging (manifest, Dockerfile, pyproject).
  • Add a main.py module that registers GoogleThreatIntelligenceThreatListToIOCCollectionTrigger under the expected trigger slug and runs the Module.
  • Define manifest.json describing the integration metadata, configuration schema (including VirusTotal and SEKOIA API keys), and docker image name.
  • Add a pyproject.toml using Poetry with runtime dependencies (sekoia-automation-sdk, requests, cachetools, pydantic, tenacity) and pytest-based dev tooling.
  • Add a Dockerfile that installs dependencies via Poetry and runs main.py in a python:3.12-slim base image.
  • Add assorted integration assets and placeholders such as CHANGELOG.md, logo.svg, .gitignore, backup manifest, lockfile, and a JSON trigger descriptor.
GoogleThreatList/main.py
GoogleThreatList/manifest.json
GoogleThreatList/pyproject.toml
GoogleThreatList/Dockerfile
GoogleThreatList/CHANGELOG.md
GoogleThreatList/logo.svg
GoogleThreatList/.gitignore
GoogleThreatList/manifest.json.bak
GoogleThreatList/poetry.lock
GoogleThreatList/trigger_google_threat_intelligence_threat_list_to_ioc_collection.json
Add an extensive pytest suite with fixtures to validate the trigger’s configuration behavior, API interactions, transformation logic, deduplication, checkpointing, push-to-SEKOIA behavior, exception classes, constants, and run loop control flow.
  • Create test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py with unit tests for initialization, configuration properties, threat list and query validation, numeric filter helper, URL building, and VirusTotal HTTP error handling.
  • Test fetch_events behavior including cursor handling, configuration validation, and logging, as well as the IOC transformation for all supported types and unknown types while preserving raw data and hash computation semantics.
  • Test deduplication via filter_new_events across multiple calls and events without ioc_hash.
  • Test push_to_sekoia’s behavior for empty input, missing configuration, handling of 2xx/4xx/5xx/429 responses, retries, and exception raising for auth/not-found/client errors.
  • Test context and checkpoint helpers, including fallback when context is missing and cursor persistence.
  • Test run loop scenarios: initialization failure, resume from cursor, no data to push, single-iteration success path, fatal/recoverable errors, KeyboardInterrupt, quota exceeded, and that IOCs are passed to push_to_sekoia.
  • Add tests for exception class retryable flags and for VALID_* constants containing expected values.
  • Provide shared pytest fixtures in conftest.py for sample IoCs, VirusTotal responses, and default module/trigger configuration.
GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py
GoogleThreatList/tests/conftest.py
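
A shared fixture for the VirusTotal response shape might look like the sketch below. Only the `data` / `meta.cursor` envelope is taken from the reviewer's guide; every field inside is hypothetical, and in `conftest.py` the function would be registered with `@pytest.fixture`.

```python
def sample_vt_response() -> dict:
    """Hypothetical shared test data for one VirusTotal threat-list page."""
    return {
        "data": [
            {
                "id": "198.51.100.7",
                "type": "ip_address",
                # assumed attribute layout, not confirmed by the PR text
                "attributes": {"gti_assessment": {"threat_score": {"value": 80}}},
            }
        ],
        "meta": {"cursor": "next-page-token"},
    }
```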

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.


sourcery-ai bot left a comment

Hey - I've found 10 issues and left some high-level feedback:

  • The module configuration exposes google_threat_list_url but the trigger always uses the hardcoded BASE_URL and never reads this setting; either wire this config into the trigger or remove it from manifest.json to avoid confusion.
  • The sekoia_api_key is optional in the manifest but the trigger treats its absence as an error when pushing IOCs; consider marking it as required (or clearly documenting its optionality in behavior) to align configuration with runtime expectations.
  • There is duplicated test data/fixtures for VirusTotal responses (e.g. sample_vt_response defined both in conftest.py and in the trigger test file); centralize these in conftest.py or a shared helper to reduce duplication and ease maintenance.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The module configuration exposes `google_threat_list_url` but the trigger always uses the hardcoded `BASE_URL` and never reads this setting; either wire this config into the trigger or remove it from `manifest.json` to avoid confusion.
- The `sekoia_api_key` is optional in the manifest but the trigger treats its absence as an error when pushing IOCs; consider marking it as required (or clearly documenting its optionality in behavior) to align configuration with runtime expectations.
- There is duplicated test data/fixtures for VirusTotal responses (e.g. `sample_vt_response` defined both in `conftest.py` and in the trigger test file); centralize these in `conftest.py` or a shared helper to reduce duplication and ease maintenance.

## Individual Comments

### Comment 1
<location> `GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:161-163` </location>
<code_context>
+        return [t for t in types if t in VALID_IOC_TYPES]
+
+    @property
+    def max_iocs(self) -> int:
+        """Get maximum number of IoCs per request."""
+        return min(int(self.configuration.get("max_iocs", 4000)), 4000)
+
+    @property
</code_context>

<issue_to_address>
**issue:** Guard `max_iocs` against non-integer configuration values to avoid `ValueError` at runtime.

Here `max_iocs` is cast directly to `int`, so a misconfigured value (e.g. "4k") will raise `ValueError` and abort on property access. Please apply the same defensive pattern as `sleep_time` (wrap the cast in `try`/`except` for `TypeError`/`ValueError` and fall back to the default) before clamping to 4000.
</issue_to_address>
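
The defensive pattern the reviewer describes — catch `TypeError`/`ValueError`, fall back to the default, then clamp — can be sketched as standalone functions (in the actual code, `max_iocs` is a property on the trigger):

```python
def safe_int(value, default: int) -> int:
    """Coerce a config value to int, falling back to the default on bad input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default

def max_iocs(configuration: dict) -> int:
    """max_iocs clamped to the 4000 ceiling, tolerating misconfigured values."""
    return min(safe_int(configuration.get("max_iocs", 4000), 4000), 4000)
```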

### Comment 2
<location> `GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:87` </location>
<code_context>
+    and forward them to a SEKOIA.IO IOC Collection.
+    """
+
+    BASE_URL = "https://www.virustotal.com/api/v3"
+
+    def __init__(self, *args, **kwargs):
</code_context>

<issue_to_address>
**suggestion:** Consider wiring the base GTI URL to configuration instead of hard-coding it.

The trigger hard-codes `BASE_URL` even though the manifest already defines a configurable `google_threat_list_url`. If the GTI API base URL changes or needs to differ by environment, this would force a code change and rebuild. Reading the base URL from configuration (with a sensible default) would keep behavior consistent with the manifest and improve deployment flexibility.

Suggested implementation:

```python
class GoogleThreatIntelligenceThreatListToIOCCollectionTrigger(Trigger):
    """
    Trigger to retrieve IoCs from Google Threat Intelligence via VirusTotal API
    and forward them to a SEKOIA.IO IOC Collection.
    """

    # Default VirusTotal API base URL; can be overridden via configuration
    BASE_URL = "https://www.virustotal.com/api/v3"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = None
        # Allow overriding the base URL from configuration (e.g. `google_threat_list_url`)
        # while keeping a sensible default aligned with the manifest.
        self.base_url = getattr(
            self,
            "configuration",
            {},
        ).get("google_threat_list_url", self.BASE_URL)
        self.processed_events = TTLCache(maxsize=10000, ttl=604800)  # 7 days TTL

```

1. Ensure that the trigger code uses `self.base_url` instead of `self.BASE_URL` wherever HTTP requests to the GTI/VirusTotal API are made (e.g., when building endpoint URLs).
2. Confirm that the manifest option key is indeed `google_threat_list_url`; if it differs, adjust the dictionary key used in `self.configuration.get(...)` accordingly.
3. If the `Trigger` base class exposes configuration via a different attribute or accessor (for example `self.config` or `self.module.configuration`), replace the `getattr(self, "configuration", {})` usage to match the existing convention in this codebase.
</issue_to_address>

### Comment 3
<location> `GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:332-334` </location>
<code_context>
+            level="info",
+        )
+
+        # Validate configuration
+        self.validate_threat_list_id(self.threat_list_id)
+        self.validate_query(self.extra_query_params)
+
+        # Build URL and make request
</code_context>

<issue_to_address>
**issue (bug_risk):** Treat `QueryValidationError` as a fatal configuration issue instead of a generic recoverable loop error.

`validate_query` raising `QueryValidationError` is currently swallowed by the generic `except Exception` in `run()`, causing the trigger to log, sleep 60 seconds, and then retry the same invalid config indefinitely. Consider handling `QueryValidationError` explicitly in `run()` as a fatal configuration error (similar to `InvalidAPIKeyError`/`ThreatListNotFoundError`) so the trigger stops instead of looping forever on misconfiguration.
</issue_to_address>
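
The suggested control flow — fatal configuration errors stop the loop, everything else backs off and retries — can be sketched as follows. The exception classes are stand-ins mirroring the module's names, and `run_once` is a hypothetical helper, not the trigger's actual `run`.

```python
import time

# Stand-ins for the module's exception classes.
class QueryValidationError(Exception): ...
class InvalidAPIKeyError(Exception): ...
class ThreatListNotFoundError(Exception): ...

# Per the review, QueryValidationError joins the fatal branch.
FATAL_ERRORS = (QueryValidationError, InvalidAPIKeyError, ThreatListNotFoundError)

def run_once(fetch, log, sleep_on_error: int = 60) -> bool:
    """One loop iteration; returns False when the trigger should stop."""
    try:
        fetch()
    except FATAL_ERRORS as exc:
        log(f"Fatal error, stopping trigger: {exc}", level="critical")
        return False  # misconfiguration: do not retry forever
    except Exception as exc:
        log(f"Recoverable error: {exc}", level="error")
        time.sleep(sleep_on_error)  # transient: back off, then retry
    return True
```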

### Comment 4
<location> `GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:300-309` </location>
<code_context>
+                        url, json=payload, headers=headers, timeout=30
+                    )
+
+                    if response.status_code == 200:
+                        result = response.json()
+                        self.log(
+                            message=f"Batch {batch_num}/{total_batches} pushed successfully: "
+                            f"{result.get('created', 0)} created, "
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Broaden success handling in `push_to_sekoia` to cover non-200 2xx responses.

The API can return other 2xx codes (e.g. 201/202) on success, so a strict `== 200` means valid pushes may be retried and ultimately flagged as failures. Consider checking for any 2xx (e.g. `200 <= response.status_code < 300` or `response.ok`) to treat all successful responses consistently.
</issue_to_address>
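
The suggested check is a one-liner; note that `response.ok` in requests is true for any status below 400 (including 3xx redirects), so an explicit 2xx range test is the stricter option:

```python
def is_success(status_code: int) -> bool:
    """Treat any 2xx response (200 OK, 201 Created, 202 Accepted, ...) as success."""
    return 200 <= status_code < 300
```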

### Comment 5
<location> `GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:23` </location>
<code_context>
+)
+
+
+class TestGoogleThreatIntelligenceThreatListToIOCCollectionTrigger:
+    """Unit tests for GoogleThreatIntelligenceThreatListToIOCCollectionTrigger."""
+
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for checkpoint/metrics helpers (load_checkpoint/save_checkpoint/load_metrics/inc_metric) to ensure state persistence is correct.

These helpers form a mini checkpoint/metrics subsystem (`load_checkpoint`, `save_checkpoint`, `load_metrics`, `inc_metric`, plus `load_cursor`/`save_cursor` on top). Right now only cursor helpers are tested. Please also add tests that:

- Confirm `load_checkpoint` returns `{}` when nothing is stored and handles missing `context` safely.
- Confirm `save_checkpoint` data round-trips correctly via `load_checkpoint`.
- Confirm `load_metrics` sets up all default metric keys and writes them to the checkpoint.
- Confirm `inc_metric` initializes missing keys, increments correctly, and persists updates.

This will make future refactors around state persistence safer and keep run-loop metrics trustworthy.

Suggested implementation:

```python
class TestGoogleThreatIntelligenceThreatListToIOCCollectionTrigger:
    """Unit tests for GoogleThreatIntelligenceThreatListToIOCCollectionTrigger."""

    def test_load_checkpoint_returns_empty_dict_when_no_context(self):
        """load_checkpoint should handle missing context gracefully and return an empty dict."""
        result = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_checkpoint(
                None
            )
        )
        assert result == {}

    def test_load_checkpoint_returns_empty_dict_when_no_checkpoint_present(self):
        """load_checkpoint should return an empty dict when no checkpoint is stored."""
        context = {}
        result = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_checkpoint(
                context
            )
        )
        assert result == {}

    def test_save_checkpoint_round_trips_via_load_checkpoint(self):
        """save_checkpoint data should round-trip correctly via load_checkpoint."""
        context = {}
        checkpoint_data = {"cursor": "abc123", "metrics": {"fetched": 10}}

        GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.save_checkpoint(
            context, checkpoint_data
        )

        loaded = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_checkpoint(
                context
            )
        )

        # Round-trip should preserve content
        assert loaded == checkpoint_data
        # Ensure we're not mutating the original input dict unexpectedly
        assert loaded is not checkpoint_data

    def test_load_metrics_initializes_defaults_and_persists_to_checkpoint(self):
        """
        load_metrics should initialize default metric keys when none exist
        and persist them into the checkpoint.
        """
        context = {}

        metrics = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_metrics(
                context
            )
        )

        # Should return a dict with at least one metric key configured by default
        assert isinstance(metrics, dict)
        assert metrics, "Expected load_metrics to initialize default metrics"

        # Metrics should be persisted into the checkpoint state
        checkpoint = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_checkpoint(
                context
            )
        )
        assert isinstance(checkpoint, dict)
        # The metrics dict we got back from load_metrics should be what is stored
        assert "metrics" in checkpoint
        assert checkpoint["metrics"] == metrics

    def test_inc_metric_initializes_and_increments_and_persists(self):
        """
        inc_metric should:
        - initialize missing metric keys to 0 before incrementing
        - increment by 1 by default and by the given amount when provided
        - persist updated metrics back into the checkpoint.
        """
        context = {}

        # Ensure metrics are initialized
        metrics = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_metrics(
                context
            )
        )
        # We will use a metric name that is unlikely to clash with existing defaults
        metric_name = "test_metric_for_inc"

        # Metric may or may not exist yet; inc_metric should handle both cases
        GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.inc_metric(
            context, metric_name
        )

        metrics_after_first_inc = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_metrics(
                context
            )
        )
        assert metrics_after_first_inc[metric_name] == 1

        # Increment by a custom amount
        GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.inc_metric(
            context, metric_name, amount=2
        )

        metrics_after_second_inc = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_metrics(
                context
            )
        )
        assert metrics_after_second_inc[metric_name] == 3

        # Updated metrics must also be persisted into the checkpoint
        checkpoint = (
            GoogleThreatIntelligenceThreatListToIOCCollectionTrigger.load_checkpoint(
                context
            )
        )
        assert "metrics" in checkpoint
        assert checkpoint["metrics"][metric_name] == 3

```

These tests assume the following helper APIs on `GoogleThreatIntelligenceThreatListToIOCCollectionTrigger`:

- `load_checkpoint(context) -> dict`, which:
  - Accepts `context` that may be `None` or a dict-like object.
  - Returns `{}` when there is no stored checkpoint.
- `save_checkpoint(context, checkpoint_dict)`, which:
  - Stores `checkpoint_dict` in `context` in a way that `load_checkpoint` can retrieve later.
  - Does not return the original dict instance (i.e., it copies or reconstructs state).
- `load_metrics(context) -> dict`, which:
  - Uses `load_checkpoint`/`save_checkpoint` under the hood.
  - Initializes a metrics dict when none exists and stores it under the `"metrics"` key in the checkpoint.
- `inc_metric(context, name, amount=1)`, which:
  - Ensures the metric exists (initializing missing metrics to `0`).
  - Increments by `amount` and persists the updated metrics back via `save_checkpoint`.

If your actual implementation uses a different structure (e.g., a different key than `"metrics"` inside the checkpoint, a non-dict `context`, or different method signatures), adjust:
- The `context` setup in each test,
- The assertions on `checkpoint["metrics"]`,
- And the `inc_metric` call signature (e.g., if `amount` is positional-only or not supported)
to match the real implementation.
</issue_to_address>

### Comment 6
<location> `GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:974-983` </location>
<code_context>
+    @patch(
+        "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep"
+    )
+    def test_run_handles_quota_exceeded(self, mock_sleep, trigger):
+        """Test run method handles quota exceeded errors."""
+        trigger.initialize_client()
+
+        call_count = 0
+
+        def mock_running():
+            nonlocal call_count
+            call_count += 1
+            return call_count <= 1
+
+        with patch.object(trigger, "fetch_events") as mock_fetch:
+            mock_fetch.side_effect = QuotaExceededError("rate limit")
+            with patch.object(
+                type(trigger),
+                "running",
+                new_callable=lambda: property(lambda s: mock_running()),
+            ):
+                trigger.run()
+
+        assert any("quota_exceeded" in str(call) for call in trigger.log.call_args_list)
+
+    @patch(
</code_context>

<issue_to_address>
**suggestion (testing):** Extend quota-exceeded/run loop tests to assert metrics updates for errors and quota_errors.

`run` increments `quota_errors` and `errors` when handling `QuotaExceededError`, but this test only checks the log message. Please extend this test (or add a new one) to load metrics after `trigger.run()` and assert `metrics["quota_errors"] == 1` and `metrics["errors"] == 1`, and optionally that `runs` was incremented. That way we verify the metrics behavior of the run loop, not just logging.

Suggested implementation:

```python
    @patch(
        "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep"
    )
    def test_run_handles_quota_exceeded(self, mock_sleep, trigger):
        """Test run method handles quota exceeded errors."""
        trigger.initialize_client()

        call_count = 0

        def mock_running():
            nonlocal call_count
            call_count += 1
            return call_count <= 1

        with patch.object(trigger, "fetch_events") as mock_fetch:
            mock_fetch.side_effect = QuotaExceededError("rate limit")
            with patch.object(
                type(trigger),
                "running",
                new_callable=lambda: property(lambda s: mock_running()),
            ):
                trigger.run()

        # Verify metrics are updated when quota is exceeded
        metrics = trigger.load_metrics()
        assert metrics["quota_errors"] == 1
        assert metrics["errors"] == 1
        assert metrics["runs"] == 1

        # Still ensure we log the quota-exceeded condition
        assert any("quota_exceeded" in str(call) for call in trigger.log.call_args_list)

    @patch(

```

I assumed a `trigger.load_metrics()` helper returning a dict with keys `quota_errors`, `errors`, and `runs`. If your test suite uses a different API (for example, a module-level `load_metrics()` function, `trigger.metrics`, or a helper like `get_metrics(trigger)`), replace `trigger.load_metrics()` with the appropriate call while keeping the three assertions the same.
</issue_to_address>

### Comment 7
<location> `GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:883-910` </location>
<code_context>
+    @patch(
+        "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep"
+    )
+    def test_run_handles_fatal_error(self, mock_sleep, trigger):
+        """Test run method handles fatal errors gracefully."""
+        trigger.initialize_client()
+
+        with patch.object(trigger, "fetch_events") as mock_fetch:
+            mock_fetch.side_effect = InvalidAPIKeyError("Invalid key")
+
+            call_count = 0
+
+            def mock_running():
+                nonlocal call_count
+                call_count += 1
+                return call_count <= 2
+
+            with patch.object(
+                type(trigger),
+                "running",
+                new_callable=lambda: property(lambda s: mock_running()),
+            ):
+                trigger.run()
+
+        # Should log fatal error
+        assert any("Fatal error" in str(call) for call in trigger.log.call_args_list)
+
+    @patch(
</code_context>

<issue_to_address>
**suggestion (testing):** Add assertions on metrics for fatal errors in the run loop.

The fatal error branch in `run` (for `InvalidAPIKeyError` / `ThreatListNotFoundError`) increments the `errors` metric before exiting, but `test_run_handles_fatal_error` currently only checks the log. Please also call `trigger.load_metrics()` after `run()` and assert that `metrics["errors"]` was incremented (e.g., is 1), so the test verifies the metrics side effect of this error path.

```suggestion
    @patch(
        "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep"
    )
    def test_run_handles_fatal_error(self, mock_sleep, trigger):
        """Test run method handles fatal errors gracefully."""
        trigger.initialize_client()

        with patch.object(trigger, "fetch_events") as mock_fetch:
            mock_fetch.side_effect = InvalidAPIKeyError("Invalid key")

            call_count = 0

            def mock_running():
                nonlocal call_count
                call_count += 1
                return call_count <= 2

            with patch.object(
                type(trigger),
                "running",
                new_callable=lambda: property(lambda s: mock_running()),
            ):
                trigger.run()
                metrics = trigger.load_metrics()

        # Should log fatal error
        assert any("Fatal error" in str(call) for call in trigger.log.call_args_list)
        # Should increment errors metric
        assert metrics["errors"] == 1

    @patch(
```
</issue_to_address>

### Comment 8
<location> `GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:767-776` </location>
<code_context>
+                trigger.push_to_sekoia(["ioc1"])
+
+    @patch("google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep")
+    def test_push_to_sekoia_server_error_retries(self, mock_sleep, trigger):
+        """Test push_to_sekoia retries on server error and logs failure."""
+        trigger.module.configuration["sekoia_api_key"] = "sek-key"
+        trigger.configuration["ioc_collection_uuid"] = "uuid-1"
+
+        response = Mock()
+        response.status_code = 500
+        response.text = "server error"
+
+        with patch(
+            "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.requests.post"
+        ) as mock_post:
+            mock_post.side_effect = [response, response, response]
+            trigger.push_to_sekoia(["ioc1"])
+
+        assert any("Failed to push batch" in str(call) for call in trigger.log.call_args_list)
+
+    def test_context_store_fallback(self, trigger):
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding tests for timeout/RequestException branches in push_to_sekoia retry loop.

Current tests cover response codes (2xx, 4xx, 404, 429, 5xx) and server-error retries, but not the exception branches. It would be useful to add tests that:
- Mock `requests.post` to raise `requests.exceptions.Timeout` a few times before succeeding, asserting `time.sleep` and the success log.
- Mock `requests.post` to raise a generic `RequestException`, asserting the error log and that retries stop after `max_retries` with a final failure log.

This would exercise the network-exception paths in the retry loop, not just HTTP status handling.

Suggested implementation:

```python
        with patch(
            "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.requests.post",
            return_value=response,
        ):
            with pytest.raises(VirusTotalAPIError):
                trigger.push_to_sekoia(["ioc1"])

    @patch("google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep")
    def test_push_to_sekoia_server_error_retries(self, mock_sleep, trigger):
        """Test push_to_sekoia retries on server error and logs failure."""
        trigger.module.configuration["sekoia_api_key"] = "sek-key"
        trigger.configuration["ioc_collection_uuid"] = "uuid-1"

        response = Mock()
        response.status_code = 500
        response.text = "server error"

        with patch(
            "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.requests.post"
        ) as mock_post:
            mock_post.side_effect = [response, response, response]
            trigger.push_to_sekoia(["ioc1"])

        assert any("Failed to push batch" in str(call) for call in trigger.log.call_args_list)

    @patch("google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep")
    def test_push_to_sekoia_timeout_retries_then_succeeds(self, mock_sleep, trigger):
        """Test push_to_sekoia retries on Timeout exceptions and eventually succeeds."""
        trigger.module.configuration["sekoia_api_key"] = "sek-key"
        trigger.configuration["ioc_collection_uuid"] = "uuid-1"

        success_response = Mock()
        success_response.status_code = 200
        success_response.text = "ok"

        with patch(
            "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.requests.post"
        ) as mock_post:
            # First two calls raise Timeout, third call succeeds
            mock_post.side_effect = [
                requests.exceptions.Timeout(),
                requests.exceptions.Timeout(),
                success_response,
            ]

            trigger.push_to_sekoia(["ioc1"])

        # Two retries (after two timeouts) then a successful third attempt
        assert mock_post.call_count == 3
        assert mock_sleep.call_count == 2
        # Ensure we did not log a final failure for this batch
        assert not any("Failed to push batch" in str(call) for call in trigger.log.call_args_list)

    @patch("google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.time.sleep")
    def test_push_to_sekoia_request_exception_stops_after_max_retries(self, mock_sleep, trigger):
        """Test push_to_sekoia stops after max retries on RequestException and logs failure."""
        trigger.module.configuration["sekoia_api_key"] = "sek-key"
        trigger.configuration["ioc_collection_uuid"] = "uuid-1"

        with patch(
            "google_threat_intelligence.trigger_google_threat_intelligence_threat_list_to_ioc_collection.requests.post"
        ) as mock_post:
            # Match the max_retries behavior used in other tests (3 attempts total)
            mock_post.side_effect = [
                requests.exceptions.RequestException("boom"),
                requests.exceptions.RequestException("boom"),
                requests.exceptions.RequestException("boom"),
            ]

            trigger.push_to_sekoia(["ioc1"])

        # We should have retried up to the configured max number of attempts
        assert mock_post.call_count == 3
        assert mock_sleep.call_count == 3
        # Final failure should be logged for this batch
        assert any("Failed to push batch" in str(call) for call in trigger.log.call_args_list)

    def test_context_store_fallback(self, trigger):
        """Test context store falls back to local dict."""
        delattr(trigger, "context")
        store = trigger._get_context_store()
        assert store == {}

    # =============================================================================
    # Run Loop Tests
    # =============================================================================

    @patch(

```

1. Ensure `requests` is imported at the top of `test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py`, for example:
   `import requests`.
2. If `push_to_sekoia` uses a different `max_retries` than 3, adjust the `mock_post.side_effect` list lengths and the `mock_sleep.call_count` assertion in `test_push_to_sekoia_request_exception_stops_after_max_retries` to match the actual retry count (typically `max_retries` sleeps for `max_retries` attempts).
3. If the log message for a final failure is different from `"Failed to push batch"`, update the string in the `any(... "Failed to push batch" ...)` assertions to the exact message used in `push_to_sekoia`.
</issue_to_address>

### Comment 9
<location> `GoogleThreatList/tests/test_trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:855-864` </location>
<code_context>
+    def test_run_loop_one_iteration(self, mock_sleep, trigger, sample_vt_response):
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test to cover the case where transformed IOCs are all deduped and `batch_empty_values`/`nothing_to_push` paths are hit.

The existing run loop tests cover successful pushes, quota/fatal errors, and the case where the API returns no data. What’s missing is when `fetch_events` returns IOCs, but after `transform_ioc`/`filter_new_events` there are no new ones (all already in `processed_events`).

Please add a test that:
- Pre-populates `trigger.processed_events` with the hashes of the sample IOCs.
- Runs one iteration of the loop.
- Asserts that `push_to_sekoia` is not called and that the expected `log_kv` entry (`"nothing_to_push"` or `"batch_empty_values"`, depending on config) is emitted.

This will exercise deduplication behavior inside the main loop.

Suggested implementation:

```python
from unittest.mock import patch, MagicMock

```

1. Ensure the implementation of `trigger.transform_ioc` and the way `processed_events` is populated in production code match the hashing logic used in this test. If a different helper than `_hash_event` is used (e.g. `_hash_ioc`, `_get_event_hash`), update the test accordingly.
2. If `sample_vt_response["data"]` is not the correct path to the list of IOCs, adjust the access in `transformed_iocs = [...]` to reflect the real response structure.
3. If the existing `test_run_loop_one_iteration` already has a complete body in your local file, you should merge the "one iteration" mocking logic from that version with this change rather than replacing it wholesale; the key point is that the new `test_run_loop_deduplicated_iocs_nothing_to_push` should mirror the same run-loop mocking pattern while pre-populating `processed_events` and asserting `push_to_sekoia` is not called.
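A self-contained sketch of the behavior such a test should pin down. The helper names (`hash_event`, `filter_new_events`) and the hashing scheme here are illustrative stand-ins, not the PR's actual `transform_ioc`/`filter_new_events` signatures:

```python
import hashlib
from unittest.mock import Mock

def hash_event(ioc: dict) -> str:
    # Illustrative stand-in for the trigger's event-hashing helper.
    return hashlib.sha256(ioc["value"].encode()).hexdigest()

def filter_new_events(iocs: list, processed: set) -> list:
    """Keep only IOCs whose hash is not already in the dedup cache."""
    fresh = []
    for ioc in iocs:
        digest = hash_event(ioc)
        if digest not in processed:
            processed.add(digest)
            fresh.append(ioc)
    return fresh

# Pre-populate the dedup cache with every incoming IOC, as the proposed
# test would do with trigger.processed_events, then run "one iteration".
push_to_sekoia = Mock()
batch = [{"value": "evil.example"}, {"value": "198.51.100.7"}]
processed = {hash_event(ioc) for ioc in batch}

new_iocs = filter_new_events(batch, processed)
if new_iocs:
    push_to_sekoia([ioc["value"] for ioc in new_iocs])

assert new_iocs == [] and not push_to_sekoia.called
```

The real test would additionally assert on the `log_kv` call (`"nothing_to_push"` or `"batch_empty_values"`), which this sketch omits.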
</issue_to_address>

### Comment 10
<location> `GoogleThreatList/google_threat_intelligence/trigger_google_threat_intelligence_threat_list_to_ioc_collection.py:637` </location>
<code_context>
+        payload = {"event": event, **fields}
+        self.log(message=str(payload), level=level)
+
+    def run(self) -> None:
+        """Main trigger loop."""
+        self.log(message="Starting GTI Threat List trigger", level="info")
</code_context>

<issue_to_address>
**issue (complexity):** Consider breaking the large `run` and `push_to_sekoia` methods into smaller helpers and using simple dispatch/metrics abstractions to make the trigger’s control flow, error handling, and validation easier to follow and extend.

You can keep the current behaviour but reduce complexity noticeably by extracting a few focused helpers and tightening some abstractions.

### 1. Split `run` into small orchestration helpers

`run` currently owns everything: init, main loop, metrics, cursor, fetch/transform/dedup/push, and error handling. Pulling out an iteration-level helper makes the control‑flow easier to follow and test:

```python
def run(self) -> None:
    self.log(message="Starting GTI Threat List trigger", level="info")

    try:
        self.initialize_client()
    except Exception as error:
        self.inc_metric("errors", 1)
        self.log(message=f"Failed to initialize client: {error}", level="error")
        self.log_kv("error", "init_failed", error=str(error))
        return

    self.load_metrics()
    self.inc_metric("runs", 1)
    self.log_kv("info", "trigger_started",
                threat_list_id=self.threat_list_id,
                ioc_types=self.ioc_types)

    cursor = self.load_cursor()
    if cursor:
        self.log_kv("info", "resume_from_cursor", cursor=cursor)

    while self.running:
        try:
            cursor = self._run_once(cursor)
        except KeyboardInterrupt:
            self.log_kv("info", "stopped_by_user")
            break
        except (InvalidAPIKeyError, ThreatListNotFoundError) as error:
            self._handle_fatal_error(error)
            break
        except QuotaExceededError as error:
            self._handle_quota_error(error)
        except Exception as error:
            cursor = self._handle_generic_loop_error(error)

    self.log_kv("info", "trigger_stopped")
    self.log(message="GTI Threat List trigger stopped", level="info")
```

Then keep the existing behaviour but move the body of the loop into `_run_once`:

```python
def _run_once(self, cursor: str | None) -> str | None:
    response = self.fetch_events(cursor)
    raw_iocs = response.get("data", []) or []

    meta = response.get("meta", {}) or {}
    next_cursor = meta.get("continuation_cursor")

    self.inc_metric("pages_fetched", 1)
    self.inc_metric("iocs_fetched", len(raw_iocs))
    self.log_kv("info", "page_fetched",
                count=len(raw_iocs),
                has_cursor=bool(next_cursor))

    transformed = [self.transform_ioc(ioc) for ioc in raw_iocs]
    self.inc_metric("iocs_transformed", len(transformed))

    new_iocs = self.filter_new_events(transformed)
    self.inc_metric("iocs_deduped", len(new_iocs))
    self.log_kv("info", "batch_prepared",
                transformed=len(transformed),
                deduped=len(new_iocs))

    if new_iocs:
        ioc_values = [ioc["value"] for ioc in new_iocs if ioc.get("value")]
        if ioc_values:
            self.push_to_sekoia(ioc_values)
            self.inc_metric("iocs_pushed", len(ioc_values))
            self.log_kv("info", "batch_pushed", pushed=len(ioc_values))
        else:
            self.log_kv("warning", "batch_empty_values",
                        deduped=len(new_iocs))
    else:
        self.log_kv("info", "nothing_to_push")

    cursor = next_cursor
    self.save_cursor(cursor)

    if not cursor:
        self.save_cursor(None)
        self.log_kv("info", "sleeping", seconds=self.sleep_time)
        time.sleep(self.sleep_time)
        return None

    return cursor
```

And focused error helpers reusing your existing logging/metrics:

```python
def _handle_fatal_error(self, error: Exception) -> None:
    self.inc_metric("errors", 1)
    self.log(message=f"Fatal error: {error}", level="error")
    self.log_kv("error", "fatal_error", error=str(error))

def _handle_quota_error(self, error: Exception) -> None:
    self.inc_metric("quota_errors", 1)
    self.inc_metric("errors", 1)
    self.log(message=f"Error in loop: {error}", level="error")
    self.log_kv("warning", "quota_exceeded", error=str(error))
    time.sleep(60)

def _handle_generic_loop_error(self, error: Exception) -> str | None:
    self.inc_metric("errors", 1)
    self.log(message=f"Error in loop: {error}", level="error")
    self.log_kv("error", "loop_error", error=str(error))
    self.log(message=format_exc(), level="error")
    self.save_cursor(None)
    time.sleep(60)
    return None
```

This keeps behaviour identical but flattens the main method significantly.

### 2. Extract a single‑call helper from `push_to_sekoia`

`push_to_sekoia` currently mixes batching, HTTP, and retry. Extract one HTTP attempt into `_post_iocs_batch` and let `push_to_sekoia` focus on batching:

```python
def push_to_sekoia(self, ioc_values: list[str]) -> None:
    if not ioc_values:
        self.log(message="No IOC values to push", level="info")
        return

    if not self.sekoia_api_key:
        self.log(message="Cannot push IOCs: sekoia_api_key is not configured",
                 level="error")
        return

    if not self.ioc_collection_uuid:
        self.log(message="Cannot push IOCs: ioc_collection_uuid is not configured",
                 level="error")
        return

    batch_size = 1000
    total_batches = (len(ioc_values) + batch_size - 1) // batch_size
    self.log(message=f"Pushing {len(ioc_values)} IOCs in {total_batches} batch(es)",
             level="info")

    for batch_num, i in enumerate(range(0, len(ioc_values), batch_size), 1):
        batch = ioc_values[i : i + batch_size]
        self._push_batch_with_retry(batch, batch_num, total_batches)
```

Then keep your existing retry behaviour but isolated:

```python
def _push_batch_with_retry(
    self, batch: list[str], batch_num: int, total_batches: int
) -> None:
    url = (
        f"{self.ioc_collection_server}/v2/inthreat/ioc-collections/"
        f"{self.ioc_collection_uuid}/indicators/text"
    )
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {self.sekoia_api_key}",
    }
    payload = {"indicators": "\n".join(batch), "format": "one_per_line"}

    retry_count = 0
    max_retries = 3

    while retry_count < max_retries:
        try:
            result = self._post_iocs_batch(url, headers, payload)
            if result is not None:
                self.log(
                    message=(
                        f"Batch {batch_num}/{total_batches} pushed successfully: "
                        f"{result.get('created', 0)} created, "
                        f"{result.get('updated', 0)} updated, "
                        f"{result.get('ignored', 0)} ignored"
                    ),
                    level="info",
                )
                return
        except (InvalidAPIKeyError, VirusTotalAPIError):
            # fatal – propagate
            raise
        except QuotaExceededError:
            # optional: reuse same wait logic as current code if needed
            retry_count += 1
            continue
        except requests.exceptions.Timeout:
            self.log(message="Request timeout", level="error")
        except requests.exceptions.RequestException as error:
            self.log(message=f"Request error: {error}", level="error")

        retry_count += 1
        time.sleep(5)

    self.log(
        message=(
            f"Failed to push batch {batch_num}/{total_batches} "
            f"after {max_retries} retries"
        ),
        level="error",
    )
```

And a single call that encapsulates the status‑code matrix:

```python
def _post_iocs_batch(
    self, url: str, headers: dict[str, str], payload: dict[str, Any]
) -> dict[str, Any] | None:
    response = requests.post(url, json=payload, headers=headers, timeout=30)

    if response.status_code == 200:
        return response.json()
    if response.status_code == 429:
        retry_after = response.headers.get("Retry-After")
        wait_time = int(retry_after) if retry_after else 10
        self.log(message=f"Rate limited. Waiting {wait_time} seconds...", level="info")
        time.sleep(wait_time)
        raise QuotaExceededError("Sekoia API rate limit exceeded")
    if response.status_code in (401, 403):
        self.log(
            message=(
                f"Authentication error: {response.status_code} - {response.text}"
            ),
            level="error",
        )
        raise InvalidAPIKeyError(
            f"Sekoia API authentication error: {response.status_code}"
        )
    if response.status_code == 404:
        self.log(
            message=(
                f"IOC Collection not found: {response.status_code} - {response.text}"
            ),
            level="error",
        )
        raise VirusTotalAPIError(
            f"IOC Collection not found: {self.ioc_collection_uuid}"
        )
    if 400 <= response.status_code < 500:
        self.log(
            message=(
                f"Client error when pushing IOCs: "
                f"{response.status_code} - {response.text}"
            ),
            level="error",
        )
        raise VirusTotalAPIError(
            f"Sekoia API client error: {response.status_code}"
        )

    self.log(
        message=f"Server error {response.status_code}: {response.text}",
        level="error",
    )
    return None
```

That keeps behaviour and logging, but flattens `push_to_sekoia`.

### 3. Simplify query validation with a tiny dispatch table

You can remove the repeated `startswith` blocks by dispatching to small validators:

```python
FILTER_VALIDATORS = {
    "gti_score:": "_validate_numeric_filter",
    "positives:": "_validate_numeric_filter",
    "has:": "_validate_has_filter",
}

def validate_query(self, query: str) -> bool:
    if not query:
        return True

    parts = [p.strip().lower() for p in query.split(" and ") if p.strip()]
    for part in parts:
        for prefix, validator_name in FILTER_VALIDATORS.items():
            if part.startswith(prefix):
                value = part[len(prefix) :]
                validator = getattr(self, validator_name)
                if not validator(value):
                    raise QueryValidationError(f"Invalid {prefix} filter: {part}")
                break
        else:
            raise QueryValidationError(
                f"Invalid query filter: {part}. Must start with "
                "'gti_score:', 'positives:', or 'has:'"
            )
    return True

def _validate_has_filter(self, value: str) -> bool:
    return value in VALID_HAS_VALUES
```

This keeps the same rules but makes extension to new filters trivial.

### 4. Drop unnecessary `hasattr` checks and unify logging usage

You define `inc_metric` and `log_kv` on the class, so the guards are noise:

```python
# before
self.inc_metric("errors", 1) if hasattr(self, "inc_metric") else None
if hasattr(self, "log_kv"):
    self.log_kv("error", "init_failed", error=str(error))

# after
self.inc_metric("errors", 1)
self.log_kv("error", "init_failed", error=str(error))
```

Similarly, now that you have `log_kv`, you can standardize logging by wrapping plain messages in `log_kv` when appropriate (e.g., `event="loop_error"` plus a `message` field) and reserve `self.log` for legacy compatibility only.

### 5. Slightly tighten metrics/checkpoint abstraction

You can avoid re‑loading and re‑saving the checkpoint on every `inc_metric` by caching the checkpoint in memory for the duration of `run` and flushing at key points, but if that’s too invasive right now, at least consider a tiny metrics wrapper:

```python
class Metrics:
    def __init__(self, trigger: "GoogleThreatIntelligenceThreatListToIOCCollectionTrigger"):
        self.trigger = trigger
        self._checkpoint = trigger.load_checkpoint()
        self._metrics = self._checkpoint.get("metrics", {}) or {}
        self._ensure_defaults()

    def _ensure_defaults(self) -> None:
        for key in (
            "runs", "pages_fetched", "iocs_fetched", "iocs_transformed",
            "iocs_deduped", "iocs_pushed", "errors", "quota_errors",
        ):
            self._metrics.setdefault(key, 0)

    def inc(self, name: str, value: int = 1) -> None:
        self._metrics[name] = int(self._metrics.get(name, 0)) + int(value)

    def flush(self) -> None:
        self._checkpoint["metrics"] = self._metrics
        self.trigger.save_checkpoint(self._checkpoint)
```

Then use it in `run`:

```python
def run(self) -> None:
    # ...
    metrics = Metrics(self)
    metrics.inc("runs")
    # pass `metrics` to helpers or store on `self.metrics`.
```

This would centralize metric handling and reduce repeated checkpoint access.

---

All of the above preserves the current functionality and external behaviour, but breaks up the large monolithic methods and deep branching into smaller, named steps that are easier to reason about and test individually.
</issue_to_address>


Comment on lines +161 to +163
def max_iocs(self) -> int:
"""Get maximum number of IoCs per request."""
return min(int(self.configuration.get("max_iocs", 4000)), 4000)

**issue:** Guard `max_iocs` against non-integer configuration values to avoid `ValueError` at runtime.

Here `max_iocs` is cast directly to `int`, so a misconfigured value (e.g. `"4k"`) will raise `ValueError` and abort on property access. Please apply the same defensive pattern as `sleep_time` (wrap the cast in `try/except` for `TypeError`/`ValueError` and fall back to the default) before clamping to 4000.
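A minimal sketch of that defensive pattern as a standalone helper (the key name and default mirror the snippet above; the actual `sleep_time` implementation is not shown in this comment, so its shape is an assumption):

```python
def safe_max_iocs(configuration: dict, default: int = 4000, ceiling: int = 4000) -> int:
    """Parse `max_iocs` defensively: fall back to the default on bad input, then clamp."""
    raw = configuration.get("max_iocs", default)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        # e.g. "4k", None, or a list: keep the trigger alive with the default
        value = default
    return min(value, ceiling)
```

With this, `safe_max_iocs({"max_iocs": "4k"})` quietly returns 4000 instead of aborting property access.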

and forward them to a SEKOIA.IO IOC Collection.
"""

BASE_URL = "https://www.virustotal.com/api/v3"

**suggestion:** Consider wiring the base GTI URL to configuration instead of hard-coding it.

The trigger hard-codes `BASE_URL` even though the manifest already defines a configurable `google_threat_list_url`. If the GTI API base URL changes or needs to differ by environment, this would force a code change and rebuild. Reading the base URL from configuration (with a sensible default) would keep behavior consistent with the manifest and improve deployment flexibility.

Suggested implementation:

```python
class GoogleThreatIntelligenceThreatListToIOCCollectionTrigger(Trigger):
    """
    Trigger to retrieve IoCs from Google Threat Intelligence via VirusTotal API
    and forward them to a SEKOIA.IO IOC Collection.
    """

    # Default VirusTotal API base URL; can be overridden via configuration
    BASE_URL = "https://www.virustotal.com/api/v3"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = None
        # Allow overriding the base URL from configuration (e.g. `google_threat_list_url`)
        # while keeping a sensible default aligned with the manifest.
        self.base_url = getattr(
            self,
            "configuration",
            {},
        ).get("google_threat_list_url", self.BASE_URL)
        self.processed_events = TTLCache(maxsize=10000, ttl=604800)  # 7 days TTL
```

1. Ensure that the trigger code uses `self.base_url` instead of `self.BASE_URL` wherever HTTP requests to the GTI/VirusTotal API are made (e.g., when building endpoint URLs).
2. Confirm that the manifest option key is indeed `google_threat_list_url`; if it differs, adjust the dictionary key used in `self.configuration.get(...)` accordingly.
3. If the `Trigger` base class exposes configuration via a different attribute or accessor (for example `self.config` or `self.module.configuration`), replace the `getattr(self, "configuration", {})` usage to match the existing convention in this codebase.

Comment on lines +332 to +334
# Validate configuration
self.validate_threat_list_id(self.threat_list_id)
self.validate_query(self.extra_query_params)

**issue (bug_risk):** Treat `QueryValidationError` as a fatal configuration issue instead of a generic recoverable loop error.

`validate_query` raising `QueryValidationError` is currently swallowed by the generic `except Exception` in `run()`, causing the trigger to log, sleep 60 seconds, and then retry the same invalid config indefinitely. Consider handling `QueryValidationError` explicitly in `run()` as a fatal configuration error (similar to `InvalidAPIKeyError`/`ThreatListNotFoundError`) so the trigger stops instead of looping forever on misconfiguration.
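A minimal sketch of this fatal-vs-recoverable split. The exception classes are stand-ins named after the PR's (`QueryValidationError`, `InvalidAPIKeyError`), and `run_loop` only mirrors the shape of `run()`, not its real signature:

```python
import time

class QueryValidationError(Exception):
    """Stand-in for the PR's query-validation error."""

class InvalidAPIKeyError(Exception):
    """Stand-in for the PR's fatal auth error."""

def run_loop(fetch, running, sleep=time.sleep):
    """Sketch: configuration errors are fatal, anything else is retried."""
    while running():
        try:
            fetch()
        except (InvalidAPIKeyError, QueryValidationError) as error:
            # Misconfiguration cannot heal itself: log once and stop retrying.
            print(f"Fatal configuration error: {error}")
            return "stopped"
        except Exception as error:
            print(f"Recoverable error, retrying: {error}")
            sleep(60)
    return "done"
```

With an invalid query, the loop exits on the first iteration instead of sleeping and retrying forever.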

Comment on lines +300 to +309
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
raise QuotaExceededError("API rate limit exceeded")
elif response.status_code == 401:
raise InvalidAPIKeyError("Invalid or expired API key")
elif response.status_code == 403:
raise InvalidAPIKeyError("API key does not have access")
elif response.status_code == 404:
raise ThreatListNotFoundError(f"Threat list not found: {url}")

**suggestion (bug_risk):** Broaden success handling in `push_to_sekoia` to cover non-200 2xx responses.

The API can return other 2xx codes (e.g. 201/202) on success, so a strict `== 200` means valid pushes may be retried and ultimately flagged as failures. Consider checking for any 2xx (e.g. `200 <= response.status_code < 300` or `response.ok`) to treat all successful responses consistently.
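The check can live in a tiny predicate. Note that `requests`' `response.ok` is true for any status below 400 (3xx included), so the explicit 2xx range is the stricter of the two options:

```python
def is_success(status_code: int) -> bool:
    """Treat any 2xx status as a successful push, not just 200."""
    return 200 <= status_code < 300

# A 201 Created or 202 Accepted must not fall into the retry path.
assert is_success(201) and is_success(202)
assert not is_success(429) and not is_success(500)
```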

@socket-security

socket-security bot commented Jan 21, 2026

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

| Diff | Package | Supply Chain Security | Vulnerability | Quality | Maintenance | License |
| --- | --- | --- | --- | --- | --- | --- |
| Added | botocore@1.42.26 | 98 | 100 | 100 | 100 | 70 |
| Added | boto3@1.42.26 | 99 | 100 | 100 | 100 | 100 |
| Added | trove-classifiers@2026.1.12.15 | 100 | 100 | 100 | 100 | 100 |

View full report

Imothep-Akonis and others added 3 commits February 12, 2026 09:45
… tighten config schema

- Fix API response parsing: use iocs root key, unwrap nested data per item
- Fix field mappings: file value (id=sha256), gti_score (nested), malware_families (relationships)
- Fix API filter param: ioc_types → type
- Add connectivity check with explicit error logs in initialize_client()
- Tighten trigger JSON schema: required fields, remove duplicate sekoia_api_key and sleep_time
- Update tests to reflect real API structure and mocked connectivity check