feat: Add Anozrway connector #2025
Draft
helkabouss wants to merge 21 commits into SEKOIA-IO:develop from
Reviewer's Guide
Implements a new Anozrway historical connector module for SEKOIA.IO, including an async HTTP client with OAuth2, rate limiting, and retry logic; a stateful historical connector with deduplication and checkpointing; Prometheus metrics; packaging, manifest, and Docker support; and a comprehensive pytest suite.
Sequence diagram for Anozrway historical collection run loop
sequenceDiagram
actor Operator
participant Module as SekoiaModule
participant Connector as AnozrwayHistoricalConnector
participant Client as AnozrwayClient
participant API as AnozrwayAPI
participant Intake as SekoiaIntake
participant Metrics as PrometheusMetrics
Operator->>Module: start module.run()
Module->>Connector: register and start anozrway_historical
activate Connector
loop while Connector.running
Connector->>Client: __aenter__(module_config, trigger)
activate Client
Client->>API: _get_access_token() via token_url
API-->>Client: access_token
loop fetch_events windows
Connector->>Client: fetch_events(context, domain, start_date, end_date)
activate Client
Client->>API: POST /events
API-->>Client: { events: [...] }
Client-->>Connector: List events
deactivate Client
Connector->>Connector: deduplicate, enrich, update max_seen_ts
Connector->>Metrics: api_requests, api_request_duration, events_collected, events_duplicated
alt non_empty_batch
Connector->>Intake: push_data_to_intakes(events=batch)
Connector->>Metrics: events_forwarded
end
end
Connector->>Connector: save_checkpoint(last_seen+1s)
Connector->>Metrics: checkpoint_age
Client->>Client: __aexit__()
deactivate Client
Connector->>Connector: sleep(configuration.frequency)
end
Connector-->>Module: shutdown
deactivate Connector
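The windowed fetch and the `last_seen+1s` checkpoint shown in the diagram could look roughly like the sketch below. It is an illustration only, not the connector's code: `iter_windows` and `next_checkpoint` are hypothetical helper names, and the half-open window convention is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional, Tuple


def iter_windows(
    start: datetime, end: datetime, window_seconds: int
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs covering [start, end)."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    step = timedelta(seconds=window_seconds)
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)  # the last window may be shorter
        yield cursor, upper
        cursor = upper


def next_checkpoint(previous: datetime, max_seen_ts: Optional[datetime]) -> datetime:
    """Advance the checkpoint one second past the newest event, never backwards."""
    if max_seen_ts is None:
        return previous  # nothing collected: keep the old checkpoint
    return max(previous, max_seen_ts + timedelta(seconds=1))


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(minutes=25)
windows = list(iter_windows(start, end, window_seconds=600))
```

Keeping the checkpoint monotonic means a batch containing only older events cannot move the collection start backwards on the next run.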
Class diagram for Anozrway connector, client and module
classDiagram
class Module
class Trigger
class AsyncConnector
class DefaultConnectorConfiguration
class BaseModel
class AnozrwayModule {
+str name
+str description
}
class AnozrwayModuleConfiguration {
}
class AnozrwayHistoricalConfiguration {
+Optional~str~ intake_server
+str intake_key
+int frequency
+int chunk_size
+str context
+str domains
+int lookback_days
+int window_seconds
}
class AnozrwayHistoricalConnector {
+str name
+AnozrwayHistoricalConfiguration configuration
-PersistentJSON context_store
-PersistentJSON event_cache_store
-timedelta event_cache_ttl
+last_checkpoint() datetime
+save_checkpoint(last_seen datetime) void
+fetch_events(client AnozrwayClient) AsyncGenerator~List~
+next_batch() AsyncGenerator~List~
+run() void
+_async_run() void
-_parse_domains() List~str~
-_cleanup_event_cache() void
-_is_new_event(cache_key str) bool
+_compute_dedup_key(searched_domain str, event Dict) str
+_extract_event_ts(event Dict) datetime
+_extract_entity_id(event Dict) str
+_safe_str(x Any) str
}
class AnozrwayClient {
-Dict cfg
-Optional~Trigger~ trigger
-str base_url
-str token_url
-str client_id
-str client_secret
-Any x_restrict_access
-int timeout
-Optional~aiohttp.ClientSession~ _session
-Optional~str~ _access_token
-Optional~datetime~ _token_expires_at
-AsyncLimiter _rate_limiter
+AnozrwayClient(module_config Dict, trigger Trigger)
+log(message str, level str) void
+search_domain_v1(context str, domain str, start_date datetime, end_date datetime) List~Dict~
+fetch_events(context str, domain str, start_date datetime, end_date datetime) List~Dict~
+__aenter__() AnozrwayClient
+__aexit__(exc_type Any, exc_val Any, exc_tb Any) void
-_get_access_token() str
-_to_iso(dt datetime) str
}
class AnozrwayError {
}
class AnozrwayAuthError {
}
class AnozrwayRateLimitError {
}
AnozrwayModule --|> Module
AnozrwayModuleConfiguration --|> BaseModel
AnozrwayHistoricalConfiguration --|> DefaultConnectorConfiguration
AnozrwayHistoricalConnector --|> AsyncConnector
AnozrwayHistoricalConnector o--> AnozrwayHistoricalConfiguration
AnozrwayHistoricalConnector o--> AnozrwayClient
AnozrwayClient ..> Trigger : optional
AnozrwayError --|> Exception
AnozrwayAuthError --|> AnozrwayError
AnozrwayRateLimitError --|> AnozrwayError
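The diagram lists `event_cache_ttl`, `_is_new_event` and `_cleanup_event_cache` without showing how they interact. A minimal sketch of a TTL-based dedup cache follows, assuming an in-memory dict in place of the `PersistentJSON` store; the `EventCache` class and its method names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class EventCache:
    """In-memory stand-in for a persistent dedup cache keyed by event hash."""

    def __init__(self, ttl: timedelta) -> None:
        self.ttl = ttl
        self._seen: Dict[str, datetime] = {}

    def cleanup(self, now: Optional[datetime] = None) -> None:
        """Drop entries older than the TTL so the cache stays bounded."""
        now = now or datetime.now(timezone.utc)
        cutoff = now - self.ttl
        self._seen = {k: ts for k, ts in self._seen.items() if ts >= cutoff}

    def is_new(self, key: str, now: Optional[datetime] = None) -> bool:
        """Return True the first time a key is seen within the TTL, and record it."""
        now = now or datetime.now(timezone.utc)
        seen_at = self._seen.get(key)
        if seen_at is not None and now - seen_at < self.ttl:
            return False  # duplicate inside the TTL window
        self._seen[key] = now
        return True
```

The TTL only needs to exceed the overlap between consecutive collection windows; a longer TTL trades memory for protection against late replays.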
File-Level Changes
Hey - I've found 4 issues, and left some high level feedback:
- The Dockerfile uses `python:3.11` while `pyproject.toml` declares `python = "^3.12"`; aligning these versions will avoid surprises at runtime and ensure the image matches the declared runtime constraints.
- In `AnozrwayClient.__aenter__`, if `_get_access_token` raises (e.g. bad credentials), the `aiohttp.ClientSession` is never closed; consider wrapping the token fetch in a try/except that closes the session on failure to avoid leaking connections.
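The session-leak point can be illustrated with a small sketch. It assumes the client opens its session in `__aenter__` before fetching the token; `FakeSession` and `ClientSketch` are hypothetical stand-ins so the example runs without `aiohttp`, and they do not reproduce the real `AnozrwayClient`.

```python
import asyncio
from typing import Optional


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession, tracking only close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class ClientSketch:
    def __init__(self, fail_auth: bool = False) -> None:
        self.fail_auth = fail_auth
        self._session: Optional[FakeSession] = None
        self.last_session: Optional[FakeSession] = None  # kept for inspection only

    async def _get_access_token(self) -> str:
        if self.fail_auth:
            raise RuntimeError("bad credentials")
        return "token"

    async def __aenter__(self) -> "ClientSketch":
        self._session = self.last_session = FakeSession()
        try:
            await self._get_access_token()
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so close here.
            await self._session.close()
            self._session = None
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._session is not None:
            await self._session.close()
            self._session = None


async def demo() -> bool:
    client = ClientSketch(fail_auth=True)
    try:
        async with client:
            pass
    except RuntimeError:
        pass
    return client.last_session is not None and client.last_session.closed


leak_avoided = asyncio.run(demo())
```

The key detail is that `async with` never invokes `__aexit__` if `__aenter__` itself raises, so any resource opened before the failure has to be released inside `__aenter__`.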
## Individual Comments
### Comment 1
<location> `Anozrway/anozrway_modules/historical_connector.py:154` </location>
<code_context>
+ """
+ nom_fuite = cls._safe_str(event.get("nom_fuite")).strip().lower()
+ ts = cls._extract_event_ts(event)
+ ts_s = ts.isoformat().replace("+00:00", "Z") if ts else ""
+
+ raw = "|".join(
</code_context>
<issue_to_address>
**issue (bug_risk):** Events without a parsable timestamp will all share the same dedup key segment, potentially collapsing distinct events.
When `ts` is `None`, `ts_s` becomes an empty string, so all events with the same `(searched_domain, nom_fuite)` but missing/unparseable timestamps collapse to one dedup key. If multiple such events are valid from the upstream API, this will drop data. Consider falling back to another stable field (or the raw timestamp string) instead of `""` to distinguish them.
</issue_to_address>
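One way to apply the suggested fallback is sketched below. It is an assumption-laden illustration rather than the connector's `_compute_dedup_key`: the `timestamp` field name, the `parse_ts` helper, and the `raw:` prefix are all hypothetical.

```python
import hashlib
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def parse_ts(value: Any) -> Optional[datetime]:
    """Parse an ISO 8601 string into an aware UTC datetime, or None on failure."""
    if not isinstance(value, str) or not value.strip():
        return None
    try:
        dt = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def compute_dedup_key(searched_domain: str, event: Dict[str, Any]) -> str:
    nom_fuite = str(event.get("nom_fuite") or "").strip().lower()
    raw_ts = event.get("timestamp")  # assumed field name
    ts = parse_ts(raw_ts)
    if ts is not None:
        ts_s = ts.isoformat().replace("+00:00", "Z")
    else:
        # Fall back to the raw value so unparseable timestamps stay distinguishable.
        ts_s = f"raw:{'' if raw_ts is None else str(raw_ts).strip()}"
    raw = "|".join([searched_domain.strip().lower(), nom_fuite, ts_s])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

Events with no timestamp field at all still share a key under this scheme, so a further stable field (such as an entity identifier) would be needed if the upstream API can emit several of those.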
### Comment 2
<location> `Anozrway/anozrway_modules/historical_connector.py:282-285` </location>
<code_context>
+ import asyncio
+ import signal
+
+ loop = asyncio.get_event_loop()
+
+ def handle_stop_signal():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Using `asyncio.get_event_loop()` in `run` can be fragile on newer Python versions and when no default loop exists.
Since Python 3.12, this call emits a `DeprecationWarning` when there is no running loop and no current loop is set, and from Python 3.14 it raises `RuntimeError` in that case. Since this is the connector entrypoint, prefer creating and setting an explicit loop, e.g. `loop = asyncio.new_event_loop(); asyncio.set_event_loop(loop)`, rather than relying on the implicit global loop.
```suggestion
import asyncio
import signal
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
```
</issue_to_address>
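A fuller sketch of an entrypoint built on an explicit loop is shown below. It is not the connector's `run` method: `run_with_explicit_loop` and `example_main` are hypothetical names, and the stop-event pattern is an assumption about how shutdown might be signalled.

```python
import asyncio
import signal
from typing import Awaitable, Callable


def run_with_explicit_loop(main: Callable[[asyncio.Event], Awaitable[str]]) -> str:
    """Run `main` on a freshly created loop and stop it on SIGINT/SIGTERM."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    stop = asyncio.Event()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, stop.set)
        except (NotImplementedError, ValueError, RuntimeError):
            # Not available on Windows or outside the main thread.
            pass

    try:
        return loop.run_until_complete(main(stop))
    finally:
        asyncio.set_event_loop(None)
        loop.close()


async def example_main(stop: asyncio.Event) -> str:
    # A real connector would loop until `stop` is set; here we wait briefly.
    try:
        await asyncio.wait_for(stop.wait(), timeout=0.01)
        return "stopped"
    except asyncio.TimeoutError:
        return "timed out"


result = run_with_explicit_loop(example_main)
```

Closing the loop in a `finally` block keeps repeated invocations (for example in tests) from accumulating open loops.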
### Comment 3
<location> `Anozrway/tests/test_misc.py:29-35` </location>
<code_context>
+ assert AnozrwayModuleConfiguration() is not None
+
+
+def test_metrics_smoke():
+ assert events_collected._name == "anozrway_historical_events_collected"
+ assert events_forwarded._name == "anozrway_historical_events_forwarded"
+ assert events_duplicated._name == "anozrway_historical_events_duplicated"
+ assert api_requests._name == "anozrway_api_requests"
+ assert api_request_duration._name == "anozrway_api_request_duration_seconds"
+ assert checkpoint_age._name == "anozrway_checkpoint_age_seconds"
</code_context>
<issue_to_address>
**issue (testing):** The metric name assertions depend on the private `_name` attribute and on how `prometheus_client` normalizes counter names
`Counter` metrics in `metrics.py` are defined with the `_total` suffix (e.g. `anozrway_historical_events_collected_total`). `prometheus_client` strips a trailing `_total` from a counter's private `_name` and adds it back when samples are exposed, so the assertions in `test_metrics_smoke` pass only because of that normalization. To assert the intended semantics rather than a private attribute, consider checking the exposed sample names (including `_total` for counters), for example via `collect()`.
</issue_to_address>
### Comment 4
<location> `Anozrway/anozrway_modules/client/http_client.py:89` </location>
<code_context>
+ def _to_iso(dt: datetime) -> str:
+ return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
+
+ async def search_domain_v1(
+ self,
+ context: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared POST/retry/backoff/response-handling logic into a single private helper so the two public methods only construct payloads and delegate to it.
You can remove a lot of duplication by centralizing the “POST + retry + backoff + result extraction” logic into a single private helper, then make the two public methods just build payloads and delegate.
A focused refactor could look like this:
```python
# helper inside AnozrwayClient
async def _post_with_retry(
    self,
    path: str,
    payload: Dict[str, Any],
    *,
    result_key: str,
    unauthorized_msg: str,
    generic_error_msg: str,
) -> List[Dict[str, Any]]:
    if not self._session:
        raise AnozrwayError("HTTP session not initialized")

    url = f"{self.base_url}{path}"
    max_attempts = 3
    attempt = 0
    backoff = 1

    while attempt < max_attempts:
        attempt += 1

        # Fetch the token inside the loop so a retry after a 401 uses a fresh one.
        access_token = await self._get_access_token()
        headers = {
            "Content-Type": "application/json",
            "authorization": f"Bearer {access_token}",
        }
        if self.x_restrict_access:
            headers["x-restrict-access"] = str(self.x_restrict_access)

        async with self._rate_limiter:
            async with self._session.post(
                url,
                json=payload,
                headers=headers,
                timeout=self.timeout,
                raise_for_status=False,
            ) as resp:
                status = resp.status

                if status == 401:
                    # drop the cached token and retry while attempts remain
                    self._access_token = None
                    self._token_expires_at = None
                    if attempt < max_attempts:
                        continue
                    raise AnozrwayAuthError(unauthorized_msg)

                if status == 429:
                    # rate limited: wait with exponential backoff, then retry
                    await asyncio.sleep(60 * backoff)
                    backoff *= 2
                    continue

                if status != 200:
                    text = await resp.text()
                    raise AnozrwayError(f"{generic_error_msg} ({status}): {text}")

                data = await resp.json()
                results = data.get(result_key) or []
                if not isinstance(results, list):
                    return []
                return results

    # only reached when every attempt was rate limited
    raise AnozrwayRateLimitError(
        f"Exceeded maximum retry attempts while calling {generic_error_msg}"
    )
```
Then your two public methods become much smaller and easier to reason about:
```python
async def search_domain_v1(
    self,
    context: str,
    domain: str,
    start_date: datetime,
    end_date: datetime,
) -> List[Dict[str, Any]]:
    payload = {
        "context": context,
        "domain": domain,
        "start_date": self._to_iso(start_date),
        "end_date": self._to_iso(end_date),
    }
    return await self._post_with_retry(
        "/v1/domain/searches",
        payload,
        result_key="results",
        unauthorized_msg="Unauthorized when calling Anozrway v1 domain search",
        generic_error_msg="v1 domain search failed",
    )

async def fetch_events(
    self,
    context: str,
    domain: str,
    start_date: datetime,
    end_date: datetime,
) -> List[Dict[str, Any]]:
    payload = {
        "context": context,
        "domain": domain,
        "start_date": self._to_iso(start_date),
        "end_date": self._to_iso(end_date),
    }
    return await self._post_with_retry(
        "/events",
        payload,
        result_key="events",
        unauthorized_msg="Unauthorized when calling Balise Pipeline /events",
        generic_error_msg="Balise Pipeline /events failed",
    )
```
This keeps all existing behavior (tokens, headers, rate limiting, 401 handling, 429 backoff, error messages) but makes future changes to the HTTP/retry logic localized to one place.
</issue_to_address>
All alerts resolved. This PR previously contained dependency changes with security issues that have been resolved, removed, or ignored.
…1/domain/searches
Description
This PR adds a new connector for Anozrway to the SEKOIA.IO automation library.
Features
Components
- anozrway_modules/client/: HTTP client and error handling
- anozrway_modules/historical_connector.py: Main connector logic
- anozrway_modules/models.py: Data models
- anozrway_modules/metrics.py: Metrics collection
- tests/: Complete test suite
- manifest.json: Connector configuration
- Dockerfile: Container configuration
Type of change
Testing
Checklist
Summary by Sourcery
Introduce an Anozrway integration module providing a historical connector that ingests leak detection events from the Anozrway Balise Pipeline into SEKOIA.IO, backed by an OAuth2-enabled HTTP client and observability via Prometheus metrics.
New Features:
Enhancements:
Build:
Documentation:
Tests: