diff --git a/Ubika/CHANGELOG.md b/Ubika/CHANGELOG.md index fac505375..cce8e6dab 100644 --- a/Ubika/CHANGELOG.md +++ b/Ubika/CHANGELOG.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 2025-12-09 - 1.0.4 + +### Added + +- Add timestepper to avoid timeouts on large data fetches + ## 2025-11-28 - 1.0.3 ### Fixed diff --git a/Ubika/manifest.json b/Ubika/manifest.json index c921f8ee8..de135e9f1 100644 --- a/Ubika/manifest.json +++ b/Ubika/manifest.json @@ -11,7 +11,7 @@ "name": "Ubika", "uuid": "0c82ee9b-f645-47f9-8e16-a689cfc246c4", "slug": "ubika", - "version": "1.0.3", + "version": "1.0.4", "categories": [ "Network" ] diff --git a/Ubika/tests/test_timestepper.py b/Ubika/tests/test_timestepper.py new file mode 100644 index 000000000..8f1c92492 --- /dev/null +++ b/Ubika/tests/test_timestepper.py @@ -0,0 +1,244 @@ +import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from ubika_modules.timestepper import TimeStepper + + +@pytest.fixture +def mock_trigger(): + trigger = MagicMock() + trigger.log = MagicMock() + trigger.configuration.intake_key = "test_intake_key" + return trigger + + +@pytest.fixture +def fixed_time(): + return datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC) + + +def test_timestepper_initialization(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 11, 0, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + assert stepper.trigger == mock_trigger + assert stepper.start == start + assert stepper.end == end + assert stepper.frequency == frequency + assert stepper.timedelta == timedelta + + +def test_ranges_yields_current_time_range_first(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 11, 0, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch("ubika_modules.timestepper.time"): + mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC) + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + generator = stepper.ranges() + first_range = next(generator) + + assert first_range == (start, end) + + +def test_ranges_updates_start_and_end_after_iteration(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 10, 1, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch("ubika_modules.timestepper.time"): + mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC) + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + generator = stepper.ranges() + next(generator) + second_range = next(generator) + + assert second_range[0] == end + assert second_range[1] > end + + +def test_ranges_sleeps_when_next_end_is_in_future(mock_trigger): + start = datetime.datetime(2024, 1, 15, 11, 58, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 11, 59, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch( + "ubika_modules.timestepper.time" + ) as mock_time: + now = datetime.datetime(2024, 1, 15, 11, 59, 30, tzinfo=datetime.UTC) + mock_datetime.datetime.now.return_value = now + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + mock_time.sleep = MagicMock() + + generator = stepper.ranges() + next(generator) + next(generator) + + assert mock_time.sleep.call_count == 1 + sleep_duration = mock_time.sleep.call_args[0][0] + assert sleep_duration > 0 + + +def test_ranges_does_not_sleep_when_next_end_is_in_past(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 10, 1, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch( + "ubika_modules.timestepper.time" + ) as mock_time: + mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC) + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + mock_time.sleep = MagicMock() + + generator = stepper.ranges() + next(generator) + next(generator) + + assert mock_time.sleep.call_count == 0 + + +def test_create_with_default_parameters(mock_trigger, fixed_time): + with patch("ubika_modules.timestepper.datetime") as mock_datetime: + mock_datetime.datetime.now.return_value = fixed_time + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + stepper = TimeStepper.create(mock_trigger) + + assert stepper.trigger == mock_trigger + assert stepper.frequency == datetime.timedelta(seconds=60) + assert stepper.timedelta == datetime.timedelta(minutes=1) + assert stepper.end < fixed_time + assert stepper.start < stepper.end + + +def test_create_with_start_time_zero(mock_trigger, fixed_time): + with patch("ubika_modules.timestepper.datetime") as mock_datetime: + mock_datetime.datetime.now.return_value = fixed_time + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + stepper = TimeStepper.create(mock_trigger, start_time=0) + + expected_end = fixed_time - datetime.timedelta(minutes=1) + assert stepper.end == expected_end + + +def test_create_with_custom_start_time(mock_trigger, fixed_time): + with patch("ubika_modules.timestepper.datetime") as mock_datetime: + mock_datetime.datetime.now.return_value = fixed_time + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + stepper = TimeStepper.create(mock_trigger, start_time=2) + + expected_end = fixed_time - datetime.timedelta(hours=2) + assert stepper.end == expected_end + + +def test_create_with_custom_frequency_and_timedelta(mock_trigger, fixed_time): + with patch("ubika_modules.timestepper.datetime") as mock_datetime: + mock_datetime.datetime.now.return_value = fixed_time + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + stepper = TimeStepper.create(mock_trigger, frequency=120, timedelta=5) + + assert stepper.frequency == datetime.timedelta(seconds=120) + assert stepper.timedelta == datetime.timedelta(minutes=5) + + +def test_create_from_time_with_default_parameters(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + + stepper = TimeStepper.create_from_time(mock_trigger, start) + + assert stepper.trigger == mock_trigger + assert stepper.start == start + assert stepper.end == start + datetime.timedelta(seconds=60) + assert stepper.frequency == datetime.timedelta(seconds=60) + assert stepper.timedelta == datetime.timedelta(minutes=1) + + +def test_create_from_time_with_custom_parameters(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + + stepper = TimeStepper.create_from_time(mock_trigger, start, frequency=300, timedelta=10) + + assert stepper.start == start + assert stepper.end == start + datetime.timedelta(seconds=300) + assert stepper.frequency == datetime.timedelta(seconds=300) + assert stepper.timedelta == datetime.timedelta(minutes=10) + + +def test_ranges_logs_current_lag(mock_trigger): + start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 10, 1, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch("ubika_modules.timestepper.time"): + mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC) + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + + generator = stepper.ranges() + next(generator) + next(generator) + + assert mock_trigger.log.called + log_calls = [call for call in mock_trigger.log.call_args_list if "Current lag" in str(call)] + assert len(log_calls) > 0 + + +def test_ranges_logs_waiting_message_when_sleeping(mock_trigger): + start = datetime.datetime(2024, 1, 15, 11, 58, 0, tzinfo=datetime.UTC) + end = datetime.datetime(2024, 1, 15, 11, 59, 0, tzinfo=datetime.UTC) + frequency = datetime.timedelta(seconds=60) + timedelta = datetime.timedelta(minutes=1) + + stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta) + + with patch("ubika_modules.timestepper.datetime") as mock_datetime, patch( + "ubika_modules.timestepper.time" + ) as mock_time: + now = datetime.datetime(2024, 1, 15, 11, 59, 30, tzinfo=datetime.UTC) + mock_datetime.datetime.now.return_value = now + mock_datetime.UTC = datetime.UTC + mock_datetime.timedelta = datetime.timedelta + mock_time.sleep = MagicMock() + + generator = stepper.ranges() + next(generator) + next(generator) + + log_calls = [call for call in mock_trigger.log.call_args_list if "Waiting" in str(call)] + assert len(log_calls) > 0 diff --git a/Ubika/tests/test_ubika_cloud_protector_next_gen.py b/Ubika/tests/test_ubika_cloud_protector_next_gen.py index 09facba47..8f35941fd 100644 --- a/Ubika/tests/test_ubika_cloud_protector_next_gen.py +++ b/Ubika/tests/test_ubika_cloud_protector_next_gen.py @@ -1,4 +1,5 @@ from unittest.mock import MagicMock, patch +from datetime import datetime, timezone import pytest import requests_mock @@ -151,7 +152,7 @@ def test_fetch_events_with_pagination(trigger, message1, message2): ) trigger.from_timestamp = 1747326567845 - events = trigger.fetch_events() + events = trigger._UbikaCloudProtectorNextGenConnector__fetch_next_events(1747326567845, 1747326667845) assert list(events) == [message1["spec"]["items"]] @@ -172,8 +173,7 @@ def test_next_batch_sleep_until_next_round(trigger, message1, message2): ) mock_requests.get( - "https://api.ubika.io/rest/logs.ubika.io/v1/ns/sekoia/security-events?filters.fromDate=1747326567845&" - "pagination.realtime=True&pagination.pageSize=100", + "https://api.ubika.io/rest/logs.ubika.io/v1/ns/sekoia/security-events?filters.fromDate=1747326560000&filters.toDate=1747326560000&pagination.realtime=True&pagination.pageSize=100", status_code=200, json=message1, ) @@ -192,7 +192,10 @@ def test_next_batch_sleep_until_next_round(trigger, message1, message2): end_time = start_time + batch_duration mock_time.time.side_effect = [start_time, end_time, end_time] - trigger.next_batch() + start = datetime.fromtimestamp(1747326560, tz=timezone.utc) + end = datetime.fromtimestamp(1747326560, tz=timezone.utc) + + trigger.next_batch(start, end) assert trigger.push_events_to_intakes.call_count == 1 assert mock_time.sleep.call_count == 0 @@ -206,5 +209,8 @@ def test_authorization_error(trigger): status_code=400, ) + start = datetime.fromtimestamp(1747326560, tz=timezone.utc) + end = datetime.fromtimestamp(1747326560, tz=timezone.utc) + with pytest.raises(AuthorizationError): - trigger.next_batch() + trigger.next_batch(start=start, end=end) diff --git a/Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py b/Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py index 1bbe0918b..935930709 100644 --- a/Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py +++ b/Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py @@ -1,7 +1,10 @@ import time from collections.abc import Generator -from datetime import timedelta +from datetime import UTC, datetime, timedelta from functools import cached_property +from typing import Any + +from dateutil.parser import isoparse import orjson import requests @@ -14,7 +17,8 @@ from . import UbikaModule from .client import UbikaCloudProtectorNextGenApiClient from .client.auth import AuthorizationError -from .metrics import EVENTS_LAG, FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, OUTCOMING_EVENTS +from .metrics import FORWARD_EVENTS_DURATION, INCOMING_MESSAGES, OUTCOMING_EVENTS +from .timestepper import TimeStepper class FetchEventsException(Exception): @@ -26,7 +30,11 @@ class UbikaCloudProtectorNextGenConnectorConfiguration(DefaultConnectorConfigura refresh_token: str = Field(..., description="API refresh token", secret=True) frequency: int = 60 + chunk_size: int = 200 + # Time stepper settings + timedelta: int = 1 + start_time: int = 1 class UbikaCloudProtectorNextGenConnector(Connector): @@ -46,10 +54,44 @@ def __init__(self, *args, **kwargs): ) self.from_timestamp = self.cursor.offset + self.context = PersistentJSON("context.json", self.data_path) self.cache_context = PersistentJSON("cache.json", self.data_path) self.cache_size = 1000 self.events_cache: Cache = self.load_events_cache() + @cached_property + def stepper(self): + # Read the most recent date seen from the context + with self.context as cache: + most_recent_date_str = cache.get("most_recent_date_seen") + + # if not defined, create a new time stepper from the configuration + if most_recent_date_str is None: + return TimeStepper.create( + self, + self.configuration.frequency, + self.configuration.timedelta, + self.configuration.start_time, + ) + + # parse the most recent requested date + most_recent_date = isoparse(most_recent_date_str) + + # Ensure we don't go back more than one month + now = datetime.now(UTC) + one_month_ago = now - timedelta(days=30) + # if the most recent date is older than one month, set it to one month ago + if most_recent_date < one_month_ago: + most_recent_date = one_month_ago + + # create a time stepper from the most recent date seen + return TimeStepper.create_from_time( + self, + most_recent_date, + self.configuration.frequency, + self.configuration.timedelta, + ) + def load_events_cache(self) -> Cache: """ Load the events cache. @@ -106,12 +148,15 @@ def _handle_response_error(self, response: requests.Response) -> None: raise FetchEventsException(message) - def __fetch_next_events(self, from_timestamp: int) -> Generator[list, None, None]: + def __fetch_next_events( + self, start_timestamp: int, end_timestamp: int + ) -> Generator[list[dict[str, Any]], None, None]: # get the first page of events headers = {"Content-Type": "application/json"} url = f"https://api.ubika.io/rest/logs.ubika.io/v1/ns/{self.configuration.namespace}/security-events" params = { - "filters.fromDate": from_timestamp, + "filters.fromDate": start_timestamp, + "filters.toDate": end_timestamp, "pagination.realtime": True, "pagination.pageSize": self.configuration.chunk_size, } @@ -161,42 +206,17 @@ def __fetch_next_events(self, from_timestamp: int) -> Generator[list, None, None ) except AuthorizationError as err: - self.log(f"Authorization error: {err.args[1]}", level="critical") + self.log(f"Authorization error: {err.args[1] if len(err.args) > 1 else str(err)}", level="critical") raise - def fetch_events(self) -> Generator[list, None, None]: - most_recent_date_seen = self.from_timestamp - current_lag: int = 0 - - try: - for next_events in self.__fetch_next_events(most_recent_date_seen): - if next_events: - # save the greatest date ever seen - last_event = max(next_events, key=lambda x: int(x["timestamp"])) - last_event_timestamp = int(last_event["timestamp"]) - - if last_event_timestamp > most_recent_date_seen: - most_recent_date_seen = last_event_timestamp - - yield next_events - finally: - # save the most recent date - if most_recent_date_seen > self.from_timestamp: - self.from_timestamp = most_recent_date_seen - - # save in context the most recent date seen - self.cursor.offset = self.from_timestamp - - current_lag = time.time() - most_recent_date_seen - - EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(current_lag) - - def next_batch(self) -> None: + def next_batch(self, start: datetime, end: datetime) -> None: # save the starting time batch_start_time = time.time() + start_timestamp = int(start.timestamp() * 1000) + end_timestamp = int(end.timestamp() * 1000) # Fetch next batch - for events in self.fetch_events(): + for events in self.__fetch_next_events(start_timestamp, end_timestamp): batch_of_events = [orjson.dumps(event).decode("utf-8") for event in self.filter_processed_events(events)] # if the batch is full, push it @@ -225,22 +245,20 @@ def next_batch(self) -> None: ) # pragma: no cover FORWARD_EVENTS_DURATION.labels(intake_key=self.configuration.intake_key).observe(batch_duration) - # compute the remaining sleeping time. If greater than 0, sleep - delta_sleep = self.configuration.frequency - batch_duration - if delta_sleep > 0: - self.log( - message=f"Next batch in the future. Waiting {delta_sleep} seconds", - level="debug", - ) # pragma: no cover - time.sleep(delta_sleep) - def run(self) -> None: self.log(message=f"Start fetching {self.NAME} events", level="info") # pragma: no cover - while self.running: + for start, end in self.stepper.ranges(): + # Check if we need to stop + if self._stop_event.is_set(): + break + try: - self.next_batch() + self.next_batch(start, end) except Exception as error: self.log_exception(error, message="Failed to forward events") # pragma: no cover + finally: + with self.context as cache: + cache["most_recent_date_seen"] = end.isoformat() self.save_events_cache() diff --git a/Ubika/ubika_modules/timestepper.py b/Ubika/ubika_modules/timestepper.py new file mode 100644 index 000000000..130c0865b --- /dev/null +++ b/Ubika/ubika_modules/timestepper.py @@ -0,0 +1,112 @@ +import datetime +import time +from collections.abc import Generator + +from sekoia_automation.trigger import Trigger + +from ubika_modules.metrics import EVENTS_LAG + + +class TimeStepper: + """ + A class to generate time ranges based on a start time, frequency, and timedelta. + It yields successive time ranges and adjusts for any lag in processing. + Attributes: + trigger (Trigger): The trigger instance for logging. + start (datetime.datetime): The start time of the current range. + end (datetime.datetime): The end time of the current range. + frequency (datetime.timedelta): The duration of each time range. + timedelta (datetime.timedelta): The delay to account for late-arriving data. + Methods: + ranges() -> Generator[tuple[datetime.datetime, datetime.datetime], None, None]: + Yields successive time ranges as tuples of (start, end). + create(trigger: Trigger, frequency: int = 60, timedelta: int = 1, start_time: int = 1) -> "TimeStepper": + Class method to create a TimeStepper instance with specified parameters. + create_from_time(trigger: Trigger, start: datetime.datetime, frequency: int = 60, timedelta: int = 1) -> "TimeStepper": Class method to create a TimeStepper instance starting from a specific time. + """ + + def __init__( + self, + trigger: Trigger, + start: datetime.datetime, + end: datetime.datetime, + frequency: datetime.timedelta, + timedelta: datetime.timedelta, + ): + self.trigger = trigger + self.start = start + self.end = end + self.frequency = frequency + self.timedelta = timedelta + + def ranges( + self, + ) -> Generator[tuple[datetime.datetime, datetime.datetime], None, None]: + while True: + # return the current time range + yield self.start, self.end + + # compute the next time range + next_end = self.end + self.frequency + now = datetime.datetime.now(datetime.UTC) - self.timedelta + + # Compute current lag + current_lag = now - next_end + self.trigger.log( + message=f"Current lag {int(current_lag.total_seconds())} seconds.", + level="info", + ) + EVENTS_LAG.labels(intake_key=self.trigger.configuration.intake_key).set(int(current_lag.total_seconds())) + + # If the next end is in the future + if next_end > now: + # compute the max date allowed in the future and set the next_end according + current_difference = int((next_end - now).total_seconds()) + max_difference = min( + current_difference, int(self.frequency.total_seconds()) + ) # limit the end date in the future + next_end = now + datetime.timedelta(seconds=max_difference) + + self.trigger.log( + message=f"Timerange in the future. Waiting {max_difference} seconds for next batch.", + level="info", + ) + time.sleep(max_difference) + + self.start = self.end + self.end = next_end + + @classmethod + def create( + cls, + trigger: Trigger, + frequency: int = 60, + timedelta: int = 1, + start_time: int = 1, + ) -> "TimeStepper": + t_frequency = datetime.timedelta(seconds=frequency) + t_timedelta = datetime.timedelta(minutes=timedelta) + + if start_time == 0: + end = datetime.datetime.now(datetime.UTC) - t_timedelta + else: + end = datetime.datetime.now(datetime.UTC) - datetime.timedelta(hours=start_time) + + start = end - t_frequency + + return cls(trigger, start, end, t_frequency, t_timedelta) + + @classmethod + def create_from_time( + cls, + trigger: Trigger, + start: datetime.datetime, + frequency: int = 60, + timedelta: int = 1, + ) -> "TimeStepper": + t_frequency = datetime.timedelta(seconds=frequency) + t_timedelta = datetime.timedelta(minutes=timedelta) + + end = start + t_frequency + + return cls(trigger, start, end, t_frequency, t_timedelta)