
Ubika : Add timestepper to connector#1664

Open
TOUFIKIzakarya wants to merge 6 commits into develop from Add/ubika_timestepper

Conversation

@TOUFIKIzakarya (Collaborator) commented Dec 9, 2025

Add timestepper to ubika
Related issue: issue

Summary by Sourcery

Introduce a time-stepped polling mechanism for Ubika Cloud Protector Next Gen connector and persist the last processed timestamp to control event retrieval windows.

New Features:

  • Add a configurable TimeStepper utility to generate time ranges for event collection based on frequency, timedelta, and start time.
  • Persist and reuse the most recent processed event time in connector context to resume polling from the correct point across runs.

Bug Fixes:

  • Avoid failures when AuthorizationError is raised without a secondary message argument by safely logging the error.
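As an illustration of that fix, here is a minimal sketch of the guarded expression (the exception class below is a stand-in, not the connector's actual type):

```python
class AuthorizationError(Exception):
    """Stand-in for the connector's AuthorizationError."""

def error_message(err: Exception) -> str:
    # Use the secondary message when the error carries one,
    # otherwise fall back to the exception's own string form.
    return err.args[1] if len(err.args) > 1 else str(err)
```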

Enhancements:

  • Refactor the connector to fetch events between explicit start and end timestamps and drive batching via the new TimeStepper ranges.
  • Cap the historical resume point to at most one month in the past to limit backfill range and control lag metrics reporting.

Tests:

  • Adapt existing connector tests to the new time-ranged fetching API and implement comprehensive unit tests for the TimeStepper behavior, including lag logging and sleep handling.
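The core idea of the stepper — yielding successive (start, end) windows that never extend past the current time — can be sketched as follows. This is a simplified model under stated assumptions; the real TimeStepper also logs lag, updates metrics, and sleeps between batches:

```python
import datetime

def ranges(start, frequency, now_fn):
    """Yield successive (start, end) windows, capping end at 'now'."""
    while True:
        end = start + frequency
        now = now_fn()
        if end > now:
            # never schedule a window that ends in the future
            end = now
        yield start, end
        start = end
```

Here `now_fn` is injected purely for testability; a real implementation would call `datetime.datetime.now(datetime.timezone.utc)` directly.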

sourcery-ai bot (Contributor) commented Dec 9, 2025

Reviewer's Guide

Introduce a reusable TimeStepper utility for time-range based polling and refactor the Ubika Cloud Protector Next Gen connector to use it for range-based event fetching, lag tracking, persistence of the last seen timestamp, and improved stop/error handling, while updating tests accordingly.

Sequence diagram for UbikaCloudProtectorNextGenConnector run loop with TimeStepper

```mermaid
sequenceDiagram
    participant Runner
    participant Connector as UbikaCloudProtectorNextGenConnector
    participant Stepper as TimeStepper
    participant UbikaAPI
    participant Context as PersistentJSON

    Runner->>Connector: run()
    Connector->>Connector: stepper (cached_property)
    loop for each range
        Stepper-->>Connector: ranges() yield (start, end)
        alt stop event is set
            Connector->>Runner: break loop
        else continue
            Connector->>Connector: next_batch(start, end)
            Note over Connector: convert start,end to ms timestamps
            loop fetch pages in range
                Connector->>UbikaAPI: GET /security-events
                UbikaAPI-->>Connector: list of events
                Connector->>Connector: filter_processed_events(events)
                Connector->>Connector: push events when batch full
            end
            Connector->>Connector: metrics FORWARD_EVENTS_DURATION
        end
        Connector->>Context: __enter__()
        Connector->>Context: set most_recent_date_seen = end.isoformat()
        Context-->>Connector: __exit__()
    end
    Connector->>Connector: save_events_cache()
```

Class diagram for TimeStepper and updated UbikaCloudProtectorNextGenConnector

```mermaid
classDiagram
    class UbikaCloudProtectorNextGenConnectorConfiguration {
        +str namespace
        +str refresh_token
        +int frequency
        +int chunk_size
        +int timedelta
        +int start_time
    }

    class UbikaCloudProtectorNextGenConnector {
        +str NAME
        +UbikaCloudProtectorNextGenConnectorConfiguration configuration
        +int from_timestamp
        +PersistentJSON context
        +PersistentJSON cache_context
        +int cache_size
        +Cache events_cache
        +cached_property stepper
        -Generator~list_dict~ __fetch_next_events(start_timestamp int, end_timestamp int)
        +Cache load_events_cache()
        +None next_batch(start datetime, end datetime)
        +None run()
    }

    class TimeStepper {
        +Trigger trigger
        +datetime start
        +datetime end
        +timedelta frequency
        +timedelta timedelta
        +__init__(trigger Trigger, start datetime, end datetime, frequency timedelta, timedelta timedelta)
        +Generator~tuple_datetime_datetime~ ranges()
        +create(trigger Trigger, frequency int, timedelta int, start_time int) TimeStepper
        +create_from_time(trigger Trigger, start datetime, frequency int, timedelta int) TimeStepper
    }

    class Trigger {
        +Any configuration
        +None log(message str, level str)
    }

    class PersistentJSON {
        +__init__(filename str, base_path str)
        +Any __enter__()
        +None __exit__(exc_type Any, exc_val Any, exc_tb Any)
    }

    class Cache {
        +int maxlen
        +append(item Any) None
        +__iter__() Iterator
    }

    UbikaCloudProtectorNextGenConnectorConfiguration <|-- UbikaCloudProtectorNextGenConnector : uses
    UbikaCloudProtectorNextGenConnector --> TimeStepper : stepper
    UbikaCloudProtectorNextGenConnector --> PersistentJSON : context
    UbikaCloudProtectorNextGenConnector --> PersistentJSON : cache_context
    UbikaCloudProtectorNextGenConnector --> Cache : events_cache
    TimeStepper --> Trigger : holds
```

File-Level Changes

Refactor Ubika Cloud Protector Next Gen connector to fetch events over explicit time ranges driven by a shared TimeStepper and persist the last seen time in context.
  • Add PersistentJSON-backed context storage and a cached TimeStepper instance that is initialized from configuration or resumed from the last stored most_recent_date_seen with a one-month lookback cap.
  • Change the low-level event fetcher to accept start and end timestamps and include both filters.fromDate and filters.toDate in API requests.
  • Replace the old fetch_events timestamp loop and inline lag metric logic with a next_batch(start,end) method that operates on datetime ranges and relies on the TimeStepper for scheduling and lag computation.
  • Update the run loop to iterate over TimeStepper.ranges(), honour the connector stop event, and persist the end of each processed range back to context.
  • Improve AuthorizationError handling to log a meaningful message even when the error args tuple does not contain a second element.
Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py
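The one-month lookback cap on the resume point could look roughly like this. It is an illustrative sketch under the assumption that the context stores an ISO-8601 timestamp; `resume_start` is not the connector's actual API:

```python
import datetime

ONE_MONTH = datetime.timedelta(days=30)

def resume_start(most_recent_date_seen, now):
    """Resume from the stored timestamp, but never more than one month back."""
    floor = now - ONE_MONTH
    if most_recent_date_seen is None:
        return floor
    stored = datetime.datetime.fromisoformat(most_recent_date_seen)
    return max(stored, floor)
```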
Introduce a reusable TimeStepper utility to generate time ranges, manage lag metrics, and handle sleep between batches.
  • Implement a TimeStepper class that yields successive (start,end) datetime ranges, advances by a configurable frequency, and logs current lag in seconds while updating the EVENTS_LAG metric.
  • Ensure TimeStepper limits how far into the future it schedules ranges by truncating next_end to at most one frequency ahead and sleeping only for that bounded interval, logging a waiting message when sleeping.
  • Provide factory constructors TimeStepper.create and TimeStepper.create_from_time to build steppers either from ‘now’ minus configurable offsets or from an explicit start datetime, with parameters for frequency and timedelta window size.
Ubika/ubika_modules/timestepper.py
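The future-truncation and bounded sleep described above can be sketched as follows (names are illustrative, not the module's actual functions):

```python
import datetime

def bounded_wait_seconds(next_end, now, frequency):
    """Wait at most one frequency interval, even if next_end is far ahead."""
    if next_end <= now:
        return 0  # range is already in the past: no sleep needed
    difference = int((next_end - now).total_seconds())
    return min(difference, int(frequency.total_seconds()))
```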
Adapt existing Ubika connector tests and add a dedicated test suite for TimeStepper behavior.
  • Update connector tests to call the now-internal __fetch_next_events(start,end) instead of the removed fetch_events(), and to pass explicit start/end datetimes into next_batch().
  • Adjust test URL expectations to include both filters.fromDate and filters.toDate query parameters in the Ubika API calls.
  • Add comprehensive TimeStepper tests covering initialization, range sequencing, lag logging, sleep behavior when the next range is in the future, non-sleep behavior when it is in the past, factory constructors, and log messages for lag and waiting.
Ubika/tests/test_ubika_cloud_protector_next_gen.py
Ubika/tests/test_timestepper.py


sourcery-ai bot (Contributor) left a comment

Hey there - I've reviewed your changes and found some issues that need to be addressed.

  • In TimeStepper.create, datetime.datetime.now(datetime.UTC) is called multiple times; consider capturing now once and deriving end/start from that single value to avoid subtle inconsistencies around boundaries.
  • The tests now reach into the private __fetch_next_events method via name-mangled access, which is brittle; it might be cleaner to keep a small public wrapper (or a non-dunder helper) that can be exercised by tests instead.
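The suggested public wrapper could be as small as this (a sketch; the class and method bodies are stand-ins for the connector's real implementation):

```python
from typing import Generator

class Connector:
    def __fetch_next_events(self, start: int, end: int) -> Generator[list, None, None]:
        # private implementation (simplified stand-in for the real fetcher)
        yield [{"start": start, "end": end}]

    def fetch_next_events(self, start: int, end: int) -> Generator[list, None, None]:
        """Thin public alias so tests need not use name-mangled access."""
        return self.__fetch_next_events(start, end)
```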
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `TimeStepper.create`, `datetime.datetime.now(datetime.UTC)` is called multiple times; consider capturing `now` once and deriving `end`/`start` from that single value to avoid subtle inconsistencies around boundaries.
- The tests now reach into the private `__fetch_next_events` method via name-mangled access, which is brittle; it might be cleaner to keep a small public wrapper (or a non-dunder helper) that can be exercised by tests instead.

## Individual Comments

### Comment 1
<location> `Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py:205-214` </location>
<code_context>

```diff
-                        most_recent_date_seen = last_event_timestamp
-
-                    yield next_events
-        finally:
-            # save the most recent date
-            if most_recent_date_seen > self.from_timestamp:
```

</code_context>

<issue_to_address>
**issue (bug_risk):** Persisting `most_recent_date_seen` in `finally` can skip unprocessed ranges after errors.

Because the context is updated in `finally`, `most_recent_date_seen` is moved to `end` even when `next_batch(start, end)` fails. On the next run, `TimeStepper.create_from_time` will start after this window and the failed interval may never be retried.

Only persist `most_recent_date_seen` after a successful batch. For example, move the context update into the `try` block after `next_batch` returns without raising, or guard it with a flag that’s set only on success.
</issue_to_address>
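The success-guarded persistence this comment asks for can be sketched as follows (function names are illustrative):

```python
def process_ranges(ranges, next_batch, persist):
    """Advance the checkpoint only when the batch completed without raising."""
    for start, end in ranges:
        next_batch(start, end)  # may raise: checkpoint is not advanced
        persist(end)            # reached only after a successful batch
```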

### Comment 2
<location> `Ubika/tests/test_ubika_cloud_protector_next_gen.py:198` </location>
<code_context>

```diff
+        start = datetime.fromtimestamp(1747326560, tz=timezone.utc)
+        end = datetime.fromtimestamp(1747326560, tz=timezone.utc)
+
+        trigger.next_batch(start, end)

         assert trigger.push_events_to_intakes.call_count == 1
```

</code_context>

<issue_to_address>
**suggestion (testing):** Missing tests that exercise `run()` with `TimeStepper` and context persistence of `most_recent_date_seen` (including one-month cap).

Current tests only call `next_batch(start, end)` directly and don’t validate the new `run()` behavior that depends on `TimeStepper.ranges()` and the persisted `context`. Please add an integration-style test that:

1. Mocks `TimeStepper.ranges()` to return known `(start, end)` ranges.
2. Runs `trigger.run()` until it stops (e.g., via `_stop_event`).
3. Asserts that `most_recent_date_seen` is written to context as the `end` of each range.
4. Runs a second time with `most_recent_date_seen` far in the past and asserts that the start time is `max(most_recent_date_seen, now - 30 days)`.

This will verify both the time-stepped polling behavior and the one-month cap across runs.

Suggested implementation:

```python
        start = datetime.fromtimestamp(1747326560, tz=timezone.utc)
        end = datetime.fromtimestamp(1747326560, tz=timezone.utc)

        with pytest.raises(AuthorizationError):
            trigger.next_batch(start=start, end=end)


def test_run_updates_most_recent_date_seen_with_time_stepper_ranges(trigger, mocker):
    """
    Integration-style test:
    - Mocks TimeStepper.ranges() to return known (start, end) ranges.
    - Runs trigger.run() once.
    - Asserts that most_recent_date_seen in the context is updated to the last range's end.
    """
    # Arrange
    first_start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc)
    first_end = datetime(2025, 1, 1, 1, 0, tzinfo=timezone.utc)
    second_start = datetime(2025, 1, 1, 1, 0, tzinfo=timezone.utc)
    second_end = datetime(2025, 1, 1, 2, 0, tzinfo=timezone.utc)
    ranges = [(first_start, first_end), (second_start, second_end)]

    # Ensure we start from a clean context
    # (adjust this if your trigger uses a different context structure)
    if getattr(trigger, "context", None) is None:
        trigger.context = {}
    else:
        trigger.context.clear()

    # Mock TimeStepper.ranges to return our known ranges
    mocker.patch.object(trigger.time_stepper, "ranges", return_value=ranges)

    # Make next_batch() a no-op to avoid network / side effects
    mocker.patch.object(trigger, "next_batch")

    # Stop after a single loop iteration
    stop_event = mocker.Mock()
    stop_event.is_set.side_effect = [False, True]
    trigger._stop_event = stop_event

    # Act
    trigger.run()

    # Assert that most_recent_date_seen was updated to the end of the last range
    assert "most_recent_date_seen" in trigger.context
    assert trigger.context["most_recent_date_seen"] == second_end


def test_run_respects_one_month_cap_on_start_time(trigger, mocker):
    """
    Integration-style test for the one-month cap:
    - Sets most_recent_date_seen far in the past.
    - Freezes "now".
    - Runs trigger.run().
    - Asserts that TimeStepper.ranges() is called with start = max(most_recent_date_seen, now - 30 days).
    """
    # Arrange: far past last-seen date
    fake_now = datetime(2025, 6, 1, 0, 0, tzinfo=timezone.utc)
    most_recent_date_seen = fake_now - timedelta(days=90)  # 3 months ago
    one_month_ago = fake_now - timedelta(days=30)
    expected_start = max(most_recent_date_seen, one_month_ago)

    # Initialize / reset context
    if getattr(trigger, "context", None) is None:
        trigger.context = {}
    else:
        trigger.context.clear()
    trigger.context["most_recent_date_seen"] = most_recent_date_seen

    # Patch datetime in the trigger module so that now() returns fake_now.
    # Adjust the module path if UbikaCloudProtectorNextGenTrigger lives elsewhere.
    datetime_mock = mocker.patch("Ubika.ubika_cloud_protector_next_gen.datetime")
    datetime_mock.now.return_value = fake_now
    datetime_mock.fromtimestamp.side_effect = datetime.fromtimestamp
    datetime_mock.side_effect = None  # ensure datetime.datetime usage works as expected
    datetime_mock.timezone = timezone

    # Capture the arguments used when building ranges
    ranges_mock = mocker.patch.object(trigger.time_stepper, "ranges", return_value=[])

    # Make next_batch() a no-op
    mocker.patch.object(trigger, "next_batch")

    # Stop after a single loop iteration
    stop_event = mocker.Mock()
    stop_event.is_set.side_effect = [False, True]
    trigger._stop_event = stop_event

    # Act
    trigger.run()

    # Assert TimeStepper.ranges() was called with a start time that respects the one-month cap
    assert ranges_mock.call_count == 1
    _, kwargs = ranges_mock.call_args
    start_arg = kwargs.get("start")
    assert start_arg == expected_start

```

The added tests assume the following about your implementation:

1. `trigger.context` is a mutable mapping (e.g. dict) where `most_recent_date_seen` is stored under the `"most_recent_date_seen"` key. If your trigger uses a different mechanism (e.g. `trigger.get_context()`, `trigger.persist_context()`, or a different key name), update the tests to read/write the context using the correct API.
2. `trigger.time_stepper` exists and has a `ranges(start=..., end=...)` method; if it is accessed differently, adjust the `mocker.patch.object(trigger.time_stepper, "ranges", ...)` calls.
3. The trigger's module path for patching `datetime` is `"Ubika.ubika_cloud_protector_next_gen"`. If the trigger class lives in a different module, change the `mocker.patch("Ubika.ubika_cloud_protector_next_gen.datetime")` call to the correct fully-qualified module path.
4. `trigger.run()` uses an attribute `_stop_event` with an `is_set()` method; if your stop condition is implemented differently, adapt the stop mocking logic in the tests accordingly.
</issue_to_address>

### Comment 3
<location> `Ubika/tests/test_ubika_cloud_protector_next_gen.py:215-216` </location>
<code_context>

```diff
+        start = datetime.fromtimestamp(1747326560, tz=timezone.utc)
+        end = datetime.fromtimestamp(1747326560, tz=timezone.utc)
+
         with pytest.raises(AuthorizationError):
-            trigger.next_batch()
+            trigger.next_batch(start=start, end=end)
```

</code_context>

<issue_to_address>
**issue (testing):** No test covers `AuthorizationError` raised with a single argument, which is part of the new logging behavior.

Since the change is specifically about logging, it would be good to assert the log output and cover both branches of the new expression. For example, patch `UbikaCloudProtectorNextGenConnector.log` to a `MagicMock`, have `requests.get` raise `AuthorizationError("some message")` (single argument), call `next_batch(start, end)`, and assert both that the exception is raised and that the logged message includes `"some message"` rather than attempting `err.args[1]`. Optionally, add a second test using a two-argument `AuthorizationError` to cover the other branch.
</issue_to_address>
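A test of that logging branch might look like this; the helper below mirrors the guarded expression and is a sketch, not the connector's real method:

```python
from unittest.mock import MagicMock

class AuthorizationError(Exception):
    """Stand-in for the connector's exception type."""

def log_authorization_error(log, err):
    # mirrors the connector's guarded logging expression
    log(f"Authorization error: {err.args[1] if len(err.args) > 1 else str(err)}", level="critical")
```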

### Comment 4
<location> `Ubika/tests/test_timestepper.py:198-207` </location>
<code_context>
+def test_ranges_logs_current_lag(mock_trigger):
</code_context>

<issue_to_address>
**suggestion (testing):** Extend `TimeStepper` lag tests to also assert that `EVENTS_LAG` metrics are updated correctly.

This test exercises the log output, but `TimeStepper.ranges()` also updates the `EVENTS_LAG` Prometheus metric, which remains untested.

Consider monkeypatching `ubika_modules.timestepper.EVENTS_LAG` with a mock exposing `labels().set(...)`, and assert that:
- `EVENTS_LAG.labels(intake_key=mock_trigger.configuration.intake_key)` is called with the expected intake key
- `.set()` is called with the integer number of seconds for `now - next_end`

That will directly cover the metric update path and catch regressions there.

Suggested implementation:

```python
def test_ranges_logs_current_lag(mock_trigger):
    start = datetime.datetime(2024, 1, 15, 10, 0, 0, tzinfo=datetime.UTC)
    end = datetime.datetime(2024, 1, 15, 10, 1, 0, tzinfo=datetime.UTC)
    frequency = datetime.timedelta(seconds=60)
    timedelta = datetime.timedelta(minutes=1)

    stepper = TimeStepper(mock_trigger, start, end, frequency, timedelta)

    with patch("ubika_modules.timestepper.datetime") as mock_datetime, \
         patch("ubika_modules.timestepper.time"), \
         patch("ubika_modules.timestepper.EVENTS_LAG") as mock_events_lag:
        mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC)
        mock_datetime.UTC = datetime.UTC

        # Prepare mock for Prometheus metric: EVENTS_LAG.labels(...).set(...)
        mock_lag_metric = mock_events_lag.labels.return_value

```

Because I can't see the rest of `test_ranges_logs_current_lag`, you'll need to add the metric assertions immediately after the call that exercises `TimeStepper.ranges(...)` (and still inside the `with` block). For example, if the test currently does something like:

```python
with patch("ubika_modules.timestepper.datetime") as mock_datetime, \
     patch("ubika_modules.timestepper.time"), \
     patch("ubika_modules.timestepper.EVENTS_LAG") as mock_events_lag:
    mock_datetime.datetime.now.return_value = datetime.datetime(2024, 1, 15, 12, 0, 0, tzinfo=datetime.UTC)
    mock_datetime.UTC = datetime.UTC

    mock_lag_metric = mock_events_lag.labels.return_value

    # existing log assertions, e.g. caplog / stepper.ranges
    list(stepper.ranges(...))  # or however you trigger ranges()
```

then extend it with:

```python
    # Trigger the ranges iteration (existing code)
    ranges = list(stepper.ranges())

    # Assert Prometheus EVENTS_LAG metric is updated correctly
    mock_events_lag.labels.assert_called_once_with(intake_key=mock_trigger.configuration.intake_key)

    now = mock_datetime.datetime.now.return_value
    # With the test parameters, there is a single range [start, end),
    # so next_end for the lag computation should be `end`.
    expected_lag_seconds = int((now - end).total_seconds())

    mock_lag_metric.set.assert_called_once_with(expected_lag_seconds)
```

Adjust the `ranges = list(stepper.ranges())` line to match whatever the existing test already does to exercise `TimeStepper.ranges()`. The key parts are:
- `mock_events_lag.labels.assert_called_once_with(intake_key=mock_trigger.configuration.intake_key)`
- `mock_lag_metric.set.assert_called_once_with(expected_lag_seconds)` where `expected_lag_seconds` matches the logic `now - next_end` in `TimeStepper.ranges()`.
</issue_to_address>

### Comment 5
<location> `Ubika/tests/test_timestepper.py:77-86` </location>
<code_context>
+def test_ranges_sleeps_when_next_end_is_in_future(mock_trigger):
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test for the case where `next_end` is significantly ahead of `now` to verify the `min(current_difference, frequency)` clamping logic.

This test only covers a slightly-future `next_end` and checks for a positive sleep duration, but it doesn’t hit the path where `current_difference` exceeds `frequency.total_seconds()` and the wait time is clamped.

Please add a test that:
- Configures `start`, `end`, and `frequency` so `next_end` is far in the future (e.g., `current_difference` is several times `frequency.total_seconds()`).
- Advances the generator until that branch is executed.
- Asserts that `sleep` is called with `frequency.total_seconds()` rather than the full `current_difference`.

That will validate the clamping behavior and protect against regressions.
</issue_to_address>
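Such a clamping test could be sketched like this; `wait_until` is an illustrative stand-in for the stepper's wait logic, not the real method:

```python
import datetime
import time
from unittest.mock import patch

def wait_until(next_end, now, frequency):
    """Illustrative wait: sleep the gap to next_end, clamped to one frequency."""
    difference = int((next_end - now).total_seconds())
    time.sleep(min(difference, int(frequency.total_seconds())))

now = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
with patch("time.sleep") as mock_sleep:
    # next_end is ten frequencies ahead; the wait must be clamped to one
    wait_until(now + datetime.timedelta(seconds=600), now, datetime.timedelta(seconds=60))
```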

### Comment 6
<location> `Ubika/ubika_modules/timestepper.py:10` </location>
<code_context>

```diff
+from ubika_modules.metrics import EVENTS_LAG
+
+
+class TimeStepper:
+    def __init__(
+        self,
```

</code_context>

<issue_to_address>
**issue (complexity):** Consider splitting the pure time-range computation from logging/metrics side effects and clarifying factory parameter names to make TimeStepper simpler and more testable.

You can keep the `TimeStepper` abstraction while reducing complexity by separating responsibilities and simplifying flow.

### 1. Split range computation from side effects

Right now `ranges()` both:
- computes ranges
- logs lag, updates metrics, and sleeps

You can isolate the pure time-stepping logic into a small helper, and keep the side effects in a thin wrapper that `Trigger` uses.

```python
class TimeStepper:
    # __init__ unchanged

    def next_range(
        self,
        now: datetime.datetime | None = None,
    ) -> tuple[datetime.datetime, datetime.datetime, int]:
        """
        Returns (start, end, sleep_seconds) and advances internal state.
        No logging, no metrics, no sleeping.
        """
        if now is None:
            now = datetime.datetime.now(datetime.UTC) - self.timedelta

        # compute next range boundaries
        next_end = self.end + self.frequency

        # Compute current lag (in seconds)
        current_lag_seconds = int((now - next_end).total_seconds())

        # Bound next_end in the future
        if next_end > now:
            current_difference = int((next_end - now).total_seconds())
            max_difference = min(current_difference, int(self.frequency.total_seconds()))
            next_end = now + datetime.timedelta(seconds=max_difference)
            sleep_seconds = max_difference
        else:
            sleep_seconds = 0

        # advance internal state
        current_start, current_end = self.start, self.end
        self.start, self.end = self.end, next_end

        return current_start, current_end, sleep_seconds
```

Then make `ranges()` just the side-effectful wrapper:

```python
    def ranges(self) -> Generator[tuple[datetime.datetime, datetime.datetime], None, None]:
        while True:
            now = datetime.datetime.now(datetime.UTC) - self.timedelta
            start, end, sleep_seconds = self.next_range(now)

            current_lag_seconds = int((now - (end + self.frequency)).total_seconds())
            self.trigger.log(
                message=f"Current lag {current_lag_seconds} seconds.",
                level="info",
            )
            EVENTS_LAG.labels(
                intake_key=self.trigger.configuration.intake_key
            ).set(current_lag_seconds)

            if sleep_seconds > 0:
                self.trigger.log(
                    message=f"Timerange in the future. Waiting {sleep_seconds} seconds for next batch.",
                    level="info",
                )
                time.sleep(sleep_seconds)

            yield start, end
```

Benefits:
- `next_range()` is deterministic and easily testable (no sleep, no logging, no metrics).
- `ranges()` keeps current behavior but is now a simple loop orchestrating pure logic + side effects.

### 2. Clarify factory parameters and intent

The factories are doing time arithmetic but the meaning of `timedelta` and `start_time` is not obvious. You can improve readability with naming and small helpers without changing behavior:

```python
    @classmethod
    def create(
        cls,
        trigger: Trigger,
        frequency: int = 60,
        lag_minutes: int = 1,
        start_hours_ago: int = 1,
    ) -> "TimeStepper":
        frequency_delta = datetime.timedelta(seconds=frequency)
        lag_delta = datetime.timedelta(minutes=lag_minutes)

        now = datetime.datetime.now(datetime.UTC)

        if start_hours_ago == 0:
            end = now - lag_delta
        else:
            end = now - datetime.timedelta(hours=start_hours_ago)

        start = end - frequency_delta

        return cls(trigger, start, end, frequency_delta, lag_delta)
```

This keeps all functionality but:
- separates pure time-step logic from side effects
- makes the generator more testable
- makes the factory arguments self-explanatory without additional indirection.
</issue_to_address>


Comment on lines 205 to +214

```diff
             )

         except AuthorizationError as err:
-            self.log(f"Authorization error: {err.args[1]}", level="critical")
+            self.log(f"Authorization error: {err.args[1] if len(err.args)>1 else str(err)}", level="critical")
             raise

-    def fetch_events(self) -> Generator[list, None, None]:
-        most_recent_date_seen = self.from_timestamp
-        current_lag: int = 0
-
-        try:
-            for next_events in self.__fetch_next_events(most_recent_date_seen):
-                if next_events:
-                    # save the greatest date ever seen
-                    last_event = max(next_events, key=lambda x: int(x["timestamp"]))
-                    last_event_timestamp = int(last_event["timestamp"])
-
-                    if last_event_timestamp > most_recent_date_seen:
-                        most_recent_date_seen = last_event_timestamp
-
-                    yield next_events
-        finally:
-            # save the most recent date
-            if most_recent_date_seen > self.from_timestamp:
-                self.from_timestamp = most_recent_date_seen
-
-                # save in context the most recent date seen
-                self.cursor.offset = self.from_timestamp
-
-            current_lag = time.time() - most_recent_date_seen
-
-            EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(current_lag)
-
-    def next_batch(self) -> None:
+    def next_batch(self, start: datetime, end: datetime) -> None:
         # save the starting time
         batch_start_time = time.time()
+        start_timestamp = int(start.timestamp() * 1000)
```

issue (bug_risk): Persisting most_recent_date_seen in finally can skip unprocessed ranges after errors.

Because the context is updated in finally, most_recent_date_seen is moved to end even when next_batch(start, end) fails. On the next run, TimeStepper.create_from_time will start after this window and the failed interval may never be retried.

Only persist most_recent_date_seen after a successful batch. For example, move the context update into the try block after next_batch returns without raising, or guard it with a flag that’s set only on success.

Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces a reusable TimeStepper class to manage time-windowed event fetching for the Ubika Cloud Protector Next Gen connector, replacing the previous timestamp-based approach with a more sophisticated time-range mechanism that includes sleep/wait logic.

Key Changes

  • Added new TimeStepper class that generates time ranges with configurable frequency and handles delays when fetching events in the future
  • Refactored connector to use time ranges (start/end) instead of single timestamps for event queries
  • Updated state persistence to use most_recent_date_seen in a separate context file instead of checkpoint-based timestamps

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 10 comments.

Ubika/ubika_modules/timestepper.py New module implementing TimeStepper class with time range generation, lag tracking, and intelligent sleep logic
Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py Refactored to use TimeStepper with time ranges; added configuration for timedelta/start_time; updated state persistence mechanism
Ubika/tests/test_timestepper.py Comprehensive test suite for TimeStepper functionality covering initialization, range generation, sleep behavior, and factory methods
Ubika/tests/test_ubika_cloud_protector_next_gen.py Updated existing tests to work with new time-range based API signature for next_batch()
Comments suppressed due to low confidence (1)

Ubika/ubika_modules/connector_ubika_cloud_protector_next_gen.py:2

  • Import of 'replace_errors' is not used.
from codecs import replace_errors


```diff
-        trigger.from_timestamp = 1747326567845
-        events = trigger.fetch_events()
+        events = trigger._UbikaCloudProtectorNextGenConnector__fetch_next_events(1747326567845, 1747326667845)
```
Copilot AI commented Dec 9, 2025

The test URL mock on line 141 is outdated and doesn't match the actual API call being made. The __fetch_next_events method now includes filters.toDate parameter (line 155 passes both start and end timestamps), but the mocked URL doesn't include this parameter. This test will likely fail. Update the mock URL to:

"https://api.ubika.io/rest/logs.ubika.io/v1/ns/sekoia/security-events?filters.fromDate=1747326567845&filters.toDate=1747326667845&pagination.realtime=True&pagination.pageSize=100"

@squioc (Collaborator) left a comment

Thank you for the fixes.
Some changes to apply:


```python
            current_lag = time.time() - most_recent_date_seen

            EVENTS_LAG.labels(intake_key=self.configuration.intake_key).set(current_lag)
```

With this removal, we are losing the EVENTS_LAG metric; can you reintroduce it?

Comment on lines +261 to +262

```python
        with self.context as cache:
            cache["most_recent_date_seen"] = end.isoformat()
```

I suggest moving this code into the self.next_batch method and moving the loop into the try-catch.
Currently, if we face an issue with the API, we will continue to progress and lose events.
