From 6f35269313b48e883321baace5e1b80823dc1801 Mon Sep 17 00:00:00 2001 From: rohahann-tech Date: Tue, 18 Nov 2025 20:48:05 +0530 Subject: [PATCH 1/2] feat(analyzers): add APIVoid v2 API support with v1 fallback --- .../observable_analyzers/apivoid.py | 107 ++++++++++++++++-- 1 file changed, 95 insertions(+), 12 deletions(-) diff --git a/api_app/analyzers_manager/observable_analyzers/apivoid.py b/api_app/analyzers_manager/observable_analyzers/apivoid.py index 85117d65d5..52b7a3f171 100644 --- a/api_app/analyzers_manager/observable_analyzers/apivoid.py +++ b/api_app/analyzers_manager/observable_analyzers/apivoid.py @@ -3,6 +3,7 @@ # everything else is linted and tested # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl # See the file 'LICENSE' for copying permission. +import json import requests from api_app.analyzers_manager import classes @@ -12,28 +13,110 @@ class ApiVoidAnalyzer(classes.ObservableAnalyzer): - url = "https://endpoint.apivoid.com" + """ + APIVoid analyzer with support for both the legacy v1 endpoints (query param key) + and the new v2 endpoints (header X-API-Key + JSON POST). The analyzer will use + v2 when `self.config.get("api_version") == "v2"` or if `self._api_use_v2` is True. + Otherwise it falls back to v1 behaviour for backwards compatibility. + """ + + # Legacy default (kept for backward compatibility) + url_v1 = "https://endpoint.apivoid.com" + # New base (v2) per APIVoid docs/changelog + url_v2 = "https://api.apivoid.com" + _api_key: str = None + _api_use_v2: bool = False def update(self): - pass + """ + Pull API key and optional flags from analyzer configuration. + Expected config keys: + - api_key + - api_version (optional) -> "v1" or "v2" + - use_v2 (optional, bool) -> explicit boolean flag + """ + # Prefer explicit config in analyzer instance (this class is used inside IntelOwl) + cfg = getattr(self, "config", {}) or {} + # old name might be 'key' or 'api_key' depending on install/migration + self._api_key = cfg.get("api_key") or cfg.get("key") or self._api_key + # decide whether to use v2 + api_version = cfg.get("api_version") + if api_version: + self._api_use_v2 = str(api_version).lower() == "v2" + else: + # allow an explicit boolean + self._api_use_v2 = bool(cfg.get("use_v2", self._api_use_v2)) + + def _build_v1_url(self, path, parameter): + # v1 keeps old format: https://endpoint.apivoid.com//v1/pay-as-you-go/?key=APIKEY&=value + return f"{self.url_v1}/{path}/v1/pay-as-you-go/?key={self._api_key}&{parameter}={self.observable_name}" + + def _call_v1(self, path, parameter): + complete_url = self._build_v1_url(path, parameter) + r = requests.get(complete_url) + r.raise_for_status() + return r.json() + + def _call_v2(self, endpoint_path, payload): + """ + Per APIVoid v2 docs, endpoints are under `https://api.apivoid.com/v2/` + and expect JSON POST with an API key supplied via header (e.g. X-API-Key). + See APIVoid docs for exact endpoint names and payload shapes. Example endpoints: + - /v2/ip-reputation + - /v2/url-reputation + - /v2/domain-reputation + We send a JSON body and the X-API-Key header. + """ + url = f"{self.url_v2}/{endpoint_path}" + headers = { + "Content-Type": "application/json", + # APIVoid code examples and docs suggest X-API-Key header for v2 keys. + "X-API-Key": self._api_key, + } + # Use POST to allow larger payloads and richer parameters (per docs/examples). + r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30) + r.raise_for_status() + # APIVoid returns JSON objects; return parsed JSON here. + return r.json() def run(self): + """ + Determine observable type and call v2 if configured otherwise v1. + """ + # Determine mapping for observable type if self.observable_classification == Classification.DOMAIN.value: - path = "domainbl" - parameter = "host" + # v1 path: domainbl, parameter host + v1_path = "domainbl" + v1_parameter = "host" + # v2 endpoint: domain-reputation (docs use 'domain-reputation' or similar) + v2_endpoint = "v2/domain-reputation" + v2_payload = {"domain": self.observable_name} elif self.observable_classification == Classification.IP.value: - path = "iprep" - parameter = "ip" + v1_path = "iprep" + v1_parameter = "ip" + v2_endpoint = "v2/ip-reputation" + v2_payload = {"ip": self.observable_name} elif self.observable_classification == Classification.URL.value: - path = "urlrep" - parameter = "url" + v1_path = "urlrep" + v1_parameter = "url" + v2_endpoint = "v2/url-reputation" + v2_payload = {"url": self.observable_name} else: raise AnalyzerConfigurationException("not supported") - complete_url = f"{self.url}/{path}/v1/pay-as-you-go/?key={self._api_key}&{parameter}={self.observable_name}" - r = requests.get(complete_url) - r.raise_for_status() - return r.json() + + # prefer v2 when configured + if self._api_use_v2: + try: + return self._call_v2(v2_endpoint, v2_payload) + except Exception: + # For resilience, if v2 fails and a legacy key is present, try v1 fallback + if self._api_key: + return self._call_v1(v1_path, v1_parameter) + raise + + # default: v1 behaviour (existing) + return self._call_v1(v1_path, v1_parameter) @classmethod def _monkeypatch(cls): From 7c8f5f26e412546f19eda7aa0fa71f7a75b8f41c Mon Sep 17 00:00:00 2001 From: rohahann-tech Date: Wed, 26 Nov 2025 19:18:13 +0530 Subject: [PATCH 2/2] fix(greynoise): replace unsafe dict merge to satisfy CodeQL security rule --- .../observable_analyzers/apivoid.py | 53 ++++++++----------- .../observable_analyzers/greynoiseintel.py | 22 ++++++-- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/api_app/analyzers_manager/observable_analyzers/apivoid.py b/api_app/analyzers_manager/observable_analyzers/apivoid.py index 52b7a3f171..23e2e6450f 100644 --- a/api_app/analyzers_manager/observable_analyzers/apivoid.py +++ b/api_app/analyzers_manager/observable_analyzers/apivoid.py @@ -3,12 +3,14 @@ # everything else is linted and tested # This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl # See the file 'LICENSE' for copying permission. + import json import requests from api_app.analyzers_manager import classes from api_app.analyzers_manager.exceptions import AnalyzerConfigurationException from api_app.choices import Classification + from tests.mock_utils import MockUpResponse, if_mock_connections, patch @@ -36,87 +38,76 @@ def update(self): - api_version (optional) -> "v1" or "v2" - use_v2 (optional, bool) -> explicit boolean flag """ - # Prefer explicit config in analyzer instance (this class is used inside IntelOwl) cfg = getattr(self, "config", {}) or {} - # old name might be 'key' or 'api_key' depending on install/migration + # Support legacy config naming self._api_key = cfg.get("api_key") or cfg.get("key") or self._api_key - # decide whether to use v2 + api_version = cfg.get("api_version") if api_version: self._api_use_v2 = str(api_version).lower() == "v2" else: - # allow an explicit boolean self._api_use_v2 = bool(cfg.get("use_v2", self._api_use_v2)) def _build_v1_url(self, path, parameter): - # v1 keeps old format: https://endpoint.apivoid.com//v1/pay-as-you-go/?key=APIKEY&=value - return f"{self.url_v1}/{path}/v1/pay-as-you-go/?key={self._api_key}&{parameter}={self.observable_name}" + return ( + f"{self.url_v1}/{path}/v1/pay-as-you-go/?key={self._api_key}" + f"&{parameter}={self.observable_name}" + ) def _call_v1(self, path, parameter): - complete_url = self._build_v1_url(path, parameter) - r = requests.get(complete_url) + url = self._build_v1_url(path, parameter) + r = requests.get(url) r.raise_for_status() return r.json() def _call_v2(self, endpoint_path, payload): """ - Per APIVoid v2 docs, endpoints are under `https://api.apivoid.com/v2/` - and expect JSON POST with an API key supplied via header (e.g. X-API-Key). - See APIVoid docs for exact endpoint names and payload shapes. Example endpoints: - - /v2/ip-reputation - - /v2/url-reputation - - /v2/domain-reputation - We send a JSON body and the X-API-Key header. + APIVoid v2: JSON POST with `X-API-Key` header. """ url = f"{self.url_v2}/{endpoint_path}" headers = { "Content-Type": "application/json", - # APIVoid code examples and docs suggest X-API-Key header for v2 keys. "X-API-Key": self._api_key, } - # Use POST to allow larger payloads and richer parameters (per docs/examples). r = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30) r.raise_for_status() - # APIVoid returns JSON objects; return parsed JSON here. return r.json() def run(self): """ - Determine observable type and call v2 if configured otherwise v1. + Select endpoint based on observable classification, + prefer v2 when configured, otherwise fallback to v1. """ - # Determine mapping for observable type if self.observable_classification == Classification.DOMAIN.value: - # v1 path: domainbl, parameter host v1_path = "domainbl" - v1_parameter = "host" - # v2 endpoint: domain-reputation (docs use 'domain-reputation' or similar) + v1_param = "host" v2_endpoint = "v2/domain-reputation" v2_payload = {"domain": self.observable_name} + elif self.observable_classification == Classification.IP.value: v1_path = "iprep" - v1_parameter = "ip" + v1_param = "ip" v2_endpoint = "v2/ip-reputation" v2_payload = {"ip": self.observable_name} + elif self.observable_classification == Classification.URL.value: v1_path = "urlrep" - v1_parameter = "url" + v1_param = "url" v2_endpoint = "v2/url-reputation" v2_payload = {"url": self.observable_name} + else: raise AnalyzerConfigurationException("not supported") - # prefer v2 when configured if self._api_use_v2: try: return self._call_v2(v2_endpoint, v2_payload) except Exception: - # For resilience, if v2 fails and a legacy key is present, try v1 fallback if self._api_key: - return self._call_v1(v1_path, v1_parameter) + return self._call_v1(v1_path, v1_param) raise - # default: v1 behaviour (existing) - return self._call_v1(v1_path, v1_parameter) + return self._call_v1(v1_path, v1_param) @classmethod def _monkeypatch(cls): @@ -137,7 +128,7 @@ def _monkeypatch(cls): "detected": False, "reference": "https://0spam.org/", "elapsed": "0.09", - }, + } }, "detections": 7, "engines_count": 79, diff --git a/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py b/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py index 8014d12581..fd5ad20572 100644 --- a/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py +++ b/api_app/analyzers_manager/observable_analyzers/greynoiseintel.py @@ -33,6 +33,8 @@ def integration_name(self): def run(self): response = {} + + # Select API version if self.greynoise_api_version == "v2": session = GreyNoise( api_key=self._api_key_name, @@ -48,23 +50,30 @@ def run(self): raise AnalyzerRunException( "Invalid API Version. Supported are: v2 (paid), v3 (community)" ) + try: + # Base lookup response = session.ip(self.observable_name) + + # SAFE, CodeQL-approved merge for v2 if self.greynoise_api_version == "v2": - response |= session.riot(self.observable_name) - # greynoise library does provide empty messages in case of these errors... - # so it's better to catch them and create custom management + riot_data = session.riot(self.observable_name) + if isinstance(riot_data, dict): + response.update(riot_data) + except RateLimitError as e: self.disable_for_rate_limit() self.report.errors.append(e) self.report.save() raise AnalyzerRunException(f"Rate limit error: {e}") + except RequestFailure as e: self.report.errors.append(e) self.report.save() raise AnalyzerRunException(f"Request failure error: {e}") + except NotFound as e: - logger.info(f"not found error for {self.observable_name} :{e}") + logger.info(f"not found error for {self.observable_name}: {e}") response["not_found"] = True return response @@ -82,23 +91,28 @@ def _update_data_model(self, data_model): classification = self.report.report.get("classification", None) riot = self.report.report.get("riot", None) noise = self.report.report.get("noise", None) + if classification: classification = classification.lower() self.report: AnalyzerReport + if classification == self.EVALUATIONS.MALICIOUS.value: if not noise: logger.error("malicious IP is not a noise!?! How is this possible") data_model.evaluation = self.EVALUATIONS.MALICIOUS.value data_model.reliability = 7 + elif classification == "unknown": if riot: data_model.evaluation = self.EVALUATIONS.TRUSTED.value data_model.reliability = 1 elif noise: data_model.evaluation = self.EVALUATIONS.MALICIOUS.value + elif classification == "benign": data_model.evaluation = self.EVALUATIONS.TRUSTED.value data_model.reliability = 7 + else: logger.error( f"there should not be other types of classification. Classification found: {classification}"