Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Imperva/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

## 2026-02-24 - 1.21.4

### Added

- Track the number of forwarded events in the logs

### Fixed

- Fix try-except blocks to avoid unassigned variable errors

## 2025-10-28 - 1.21.3

### Changed
Expand Down
40 changes: 28 additions & 12 deletions Imperva/imperva/fetch_logs_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from posixpath import join as urljoin

import requests
from cryptography.hazmat.primitives import padding as sym_padding
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import padding as sym_padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pydantic.v1 import BaseModel
from sekoia_automation.checkpoint import CheckpointCursor
Expand All @@ -30,6 +30,7 @@ class HandlingFileResult(BaseModel):
log_name: LogFileId
successful: bool
last_timestamp: int | None = None
nb_forwarded_events: int | None = None

class Config:
arbitrary_types_allowed = True
Expand Down Expand Up @@ -110,23 +111,28 @@ def handle_file(self, log_name: LogFileId) -> HandlingFileResult:
try:
last_timestamp = extract_last_timestamp(response.content)
decrypted_file = self.decrypt_file(response.content, log_name.get_filename())
self.handle_log_decrypted_content(decrypted_file)
nb_events_forwarded = self.handle_log_decrypted_content(decrypted_file)

self.log(
message=f"File {log_name.get_filename()} downloading and processing completed successfully",
level="info",
)

return HandlingFileResult(
log_name=log_name,
successful=True,
last_timestamp=last_timestamp,
nb_forwarded_events=nb_events_forwarded,
)

except Exception as e:
self.log(
message=f"Fail file decryption or handling : {str(e)}",
level="error",
)
return HandlingFileResult(log_name=log_name, successful=False)

return HandlingFileResult(log_name=log_name, successful=True, last_timestamp=last_timestamp)

def handle_log_decrypted_content(self, decrypted_file: bytes) -> None:
def handle_log_decrypted_content(self, decrypted_file: bytes) -> int:
decrypted_file_text: str = decrypted_file.decode("utf-8") # many lines
events_list: list[str] = decrypted_file_text.split("\n")

Expand All @@ -135,7 +141,8 @@ def handle_log_decrypted_content(self, decrypted_file: bytes) -> None:

OUTCOMING_EVENTS.labels(intake_key=self.configuration.intake_key).inc(len(events_list))

self.push_events_to_intakes(events_list)
events_ids = self.push_events_to_intakes(events_list)
return len(events_ids)
Comment on lines 142 to +145
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OUTCOMING_EVENTS is documented as "Number of events forwarded", but it is incremented using len(events_list) (events attempted) while nb_forwarded_events is computed from the ids returned by push_events_to_intakes (events actually forwarded). To keep metrics consistent with the new forwarded-events logging, increment OUTCOMING_EVENTS based on the number of successfully forwarded events (i.e., the length of the returned ids).

Copilot uses AI. Check for mistakes.

def decrypt_file(self, file_content: bytes, filename: str) -> bytes:
# each log file is built from a header section and a content section, the two are divided by a |==| mark
Expand Down Expand Up @@ -263,6 +270,7 @@ def run(self) -> None:

self.in_progress.extend(additions)
last_timestamp = None
nb_forwarded_events = list()
try:
with ThreadPoolExecutor(max_workers=self.NUM_WORKERS) as pool:
for item in pool.map(self.process_file, additions, timeout=3600):
Expand All @@ -271,6 +279,9 @@ def run(self) -> None:
):
last_timestamp = item.last_timestamp

if item.nb_forwarded_events is not None:
nb_forwarded_events.append(item.nb_forwarded_events)

if self.processed:
if last_timestamp:
now = datetime.now(tz=timezone.utc).timestamp()
Expand All @@ -279,19 +290,24 @@ def run(self) -> None:

self.last_seen_log = max(self.processed)
self.cursor.offset = self.last_seen_log.get_filename()

# get the ending time and compute the duration to fetch the events
batch_end_time = time.time()
batch_duration = int(batch_end_time - batch_start_time)
self.log(f"Fetched and forwarded events in {batch_duration} seconds", level="info")
FORWARD_EVENTS_DURATION.labels(intake_key=self.configuration.intake_key).observe(batch_duration)
except Exception as e:
self.log_exception(e)

# Clear failed items - successful ones already removed in process_file
additions_set = set(additions)
self.in_progress = deque(item for item in self.in_progress if item not in additions_set)

# Compute the total number of forwarded events in this batch
total_forwarded_events = sum(nb_forwarded_events) if nb_forwarded_events else 0

# get the ending time and compute the duration to fetch the events
batch_end_time = time.time()
batch_duration = int(batch_end_time - batch_start_time)
self.log(
f"Fetched and forwarded {total_forwarded_events} events in {batch_duration} seconds", level="info"
)
FORWARD_EVENTS_DURATION.labels(intake_key=self.configuration.intake_key).observe(batch_duration)

# compute the remaining sleeping time. If greater than 0, sleep
delta_sleep = self.configuration.frequency - batch_duration
if delta_sleep > 0:
Expand Down
2 changes: 1 addition & 1 deletion Imperva/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"name": "Imperva",
"uuid": "ee0e5c81-5410-48d4-b155-135679c5ebb8",
"slug": "imperva",
"version": "1.21.3",
"version": "1.21.4",
"categories": [
"Network"
]
Expand Down
12 changes: 5 additions & 7 deletions Imperva/tests/test_fetch_logs_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ def test_handle_file(trigger, file_1):
assert len(logs) == 1

res = trigger.handle_file(logs[0])
assert res == HandlingFileResult(successful=True, log_name=logs[0], last_timestamp=1759413775485)
assert res == HandlingFileResult(
successful=True, log_name=logs[0], last_timestamp=1759413775485, nb_forwarded_events=0
)

assert trigger.push_events_to_intakes.call_count == 1

Expand Down Expand Up @@ -150,18 +152,14 @@ def test_decrypt_file_with_encryption(trigger, encrypted_aes_key, aes_key, publi
content_encrypted_sym_key = base64.b64encode(encrypted_aes_key)
true_value = b"Event 1: 11232323423\nEvent2: 234234234234\nEvent 3: 23234234243\nEvent2: 234234234234\nEvent 3: 23234234243\nEvent2: 234234234234\nEvent 3: 23234234242\n"

header = (
b"""accountId:1
header = b"""accountId:1
configId:2
checksum:549c035bf2ffcaa0fe1b7644f8edf61b
format:CEF
startTime:1759413560916
endTime:1759413775485
publicKeyId:1
key:"""
+ content_encrypted_sym_key
+ b"\n|==|\n"
)
key:""" + content_encrypted_sym_key + b"\n|==|\n"

encrypted_content = encrypt_with_aes(true_value, aes_key)
encrypted_without_compression = header + encrypted_content
Expand Down