Conversation
Reviewer's GuideRefactors error handling around log file processing to ensure safe variable usage and adds tracking/aggregation of the number of forwarded events per batch, updating the public interface, logging, metrics, and tests accordingly. Sequence diagram for updated batch log processing and event forwardingsequenceDiagram
participant Runner as Runner
participant Fetcher as ImpervaLogsFetcher
participant Executor as ThreadPoolExecutor
participant Worker as process_file
participant Handler as handle_file
participant Decrypt as decrypt_file
participant Content as handle_log_decrypted_content
participant Intake as push_events_to_intakes
Runner->>Fetcher: run()
activate Fetcher
Fetcher->>Fetcher: start_batch_timer()
Fetcher->>Executor: create_pool(max_workers=NUM_WORKERS)
activate Executor
loop for each addition in additions
Executor->>Worker: process_file(log_file)
activate Worker
Worker->>Handler: handle_file(log_name)
activate Handler
Handler->>Fetcher: get_file_response(log_name)
Handler->>Decrypt: decrypt_file(response.content, filename)
activate Decrypt
Decrypt-->>Handler: decrypted_file
deactivate Decrypt
Handler->>Content: handle_log_decrypted_content(decrypted_file)
activate Content
Content->>Content: split_to_events(decrypted_file_text)
Content->>Content: OUTCOMING_EVENTS.inc(len(events_list))
Content->>Intake: push_events_to_intakes(events_list)
activate Intake
Intake-->>Content: events_ids
deactivate Intake
Content-->>Handler: nb_events_forwarded
deactivate Content
Handler-->>Worker: HandlingFileResult(successful=True, last_timestamp, nb_forwarded_events)
deactivate Handler
Worker-->>Executor: HandlingFileResult
deactivate Worker
end
Executor-->>Fetcher: iterator_of_HandlingFileResult
deactivate Executor
Fetcher->>Fetcher: collect_last_timestamp(results)
Fetcher->>Fetcher: collect_nb_forwarded_events(results)
Fetcher->>Fetcher: total_forwarded_events = sum(nb_forwarded_events)
Fetcher->>Fetcher: compute_batch_duration()
Fetcher->>Fetcher: log("Fetched and forwarded total_forwarded_events events in batch_duration seconds")
Fetcher->>Fetcher: FORWARD_EVENTS_DURATION.observe(batch_duration)
Fetcher-->>Runner: batch_completed
deactivate Fetcher
Class diagram for updated HandlingFileResult and log fetcher methodsclassDiagram
class HandlingFileResult {
LogFileId log_name
bool successful
int last_timestamp
int nb_forwarded_events
}
class ImpervaLogsFetcher {
+handle_file(log_name: LogFileId) HandlingFileResult
+handle_log_decrypted_content(decrypted_file: bytes) int
+decrypt_file(file_content: bytes, filename: str) bytes
+push_events_to_intakes(events_list: list~str~) list~str~
+run() None
}
ImpervaLogsFetcher --> HandlingFileResult : returns
ImpervaLogsFetcher --> HandlingFileResult : aggregates nb_forwarded_events
ImpervaLogsFetcher ..> LogFileId : uses as parameter
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="Imperva/imperva/fetch_logs_v2.py" line_range="133" />
<code_context>
- return HandlingFileResult(log_name=log_name, successful=False)
-
- return HandlingFileResult(log_name=log_name, successful=True, last_timestamp=last_timestamp)
+ return HandlingFileResult(log_name=log_name, successful=False, nb_forwarded_events=0)
- def handle_log_decrypted_content(self, decrypted_file: bytes) -> None:
</code_context>
<issue_to_address>
**question:** Clarify whether a failed file handling should report 0 forwarded events or an unknown/None value.
Using `nb_forwarded_events=0` on failure makes it impossible to distinguish “no events forwarded” from “processing failed and event count is unknown.” If metrics should only aggregate successfully processed files, consider keeping `nb_forwarded_events` as `None` on failure and excluding those from the batch sum. If the current semantics (treating failures as 0) are intentional, please confirm that this matches how downstream metrics are interpreted.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Pull request overview
This PR updates the Imperva connector to avoid unassigned-variable issues in try/except paths and adds batch-level logging for the number of forwarded events.
Changes:
- Extend
HandlingFileResultto includenb_forwarded_events, and return it from per-file processing. - Track and log total forwarded events per batch along with batch duration.
- Bump connector version to
1.21.4and add corresponding changelog entry.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
Imperva/imperva/fetch_logs_v2.py |
Return forwarded event counts per file, aggregate them per batch, and log totals + duration. |
Imperva/tests/test_fetch_logs_v2.py |
Update expectations to include nb_forwarded_events in HandlingFileResult. |
Imperva/manifest.json |
Version bump to 1.21.4. |
Imperva/CHANGELOG.md |
Document fixes/additions for 1.21.4. |
| res = trigger.handle_file(logs[0]) | ||
| assert res == HandlingFileResult(successful=True, log_name=logs[0], last_timestamp=1759413775485) | ||
| assert res == HandlingFileResult( | ||
| successful=True, log_name=logs[0], last_timestamp=1759413775485, nb_forwarded_events=0 | ||
| ) |
There was a problem hiding this comment.
test_handle_file now asserts nb_forwarded_events=0, but the fixture file contains 3 events and handle_log_decrypted_content() returns len(push_events_to_intakes(...)). Because push_events_to_intakes is a bare MagicMock, this test will always see len(...) == 0 and won’t validate the new behavior. Set an explicit return_value for push_events_to_intakes (e.g., a list of 3 ids) and assert the corresponding forwarded count.
| OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(len(events_list)) | ||
|
|
||
| self.push_events_to_intakes(events_list) | ||
| events_ids = self.push_events_to_intakes(events_list) | ||
| return len(events_ids) |
There was a problem hiding this comment.
OUTCOMING_EVENTS is documented as "Number of events forwarded", but it is incremented using len(events_list) (events attempted) while nb_forwarded_events is computed from the ids returned by push_events_to_intakes (events actually forwarded). To keep metrics consistent with the new forwarded-events logging, increment OUTCOMING_EVENTS based on the number of successfully forwarded events (i.e., the length of the returned ids).
78aa0a4 to
f6fd1f7
Compare
f6fd1f7 to
7cef53f
Compare
Summary by Sourcery
Improve Imperva log fetching by safely handling file processing errors and reporting forwarded event counts per batch.
New Features:
Bug Fixes:
Enhancements:
Documentation:
Tests:
Chores: